Synchronizer

  • Any object that coordinates the control flow of threads based on its state
  • Examples:
    • Blocking queue
    • Semaphore
    • Barrier
    • Latch
  • Properties
    • Encapsulate state that determines whether threads arriving at the synchronizer should be allowed to pass or forced to wait
    • Provide methods to manipulate that state
    • Provide methods to wait efficiently for the synchronizer to enter the desired state
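
The three properties above can be seen in a small hand-written synchronizer. This is a minimal sketch, not a JDK class: `SimpleGate`, `open()`, `await()` and `isOpen()` are hypothetical names chosen for illustration, and the gate is built on the intrinsic lock with `wait()`/`notifyAll()`.

```java
/** Hypothetical one-shot gate built on the intrinsic lock; not a JDK class. */
class SimpleGate {
    // Encapsulated state: decides whether arriving threads pass or wait.
    private boolean open = false;

    /** Manipulates the state: opens the gate and wakes every waiting thread. */
    public synchronized void open() {
        open = true;
        notifyAll();
    }

    /** Waits without busy-looping until the gate is open. */
    public synchronized void await() throws InterruptedException {
        while (!open) {   // the loop guards against spurious wakeups
            wait();
        }
    }

    public synchronized boolean isOpen() {
        return open;
    }
}

class SimpleGateDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleGate gate = new SimpleGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                System.out.println("gate opened, waiter proceeds");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();     // the state change releases the waiter
        waiter.join();
    }
}
```

In practice the classes in java.util.concurrent should be preferred over hand-rolled wait/notify code like this.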

Latch

  • package: java.util.concurrent
  • class: CountDownLatch
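  • A latch acts as a gate: until the latch reaches its terminal state the gate is closed and no thread can pass; once it is reached, the gate opens and all threads pass
  • Latches are single-use objects: once a latch reaches its terminal state, it cannot be reset
  • Constructor:
    • Initialize with a non-negative integer count (a negative count throws IllegalArgumentException)
  • Methods:
    • await(): blocks the calling thread until the counter reaches zero (returns immediately if it is already zero)
    • countDown(): decrements the counter; when it reaches zero, all waiting threads are released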
  • Use cases:
    • starting a group of related activities at the same moment
    • waiting for a group of related activities to complete
    • waiting until all the players in a multi-player game are ready to proceed
    • ensuring a service does not start until all the services it depends on have started; each service could have its own binary latch
import java.util.concurrent.CountDownLatch;

class Task implements Runnable {
    private final CountDownLatch countDownLatch;
    Task(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
 
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is working hard");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " is done");
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        } finally {
            countDownLatch.countDown();
        }
    }
}
 
public class TestConcurrency {
    public static void main(String[] args) throws InterruptedException {
        int count = 10;
        CountDownLatch countDownLatch = new CountDownLatch(count);
 
        // run all the tasks
        for (int i = 0; i < count; i++) {
            new Thread(new Task(countDownLatch), "my-thread-" + i).start();
        }
 
        // wait till all the threads are completed
        countDownLatch.await();
        System.out.println("All tasks are done");
    }
}
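
The example above covers waiting for a group of activities to complete. The "starting a group of related activities" use case can be sketched with a second, binary latch that acts as a starting gate. This is an illustrative sketch only: `StartGateDemo`, `runTogether` and the worker names are hypothetical, and the counter increment stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {
    /** Starts `workers` threads that are all released together; returns how many ran. */
    static int runTogether(int workers) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);       // binary latch, opened once by the caller
        CountDownLatch endGate = new CountDownLatch(workers);   // counts finished workers
        AtomicInteger ran = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await();        // every worker waits here until the gate opens
                    ran.incrementAndGet();    // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endGate.countDown();      // always report completion
                }
            }, "worker-" + i).start();
        }

        startGate.countDown();   // open the gate: all workers are released at once
        endGate.await();         // wait until every worker has finished
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTogether(5) + " workers ran");
    }
}
```

Without the starting gate, the threads created first would get a head start on the ones created later, which matters when timing concurrent work.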

Semaphore

  • package: java.util.concurrent
  • class: Semaphore
  • https://www.youtube.com/watch?v=shH38znT_sQ
  • A semaphore manages a set of permits; internally it is just a counter of available permits
  • Think of a permit as a ticket that a thread acquires before proceeding and gives back when it is done
    • Because it is only a counter, permits are not owned by the acquiring thread, so any thread can release a permit
  • Constructor:
    • Initialize with an integer number of permits (may be negative, in which case releases must occur before any acquire can succeed)
  • Methods:
    • acquire(): acquires a permit, blocking until one is available
      • if permits > 0, internally decrease by 1
      • else wait until permits > 0
    • release(): releases a permit, internally increasing the count by 1
  • A binary semaphore is one with an initial permit count of 1
    • It can be used as a mutex lock with non-reentrant semantics
  • Use cases:
    • Primary use case is to restrict/manage use of limited resources
    • useful in implementing resource pools such as database connection pool
    • useful in implementing a bounded buffer (for example a BoundedBuffer class)
import java.util.concurrent.Semaphore;

class Task implements Runnable {
    private final Semaphore semaphore;
    Task(Semaphore semaphore) {
        this.semaphore = semaphore;
    }
 
    @Override
    public void run() {
        try {
            // block until a permit is available, then take it (permits--)
            semaphore.acquire();
        } catch (InterruptedException e) {
            // interrupted while waiting: no permit is held, so nothing to release
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + " is working hard");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " is done, releasing permit...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // return the permit (permits++); reached only if acquire() succeeded
            semaphore.release();
        }
    }
}
 
public class TestConcurrency {
    public static void main(String[] args) {
        // we have initial 3 permits
        Semaphore semaphore = new Semaphore(3);
 
        // each thread calls semaphore.acquire(), but only 3 permits exist,
        // so only 3 threads hold a permit and do their work at once.
        // The rest block in acquire() until a running thread releases
        // its permit. At any given time, at most 3 threads are
        // working concurrently.
        for (int i = 0; i < 20; i++) {
            new Thread(new Task(semaphore), "my-thread-" + i).start();
        }
    }
}
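
The bounded-buffer use case mentioned above can be sketched with two semaphores, one counting free slots and one counting stored elements. This is a minimal sketch under stated assumptions: `SemaphoreBoundedBuffer`, `freeSlots()` and the demo class are hypothetical names, and a synchronized `ArrayDeque` is assumed as the underlying storage.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Semaphore;

/** Hypothetical bounded buffer built from two semaphores; not a JDK class. */
class SemaphoreBoundedBuffer<E> {
    private final Semaphore availableItems;    // permits = elements that can be taken
    private final Semaphore availableSpaces;   // permits = free slots
    private final Deque<E> items = new ArrayDeque<>();

    SemaphoreBoundedBuffer(int capacity) {
        availableItems = new Semaphore(0);
        availableSpaces = new Semaphore(capacity);
    }

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);             // reject null before taking a permit
        availableSpaces.acquire();             // blocks while the buffer is full
        synchronized (items) {
            items.addLast(e);
        }
        availableItems.release();              // one more element can be taken
    }

    public E take() throws InterruptedException {
        availableItems.acquire();              // blocks while the buffer is empty
        E e;
        synchronized (items) {
            e = items.removeFirst();
        }
        availableSpaces.release();             // one more slot is free
        return e;
    }

    public int freeSlots() {
        return availableSpaces.availablePermits();
    }
}

class SemaphoreBoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        SemaphoreBoundedBuffer<String> buffer = new SemaphoreBoundedBuffer<>(2);
        buffer.put("a");
        buffer.put("b");                         // buffer is now full; a third put would block
        System.out.println(buffer.take());       // prints "a"
        System.out.println(buffer.freeSlots());  // prints 1
    }
}
```

Note that each permit is acquired by one method and released by the other, typically on different threads, which works only because permits are not owned by the thread that acquired them.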

BlockingQueue

  • package: java.util.concurrent
  • interface: BlockingQueue
  • provides blocking put() and take() methods
    • timed equivalents: offer(e, timeout, unit) and poll(timeout, unit)
    • these equivalents do not block indefinitely: offer returns false and poll returns null if the timeout elapses
  • If the queue is full, put() blocks until space becomes available
  • If the queue is empty, take() blocks until an element is available
  • Types of BlockingQueue
    • Unbounded
    • Bounded
  • Unbounded queues are never full, so put() never blocks
  • Design considerations:
    • Build resource limits into the design early by using a bounded BlockingQueue, because it is difficult to retrofit bounds later
    • If a bounded queue does not fit the design, you can create other blocking data structures using a Semaphore
  • Implementations:
    • ArrayBlockingQueue and LinkedBlockingQueue; for choosing between them see https://stackoverflow.com/questions/35967792/when-to-prefer-linkedblockingqueue-over-arrayblockingqueue
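
The blocking and timed methods described above can be sketched with a small producer/consumer pair. This is an illustrative sketch: `BlockingQueueDemo` and `produceAndConsume` are hypothetical names, and `ArrayBlockingQueue` is assumed as the bounded implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
    /** A producer puts 1..n, the caller takes and sums them; returns the sum. */
    static int produceAndConsume(int n) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);   // bounded, capacity 2
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);          // blocks whenever the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += queue.take();           // blocks whenever the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(10));   // prints 55

        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.put("x");
        // Timed variants give up after the timeout instead of blocking forever.
        boolean accepted = q.offer("y", 100, TimeUnit.MILLISECONDS);   // queue full: false
        String first = q.poll(100, TimeUnit.MILLISECONDS);             // "x"
        String second = q.poll(100, TimeUnit.MILLISECONDS);            // queue empty: null
        System.out.println(accepted + " " + first + " " + second);     // prints "false x null"
    }
}
```

Because the queue holds at most two elements, the producer can never run far ahead of the consumer; this back-pressure is the main reason to prefer a bounded queue.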

Barrier

  • package: java.util.concurrent
  • class: CyclicBarrier
  • Barriers are similar to latches: they block a group of threads until some event has occurred
  • Difference: a latch waits for events, while a barrier waits for other threads; all the threads must arrive at the barrier point before any of them can proceed
  • With CyclicBarrier, threads call await() when they reach the barrier point, and await() blocks until all the threads have reached it
  • Once all threads have reached the barrier point, the barrier opens, all threads are released, and the barrier is reset so it can be reused
  • Useful in algorithms/simulations where the work for one step can be done in parallel, but all the work for a given step must complete before advancing to the next step
  • Example:
    • Implementing an algorithm that divides a problem into subproblems that can be calculated in parallel, then combines the results
    • n-body particle simulations: each step calculates an update for every particle, and all particle positions must be updated before the next step is calculated
    • Implementing Conway’s Game of Life
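
The step-by-step pattern described above can be sketched as follows. This is a minimal sketch: `BarrierStepsDemo` and `simulate` are hypothetical names, the per-step computation is left as a placeholder comment, and the barrier action simply counts completed steps.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierStepsDemo {
    /** Runs `workers` threads through `steps` barrier-separated steps; returns completed steps. */
    static int simulate(int workers, int steps) throws InterruptedException {
        AtomicInteger completedSteps = new AtomicInteger();
        // The barrier action runs once per step, in the last thread to arrive,
        // before any of the waiting threads is released.
        CyclicBarrier barrier = new CyclicBarrier(workers, completedSteps::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int step = 0; step < steps; step++) {
                        // ... compute this worker's share of the current step here ...
                        barrier.await();   // wait until every worker has finished this step
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another worker failed or was interrupted; abandon the run
                }
            }, "worker-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return completedSteps.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(simulate(4, 3) + " steps completed");   // prints "3 steps completed"
    }
}
```

The same barrier instance is reused for every step, which is what distinguishes it from a single-use latch.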

AQS

  • AbstractQueuedSynchronizer (package: java.util.concurrent.locks)
  • Used internally to implement classes in java.util.concurrent like:
    • ReentrantLock
    • Semaphore
    • ReentrantReadWriteLock
    • CountDownLatch
    • SynchronousQueue (in older JDK versions)
    • FutureTask (in older JDK versions)
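  • The basic operations are variants of acquire and release; acquisition can be exclusive (as in ReentrantLock) or shared (as in Semaphore and CountDownLatch)
  • It holds a single int of state, accessed through getState(), setState() and compareAndSetState()
  • It also maintains an internal FIFO queue of waiting threads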
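
How the state, the acquire/release operations and the wait queue fit together can be sketched with a small binary latch built on AQS in shared mode. This is an illustrative sketch, not how the JDK implements CountDownLatch: `OneShotLatch`, `signal()`, `await()` and `isOpen()` are hypothetical names, and the state encoding (0 = closed, 1 = open) is an assumption of this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical binary latch built on AQS; not a JDK class. */
class OneShotLatch {
    private final Sync sync = new Sync();

    /** Opens the latch and releases all waiting threads. */
    public void signal() {
        sync.releaseShared(0);
    }

    /** Blocks until the latch is open. */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    public boolean isOpen() {
        return sync.isOpen();
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        // AQS state as used here: 0 = closed, 1 = open
        boolean isOpen() {
            return getState() == 1;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // non-negative: the acquire succeeds; negative: AQS queues the thread
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);    // open the latch
            return true;    // tell AQS to wake the queued threads
        }
    }
}

class OneShotLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                System.out.println("latch opened");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.signal();
        waiter.join();
    }
}
```

The subclass only decides whether an acquire or release succeeds based on the state; queuing, parking and waking threads are handled by AQS itself.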