public class BlockingQueue<T> { private final Object lock = new Object(); private final Queue<T> queue; private final int capacity; // Constructor to initialize the queue with a specific capacity public BlockingQueue(int capacity) { this.queue = new LinkedList<>(); this.capacity = capacity; } // Method for producers to add an item to the queue public void put(T item) throws InterruptedException { synchronized (lock) { // Wait while the queue is full while (queue.size() == capacity) { lock.wait(); } // Add item to the queue queue.add(item); // for debugging purpose that we don't overshoot the capacity System.out.println(this); // Notify consumers that an item is available lock.notifyAll(); } } // Method for consumers to take an item from the queue public T take() throws InterruptedException { synchronized (lock) { // Wait while the queue is empty while (queue.isEmpty()) { lock.wait(); } // Remove item from the queue T item = queue.remove(); // Notify producers that space is available lock.notifyAll(); return item; } } // Method to get the current size of the queue public int size() { synchronized (lock) { return queue.size(); } } @Override public String toString() { return queue.toString(); }}
Driver
public class TestBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> bq = new BlockingQueue<>(2); // size 2 System.out.println("Producing: " + 1); bq.put(1); System.out.println("Producing: " + 2); bq.put(2); new Thread(() -> { try { System.out.println("Producing: " + 3); bq.put(3); } catch (Exception e) { Thread.currentThread().interrupt(); } }).start(); var result = bq.take(); System.out.println("Consumed: " + result); }}