Implement Batch Processing

  • Given a list of numbers, square each number using multiple threads
    • The list of numbers is finite
    • Limit the number of threads (threadCount) that may run at the same time, to avoid excessive resource consumption
    • Work is done in batches per thread: a single thread squares at most a fixed number of elements (the batch size)
    • The final list of squares must be in the same order as the original list
  • Batch size strategy
    • Use either a fixed batch size or numbers.size() / threadCount (clamped to at least 1, because integer division yields 0 when the list has fewer elements than threadCount)
    • This works because numbers is already an in-memory List, so every batch (a sub-list of it) also fits in memory without overflow
    • For an infinite stream of numbers the batch size must be fixed, since numbers.size() is not known in advance

Method 1

  • Using a Semaphore (to limit how many threads do their work at the same time) and Thread.join() (to wait for all threads to complete)
/**
 * Squares a list of integers in batches. Each batch runs on its own platform thread,
 * and a {@link Semaphore} with {@code threadCount} permits limits how many batches are
 * processed at the same time. Results keep the order of the input list.
 */
public class ConcurrentStreamProcessor {
    /** Simulated per-batch work time used when the caller does not specify one. */
    private static final long DEFAULT_SIMULATED_WORK_MILLIS = 3000;

    /**
     * Squares one batch and writes the squares into the shared {@code results} list,
     * starting at position {@code index}.
     *
     * <p>A failure is recorded (see {@link #failure()}) instead of being lost. The caller
     * can then report it after joining the thread.
     */
    static class Worker implements Runnable {
        private final int index;
        private final List<Integer> numbers;
        private final Semaphore semaphore;
        private final List<Integer> results;
        private final long simulatedWorkMillis;
        // Written by the worker thread; read by the caller after Thread.join().
        private volatile Exception failure;

        public Worker(int index, List<Integer> numbers, Semaphore semaphore, List<Integer> results) {
            this(index, numbers, semaphore, results, DEFAULT_SIMULATED_WORK_MILLIS);
        }

        /**
         * @param index               position in {@code results} where this batch's first square goes
         * @param numbers             the batch to square
         * @param semaphore           limits how many workers run their batch at once
         * @param results             shared, pre-sized output list
         * @param simulatedWorkMillis extra sleep while holding the permit (0 disables it)
         */
        public Worker(int index, List<Integer> numbers, Semaphore semaphore, List<Integer> results,
                      long simulatedWorkMillis) {
            this.index = index;
            this.numbers = numbers;
            this.semaphore = semaphore;
            this.results = results;
            this.simulatedWorkMillis = simulatedWorkMillis;
        }

        /** Returns the exception that aborted this worker, or {@code null} if it succeeded. */
        Exception failure() {
            return failure;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException ex) {
                // No permit was obtained, so none may be released.
                Thread.currentThread().interrupt();
                fail(ex);
                return;
            }
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is started");
                // int arithmetic: squares above Integer.MAX_VALUE wrap around.
                List<Integer> squaredNumbers = numbers.stream()
                        .map(number -> number * number)
                        .toList();
                synchronized (results) {  // Ensure thread-safe access to results list
                    for (int i = 0; i < squaredNumbers.size(); i++) {
                        results.set(index + i, squaredNumbers.get(i));
                    }
                }
                if (simulatedWorkMillis > 0) {
                    Thread.sleep(simulatedWorkMillis);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                fail(ex);
            } catch (RuntimeException ex) {
                fail(ex);
            } finally {
                System.out.println("Thread " + Thread.currentThread().getName() + " is completed");
                semaphore.release();
            }
        }

        /** Records the failure so processStream can surface it, and logs it. */
        private void fail(Exception ex) {
            failure = ex;
            System.out.println("Some exception happened " + ex.getMessage());
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        int threadCount = 3;
        List<Integer> results = processStream(numbers, threadCount);
        System.out.println(results);
    }

    /**
     * Squares every number, with at most {@code threadCount} batches processed at once.
     * Each batch also sleeps for the default simulated work time.
     *
     * @param numbers     the input numbers; must not be null
     * @param threadCount maximum number of batches processed concurrently; must be positive
     * @return the squares, in the same order as {@code numbers}
     * @throws InterruptedException     if the calling thread is interrupted while joining workers
     * @throws IllegalArgumentException if {@code threadCount <= 0}
     * @throws IllegalStateException    if any batch failed (the cause is attached)
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount) throws InterruptedException {
        return processStream(numbers, threadCount, DEFAULT_SIMULATED_WORK_MILLIS);
    }

    /**
     * Same as {@link #processStream(List, int)}, but with a configurable simulated work time
     * per batch.
     *
     * @param simulatedWorkMillis sleep per batch while holding a permit; 0 disables it
     * @throws IllegalArgumentException if {@code threadCount <= 0} or {@code simulatedWorkMillis < 0}
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount, long simulatedWorkMillis)
            throws InterruptedException {
        if (numbers == null) {
            throw new NullPointerException("numbers must not be null");
        }
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive: " + threadCount);
        }
        if (simulatedWorkMillis < 0) {
            throw new IllegalArgumentException("simulatedWorkMillis must not be negative: " + simulatedWorkMillis);
        }

        // numbers = 10, threadCount = 3, batchSize = 3
        // batches = [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
        // numbers = 2, threadCount = 3 -> 2 / 3 == 0, so clamp to 1 or the loop never advances.
        int size = numbers.size();
        int batchSize = Math.max(1, size / threadCount);
        Semaphore semaphore = new Semaphore(threadCount);

        // Pre-size so each worker can set() its own disjoint index range.
        List<Integer> results = new ArrayList<>(size);
        for (int k = 0; k < size; k++) {
            results.add(0);
        }

        List<Thread> threads = new ArrayList<>();
        List<Worker> workers = new ArrayList<>();
        int start = 0;
        while (start < size) {
            // Computed as start + min(...) so the end index cannot overflow int.
            int end = start + Math.min(batchSize, size - start);
            Worker worker = new Worker(start, numbers.subList(start, end), semaphore, results, simulatedWorkMillis);
            Thread thread = new Thread(worker);
            workers.add(worker);
            threads.add(thread);
            thread.start();
            start = end;
        }

        // wait for all the threads to complete
        for (Thread thread : threads) {
            thread.join();
        }

        // join() gives happens-before, so failures recorded by workers are visible here.
        for (Worker worker : workers) {
            Exception failure = worker.failure();
            if (failure != null) {
                throw new IllegalStateException("Batch starting at index " + worker.index + " failed", failure);
            }
        }
        return results;
    }
}

Method 2

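  • Using an ExecutorService
    • A fixed thread pool (Executors.newFixedThreadPool) caps the number of worker threads at threadCount
    • Each batch is submitted as a Callable; its Future is collected in submission order, which preserves the original ordering of the results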
/**
 * Squares a list of integers in batches on a fixed-size thread pool. One Callable is
 * submitted per batch, and results are collected from the Futures in submission order,
 * so the output keeps the order of the input list.
 */
public class ConcurrentStreamProcessorWithExecutor {
    /** Simulated per-batch work time used when the caller does not specify one. */
    private static final long DEFAULT_SIMULATED_WORK_MILLIS = 3000;

    /** Squares one batch and returns the squares in batch order. */
    static class Worker implements Callable<List<Integer>> {
        private final List<Integer> numbers;
        private final long simulatedWorkMillis;

        public Worker(List<Integer> numbers) {
            this(numbers, DEFAULT_SIMULATED_WORK_MILLIS);
        }

        /**
         * @param numbers             the batch to square
         * @param simulatedWorkMillis extra sleep after squaring (0 disables it)
         */
        public Worker(List<Integer> numbers, long simulatedWorkMillis) {
            this.numbers = numbers;
            this.simulatedWorkMillis = simulatedWorkMillis;
        }

        @Override
        public List<Integer> call() throws InterruptedException {
            System.out.println("Thread " + Thread.currentThread().getName() + " is started");
            // int arithmetic: squares above Integer.MAX_VALUE wrap around.
            List<Integer> squaredNumbers = numbers.stream()
                    .map(number -> number * number)
                    .toList();

            if (simulatedWorkMillis > 0) {
                Thread.sleep(simulatedWorkMillis);
            }
            System.out.println("Thread " + Thread.currentThread().getName() + " is completed");

            return squaredNumbers;
        }
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        int threadCount = 3;
        List<Integer> results = processStream(numbers, threadCount);
        System.out.println(results);
    }

    /**
     * Squares every number on a pool of {@code threadCount} threads. Each batch also sleeps
     * for the default simulated work time.
     *
     * @param numbers     the input numbers; must not be null
     * @param threadCount size of the thread pool; must be positive
     * @return the squares, in the same order as {@code numbers}
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     * @throws ExecutionException       if any batch failed (the cause is the worker's exception)
     * @throws IllegalArgumentException if {@code threadCount <= 0}
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount) throws InterruptedException, ExecutionException {
        return processStream(numbers, threadCount, DEFAULT_SIMULATED_WORK_MILLIS);
    }

    /**
     * Same as {@link #processStream(List, int)}, but with a configurable simulated work time
     * per batch.
     *
     * @param simulatedWorkMillis sleep per batch; 0 disables it
     * @throws IllegalArgumentException if {@code threadCount <= 0} or {@code simulatedWorkMillis < 0}
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount, long simulatedWorkMillis)
            throws InterruptedException, ExecutionException {
        if (numbers == null) {
            throw new NullPointerException("numbers must not be null");
        }
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive: " + threadCount);
        }
        if (simulatedWorkMillis < 0) {
            throw new IllegalArgumentException("simulatedWorkMillis must not be negative: " + simulatedWorkMillis);
        }

        // numbers = 10, threadCount = 3, batchSize = 3
        // batches = [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
        // numbers = 2, threadCount = 3 -> 2 / 3 == 0, so clamp to 1 or the loop never advances.
        int size = numbers.size();
        int batchSize = Math.max(1, size / threadCount);

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            int start = 0;
            while (start < size) {
                // Computed as start + min(...) so the end index cannot overflow int.
                int end = start + Math.min(batchSize, size - start);
                futures.add(executorService.submit(new Worker(numbers.subList(start, end), simulatedWorkMillis)));
                start = end;
            }

            // Futures are read in submission order, which preserves the input ordering.
            List<Integer> results = new ArrayList<>(size);
            for (Future<List<Integer>> future : futures) {
                results.addAll(future.get());
            }
            return results;
        } finally {
            // Always release the pool's (non-daemon) threads, even when a batch failed;
            // on failure this also cancels batches that have not finished yet.
            executorService.shutdownNow();
        }
    }
}