Any object that coordinates the control flow of threads based on its state
Examples:
Blocking queue
Semaphore
Barrier
Latch
Properties
Encapsulate state that determines whether threads arriving at the synchronizer should be allowed to pass or forced to wait
Provide methods to manipulate that state
Provide methods to wait efficiently for the synchronizer to enter the desired state
Latch
package: java.util.concurrent
class: CountDownLatch
A latch act as a gate, until the latch reaches a terminal state, the gate is closed
They are single use objects, once they reach terminal state, it cannot be reset
Constructor:
Initialize with the positive integer counter
Methods:
await() — until the counter reaches zero
countDown() — reduce the counter
Use cases:
start a group of related activities
waiting for a group of related activities to complete
Waiting for all the players in multi-player game are ready to proceed
ensuring all the services which it depends on has started, each service could have its own binary latch
class Task implements Runnable { private final CountDownLatch countDownLatch; Task(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " is working hard"); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " is done"); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { countDownLatch.countDown(); } }}public class TestConcurrency { public static void main(String[] args) throws InterruptedException { int count = 10; CountDownLatch countDownLatch = new CountDownLatch(count); // run all the tasks for (int i = 0; i < count; i++) { new Thread(new Task(countDownLatch), "my-thread-" + i).start(); } // wait till all the threads are completed countDownLatch.await(); System.out.println("All tasks are done"); }}
It is a permit machine and works on the concept of permits which is nothing but a counter
Imagine a permit as some ticket which any thread can acquire and give back when released
Since it is a counter internally any other thread can also release permit
Constructor:
Initialize with the integer (can be negative) number of permits
Methods:
acquire() — Acquire a permit
if permits > 0 internally decrease by 1
else wait for permits > 0
release() — Release a permit, internally increasing by 1
A binary Semaphore is the one with an initial permit count of 1
It can be used as mutex lock with non-reentrant semantics
Use cases:
Primary use case is to restrict/manage use of limited resources
useful in implementing resource pools such as database connection pool
Used in implementing BoundedBuffer class
class Task implements Runnable { private final Semaphore semaphore; Task(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { // acquire one permit (> 0) else wait for one! // if acquire successful, internally permits-- semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " is working hard"); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " is done, releasing permit..."); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { // release the permit, internally permits++ semaphore.release(); } }}public class TestConcurrency { public static void main(String[] args) { // we have initial 3 permits Semaphore semaphore = new Semaphore(3); // each thread will perform semaphore.acquire() and // only the first 3 will get the permit and execute concurrently. // Until one of them releases the permit next cannot start. // So at any given time, we will only have 3 threads // running concurrently for (int i = 0; i < 20; i++) { new Thread(new Task(semaphore), "my-thread-" + i).start(); } }}
BlockingQueue
package: java.util.concurrent
class: BlockingQueue
provide blocking put() and take() methods
timed equivalents: offer() and poll()
these equivalents do not block indefinitely and return success/failure
If queue is full put() blocks until space becomes available
If queue is empty take() blocks until an element is available
Types of BlockingQueue
Unbounded
Bounded
Unbounded queues are never full, so put() never blocks
Design considerations:
Use Bounded BlockingQueue early because it is difficult to retrofit it later
If BoundedQueue does not fit design you can create other blocking data-structures using Semaphore
Implementations:
LinkedBlockingQueue — can be unbounded + bounded
ArrayBlockingQueue — always bounded
PriorityBlockingQueue
SynchronousQueue — maintains no storage space for queued elements
Barriers are similar to Latches, they block a group of threads until some event has occurred
Diff: With a Barrier, all the threads must come together at a barrier point at the same time in order to proceed
In CyclicBarrier, threads call await when they reach barrier point, and await blocks until all the threads have reached the barrier point
If all threads reach barrier point then barrier is released and barrier is reset
Useful in algorithms/simulations where the work to calculate one step can be done in parallel but all the work associated with a given step must complete before advancing to the next step
Example:
Implement algorithm which divide problem into subproblems that can be calculated in parallel and combine results
n-body particle simulations, each step calculate update on each particle, before calculating next step we must update all the particle positions
Implement Conway’s Game of Life
AQS
Abstract Queue Synchronizer
Used internally to implement classes in java.util.concurrency like:
ReentrantLock
Semaphore
ReentrantReadWriteLock
CountDownLatch
SynchronousQueue
FutureTask
Operations similar to acquire (exclusive or non-exclusive) and release
It has state which is Integer
It also has internal queue (can be used to store waiting threads)