How to implement an ExecutorService to execute batches of tasks
Solution 1
I think the ExecutorService itself will be able to perform your requirements.
Call invokeAll([...])
and iterate over all of your Tasks. All Tasks are finished, if you can iterate through all Futures.
Solution 2
As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.
It seems to me that all you need to do is submit a batch, wait for them all to finish while ignoring interrupts on the main thread, then submit another batch perhaps based on the results of the first batch. I believe this is just a matter of:
ExecutorService service = ...;
Collection<Future> futures = new HashSet<Future>();
for (Callable callable : tasks) {
Future future = service.submit(callable);
futures.add(future);
}
for(Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
// Figure out if the interruption means we should stop.
}
}
// Use the results of futures to figure out a new batch of tasks.
// Repeat the process with the same ExecutorService.
Comments
-
Victor P. almost 2 years
I am looking for a way to execute batches of tasks in java. The idea is to have an
ExecutorService
based on a thread pool that will allow me to spread a set ofCallable
among different threads from amain
thread. This class should provide a waitForCompletion method that will put themain
thread to sleep until all tasks are executed. Then themain
thread should be awaken, and it will perform some operations and resubmit a set of tasks.This process will be repeated numerous times, so I would like to use
ExecutorService.shutdown
as this would require to create multiple instances ofExecutorService
.Currently I have implemented it in the following way using a
AtomicInteger
, and aLock
/Condition
:public class BatchThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger mActiveCount; private final Lock mLock; private final Condition mCondition; public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){ ... for(C task : batch){ submit(task); mActiveCount.incrementAndGet(); } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); mLock.lock(); if (mActiveCount.decrementAndGet() == 0) { mCondition.signalAll(); } mLock.unlock(); } public void awaitBatchCompletion() throws InterruptedException { ... // Lock and wait until there is no active task mLock.lock(); while (mActiveCount.get() > 0) { try { mCondition.await(); } catch (InterruptedException e) { mLock.unlock(); throw e; } } mLock.unlock(); } }
Please not that I will not necessarily submit all the tasks from the batch at once, therefore
CountDownLatch
does not seem to be an option.Is this a valid way to do it? Is there a more efficient/elegant way to implement that?
Thanks
-
Victor P. about 12 yearsThis was my first implementation, the problem is that the
main
thread might be interupted while submitting its tasks (meaning that something willbreak
the execution of the loop), therefore I cannot rely oninvokeAll
. I could wait on theFuture.get
externally but I thought it was better in terms of design to have the executor responsible for that. I may be wrong though ;) -
Victor P. about 12 yearsCreating a new Executor each time would lead to approximately 240 new threads in 30s, I just feel bad about it :) Regarding the
executor.getActiveCount()
API says its only an approximate count, and theThread.sleep
is not a good option for me as I want to be as fast as possible: I am implementing a combinatorial optimization algorithm and the two performance metrics are solution quality and speed, each ms counts! -
Gray about 12 yearsDon't be. 240 new threads in 30 seconds is NOTHING. Try a for loop sometime that creates and destroys threads. See how many you can do in 30 seconds.
-
Gray about 12 yearsGiven your speed requirements, I'd drop the custom code and just create a new executor each time. You won't regret it.
-
Victor P. about 12 yearsOk I'll try your suggestion, I though it was bad practice to create threads and terminate them almost instantly. Thanks
-
artbristol about 12 yearsI think this solution is the cleanest.
main
thread might equally well be interrupted while sleeping - are you actually planning to interrupt it, or has this just arisen becauseInterruptedException
is checked? -
Victor P. about 12 yearsI'll follow your advice and fall back to what was my previous implementation. @ckuetbach you have the credit for this solution
-
Victor P. about 12 yearsI did not see that
invokeAll
internally waits for all tasks to complete, this may be the cleanest solution, I will refactor the code related to the interruption of the main thread (I am not talking about a call toThread.interrupt
, just a conditionalbeak
in the loop that is creating tasks)