Implement Batch Processing
- Given a list of numbers, square each number using multiple threads.
- The list of numbers is finite.
- Limit the number of threads (threadCount) that can run at the same time, to avoid excessive resource consumption.
- Work is done in batches: each thread squares at most a fixed number of elements (the batch size).
- The final list of squares must be in the same order as the original list.
- Batch size strategy
- Either use a fixed batch size, or derive it as
max(1, numbers.size() / threadCount) — the max(1, …) guard matters because integer division yields 0 when there are fewer numbers than threads, and a batch size of 0 never advances through the list.
- This works because numbers is already an in-memory list, so any batch (a sub-list of it) also fits in memory without overflow.
- For an infinite (unbounded) stream of numbers, the batch size must be fixed, since numbers.size() is not known in advance.
Method 1
- Using a Semaphore (to limit the number of threads running at once) and Thread.join() (to wait for all threads to complete)
/**
 * Method 1: squares numbers in batches, starting one {@link Thread} per batch and using a
 * {@link Semaphore} so that at most {@code threadCount} batches are processed at the same time.
 *
 * <p>Every batch writes its squares into a pre-sized result list at the batch's starting
 * offset, so the output order always matches the input order.
 */
public class ConcurrentStreamProcessor {

    /** Artificial per-batch delay (ms) used by the demo so the concurrency limit is visible. */
    private static final long DEFAULT_SIMULATED_WORK_MILLIS = 3000;

    /**
     * Squares one batch and writes the squares into {@code results} starting at {@code index}.
     *
     * <p>Squares are computed in {@code int} arithmetic, so they overflow silently for
     * {@code |n| > 46340}. If the batch fails (e.g. it contains a {@code null}), the failure is
     * printed and this batch's slots in {@code results} keep their previous values.
     */
    static class Worker implements Runnable {
        private final int index;
        private final List<Integer> numbers;
        private final Semaphore semaphore;
        private final List<Integer> results;
        private final long simulatedWorkMillis;

        /** Creates a worker that simulates the default 3000 ms of extra work per batch. */
        public Worker(int index, List<Integer> numbers, Semaphore semaphore, List<Integer> results) {
            this(index, numbers, semaphore, results, DEFAULT_SIMULATED_WORK_MILLIS);
        }

        /**
         * @param index               offset in {@code results} where this batch's first square goes
         * @param numbers             the batch to square
         * @param semaphore           shared permit pool bounding how many workers run at once
         * @param results             pre-sized shared output list
         * @param simulatedWorkMillis artificial delay after the batch is written; 0 disables it
         */
        public Worker(int index, List<Integer> numbers, Semaphore semaphore, List<Integer> results,
                long simulatedWorkMillis) {
            this.index = index;
            this.numbers = numbers;
            this.semaphore = semaphore;
            this.results = results;
            this.simulatedWorkMillis = simulatedWorkMillis;
        }

