Producer Consumer Pattern

  • https://java-design-patterns.com/patterns/producer-consumer/
  • aka “Bounded Buffer” or “Consumer-Producer”
  • Producers add data to a shared queue as it becomes available
  • Consumers retrieve data from the queue when they are ready
  • Producers don’t need to know anything about the identity or number of consumers
  • Consumers don’t need to know who producers are and where the work came from
  • This pattern:
    • decouples producer code from consumer code
    • couples their behavior only indirectly, through the shared work queue

Performance

  • If the producers don’t generate work fast enough to keep the consumers busy, the consumers simply wait until more work is available
    • This is fine in a server application, where consumers idle until client requests arrive
    • This is bad in an application with effectively infinite work to do, such as a web crawler; there, the ratio of producer threads to consumer threads needs to be adjusted so that consumers stay busy
  • If producers consistently generate work faster than the consumers can process it, the queue grows without bound and the application eventually runs out of memory
    • Use a bounded queue (e.g. ArrayBlockingQueue) so that producers block when the queue is full
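
The bounded-queue point above can be illustrated with a small sketch. It assumes a hypothetical class name (BoundedQueueBackpressure) and helper (countAccepted), and deliberately runs no consumer, so that the queue fills up and the timed offer() call starts rejecting items instead of letting the backlog grow.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: the class and method names are assumptions, not part of the notes.
class BoundedQueueBackpressure {

    /** Tries to enqueue `attempts` items with no consumer running; returns how many fit. */
    static int countAccepted(int capacity, int attempts) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() with a timeout waits briefly for space, then gives up and returns false.
            if (queue.offer(i, 10, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 3, 5 attempts: only the first 3 items are accepted.
        System.out.println("Accepted " + countAccepted(3, 5) + " of 5");
    }
}
```

With put() instead of offer(), the producer would block at the fourth item until a consumer made room; which of the two behaviors is preferable depends on whether the producer can afford to wait.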

Implementation

  • Can be implemented with a BlockingQueue (e.g. ArrayBlockingQueue or LinkedBlockingQueue)
  • A thread pool coupled with a work queue is also an instance of the producer-consumer pattern
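
As a rough sketch of the thread-pool point above: the code that submits tasks acts as the producer, the pool's worker threads act as consumers, and the executor's queue is the work queue. The class name, the pool size of 2, the queue capacity of 4, and the choice of CallerRunsPolicy are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: names and sizes are assumptions, not part of the notes.
class ThreadPoolAsProducerConsumer {

    /** Submits `taskCount` trivial tasks and returns how many were completed. */
    static int runTasks(int taskCount) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                          // two worker threads (the consumers)
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(4),   // bounded work queue
                // When the queue is full, the submitting thread runs the task itself,
                // which slows the producer down instead of dropping work.
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet); // the submitter is the producer
        }

        pool.shutdown();                               // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);   // let queued tasks drain
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runTasks(20));
    }
}
```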

Bounded Buffer problem

  • aka Producer Consumer Problem
  • Entities:
    • Two classes of threads: producers and consumers
    • A bounded (fixed-size) buffer shared between them
  • Constraints
    • Producers must not insert data when the buffer is full
    • Consumers must not remove data when the buffer is empty
    • Producers and consumers must not insert and remove data simultaneously
  • Issues that can happen:
    • The producers don’t block when the buffer is full
    • Two or more producers write into the same slot
    • The consumers don’t block when the buffer is empty
    • A consumer consumes an empty slot in the buffer
    • A consumer attempts to consume a slot that is only half-filled by a producer
    • Two or more consumers read the same slot
    • And possibly more …
  • The classic solution uses three semaphores:
    • mutex (binary semaphore that guards access to the buffer)
    • empty semaphore (counts empty slots; starts at the buffer capacity)
    • full semaphore (counts full slots; starts at 0)
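
The three-semaphore solution listed above might look roughly like the following in Java. This is a minimal sketch, not a reference implementation: the class name SemaphoreBoundedBuffer and the circular-array layout are assumptions, and the mutex is acquired uninterruptibly so that an interrupt cannot leak a permit taken from the counting semaphores.

```java
import java.util.concurrent.Semaphore;

// Illustrative only: the class name and array-based layout are assumptions.
class SemaphoreBoundedBuffer<T> {
    private final Object[] slots;
    private int head; // index of the next slot to read
    private int tail; // index of the next slot to write

    private final Semaphore mutex = new Semaphore(1); // binary semaphore guarding head/tail/slots
    private final Semaphore empty;                    // counts empty slots
    private final Semaphore full = new Semaphore(0);  // counts full slots

    SemaphoreBoundedBuffer(int capacity) {
        slots = new Object[capacity];
        empty = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        empty.acquire();               // blocks while the buffer is full
        mutex.acquireUninterruptibly();
        try {
            slots[tail] = item;
            tail = (tail + 1) % slots.length;
        } finally {
            mutex.release();
        }
        full.release();                // one more full slot for consumers
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        full.acquire();                // blocks while the buffer is empty
        mutex.acquireUninterruptibly();
        T item;
        try {
            item = (T) slots[head];
            slots[head] = null;        // drop the reference so it can be collected
            head = (head + 1) % slots.length;
        } finally {
            mutex.release();
        }
        empty.release();               // one more empty slot for producers
        return item;
    }
}
```

The order of acquisition matters: taking the mutex before the counting semaphore could deadlock, because a producer holding the mutex while waiting for an empty slot would keep every consumer out of the buffer.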

Implementation using BlockingQueue

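import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        // BlockingQueue is an interface; ArrayBlockingQueue is a bounded implementation (capacity 5)
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);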
 
        // Producer Thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    System.out.println("Producing: " + i);
                    blockingQueue.put(i);
                    Thread.sleep(100); // Simulate some delay
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
 
        // Consumer Thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    int item = blockingQueue.take();
                    System.out.println("Consuming: " + item);
                    Thread.sleep(150); // Simulate some delay
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
 
        // Start both threads
        producer.start();
        consumer.start();
 
        // Wait for both threads to finish
        producer.join();
        consumer.join();
 
        System.out.println("Execution successful");
    }
}