        @Override
        public void run() {
            // Acquire before entering the try/finally that releases: if acquire() is interrupted,
            // no permit is held, and releasing one anyway would permanently raise the limit.
            try {
                semaphore.acquire();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                System.out.println("Some exception happened " + ex.getMessage());
                return;
            }
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is started");
                List<Integer> squaredNumbers = numbers.stream()
                        .map(number -> number * number)
                        .toList();
                // Each batch writes a disjoint index range; the lock serializes access to the
                // shared list.
                synchronized (results) {
                    for (int i = 0; i < squaredNumbers.size(); i++) {
                        results.set(index + i, squaredNumbers.get(i));
                    }
                }
                if (simulatedWorkMillis > 0) {
                    Thread.sleep(simulatedWorkMillis);
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt flag instead of silently clearing it.
                Thread.currentThread().interrupt();
                System.out.println("Some exception happened " + ex.getMessage());
            } catch (RuntimeException ex) {
                // Best effort: report the failure; this batch's slots keep their initial value.
                System.out.println("Some exception happened " + ex.getMessage());
            } finally {
                System.out.println("Thread " + Thread.currentThread().getName() + " is completed");
                semaphore.release();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        int threadCount = 3;
        List<Integer> results = processStream(numbers, threadCount);
        System.out.println(results);
    }

    /**
     * Squares {@code numbers} concurrently with the default 3000 ms simulated work per batch.
     *
     * @see #processStream(List, int, long)
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount) throws InterruptedException {
        return processStream(numbers, threadCount, DEFAULT_SIMULATED_WORK_MILLIS);
    }

    /**
     * Squares {@code numbers} concurrently, preserving input order.
     *
     * <p>The list is split into batches of {@code max(1, size / threadCount)} elements, one
     * thread per batch. A semaphore with {@code threadCount} permits bounds how many batches run
     * at once. For example, 10 numbers with threadCount 3 gives batchSize 3 and batches
     * [1, 2, 3], [4, 5, 6], [7, 8, 9], [10].
     *
     * @param numbers             the numbers to square
     * @param threadCount         maximum number of batches processed concurrently; must be positive
     * @param simulatedWorkMillis artificial per-batch delay; 0 disables it; must not be negative
     * @return a new mutable list of squares in the same order as {@code numbers}
     * @throws IllegalArgumentException if {@code threadCount <= 0} or {@code simulatedWorkMillis < 0}
     * @throws InterruptedException     if interrupted while waiting for the batch threads
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount, long simulatedWorkMillis)
            throws InterruptedException {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive: " + threadCount);
        }
        if (simulatedWorkMillis < 0) {
            throw new IllegalArgumentException("simulatedWorkMillis must not be negative: " + simulatedWorkMillis);
        }
        int size = numbers.size();
        // Integer division yields 0 when size < threadCount; a 0 batch size would never advance
        // the loop below, so clamp it to at least one element.
        int batchSize = Math.max(1, size / threadCount);
        Semaphore semaphore = new Semaphore(threadCount);

        // Pre-size with placeholders so each worker can set() its own slots by index.
        List<Integer> results = new ArrayList<>(size);
        for (int k = 0; k < size; k++) {
            results.add(0);
        }

        List<Thread> threads = new ArrayList<>();
        int start = 0;
        while (start < size) {
            // Computed as start + min(...) so the end index cannot overflow int for huge lists.
            int end = start + Math.min(batchSize, size - start);
            Worker worker = new Worker(start, numbers.subList(start, end), semaphore, results, simulatedWorkMillis);
            Thread thread = new Thread(worker);
            threads.add(thread);
            thread.start();
            start = end;
        }
        // Wait for all the threads to complete; join() also makes their writes visible here.
        for (Thread thread : threads) {
            thread.join();
        }
        return results;
    }
}
Method 2
- Using an ExecutorService (a fixed thread pool of threadCount threads)
- Futures encapsulating each Callable's result
/**
 * Method 2: squares numbers in batches on a fixed-size {@link ExecutorService}. Each batch is a
 * {@link Callable}; the resulting {@link Future}s are read back in submission order, so the
 * concatenated output keeps the input order.
 */
public class ConcurrentStreamProcessorWithExecutor {

    /** Artificial per-batch delay (ms) used by the demo so the concurrency limit is visible. */
    private static final long DEFAULT_SIMULATED_WORK_MILLIS = 3000;

    /**
     * Squares one batch and returns the squares in batch order.
     *
     * <p>Squares are computed in {@code int} arithmetic, so they overflow silently for
     * {@code |n| > 46340}.
     */
    static class Worker implements Callable<List<Integer>> {
        private final List<Integer> numbers;
        private final long simulatedWorkMillis;

        /** Creates a worker that simulates the default 3000 ms of extra work per batch. */
        public Worker(List<Integer> numbers) {
            this(numbers, DEFAULT_SIMULATED_WORK_MILLIS);
        }

        /**
         * @param numbers             the batch to square
         * @param simulatedWorkMillis artificial delay before returning; 0 disables it
         */
        public Worker(List<Integer> numbers, long simulatedWorkMillis) {
            this.numbers = numbers;
            this.simulatedWorkMillis = simulatedWorkMillis;
        }

        @Override
        public List<Integer> call() throws InterruptedException {
            System.out.println("Thread " + Thread.currentThread().getName() + " is started");
            List<Integer> squaredNumbers = numbers.stream()
                    .map(number -> number * number)
                    .toList();
            if (simulatedWorkMillis > 0) {
                Thread.sleep(simulatedWorkMillis);
            }
            System.out.println("Thread " + Thread.currentThread().getName() + " is completed");
            return squaredNumbers;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        int threadCount = 3;
        List<Integer> results = processStream(numbers, threadCount);
        System.out.println(results);
    }

    /**
     * Squares {@code numbers} concurrently with the default 3000 ms simulated work per batch.
     *
     * @see #processStream(List, int, long)
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount)
            throws InterruptedException, ExecutionException {
        return processStream(numbers, threadCount, DEFAULT_SIMULATED_WORK_MILLIS);
    }

    /**
     * Squares {@code numbers} on a pool of {@code threadCount} threads, preserving input order.
     *
     * <p>The list is split into batches of {@code max(1, size / threadCount)} elements. For
     * example, 10 numbers with threadCount 3 gives batchSize 3 and batches
     * [1, 2, 3], [4, 5, 6], [7, 8, 9], [10].
     *
     * @param numbers             the numbers to square
     * @param threadCount         pool size; must be positive
     * @param simulatedWorkMillis artificial per-batch delay; 0 disables it; must not be negative
     * @return a new mutable list of squares in the same order as {@code numbers}
     * @throws IllegalArgumentException if {@code threadCount <= 0} or {@code simulatedWorkMillis < 0}
     * @throws ExecutionException       if any batch fails (e.g. a {@code null} element)
     * @throws InterruptedException     if interrupted while waiting for a batch
     */
    public static List<Integer> processStream(List<Integer> numbers, int threadCount, long simulatedWorkMillis)
            throws InterruptedException, ExecutionException {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive: " + threadCount);
        }
        if (simulatedWorkMillis < 0) {
            throw new IllegalArgumentException("simulatedWorkMillis must not be negative: " + simulatedWorkMillis);
        }
        int size = numbers.size();
        // Integer division yields 0 when size < threadCount; a 0 batch size would never advance
        // the submission loop, so clamp it to at least one element.
        int batchSize = Math.max(1, size / threadCount);
        List<Integer> results = new ArrayList<>(size);

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            int start = 0;
            while (start < size) {
                // Computed as start + min(...) so the end index cannot overflow int for huge lists.
                int end = start + Math.min(batchSize, size - start);
                futures.add(executorService.submit(new Worker(numbers.subList(start, end), simulatedWorkMillis)));
                start = end;
            }
            // Reading futures in submission order keeps the concatenated output in input order.
            for (Future<List<Integer>> future : futures) {
                results.addAll(future.get());
            }
        } finally {
            // Always stop the pool. Previously a failing get() skipped shutdown(), leaking
            // non-daemon threads that kept the JVM alive. On success every task has already
            // finished, so shutdownNow() only stops idle threads; on failure it also cancels
            // the batches that are still pending.
            executorService.shutdownNow();
        }
        return results;
    }
}