How to implement an ExecutorService to execute batches of tasks


Solution 1

I think the ExecutorService itself can already meet your requirements.

Call invokeAll([...]) with the tasks of a batch. It blocks until every task has completed and returns one Future per task, so by the time you iterate over the returned Futures, all the tasks are finished.
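A minimal sketch of that suggestion, assuming trivial Callable<Integer> tasks and a pool of four threads (both are placeholders for your own tasks and pool size; the class and method names are hypothetical). Because invokeAll returns only when every task has completed, normally or by throwing, the same executor can be reused for the next batch without calling shutdown in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllBatches {

    // Runs one batch and returns the sum of its results (placeholder task logic).
    static int runBatch(ExecutorService pool, int batchSize)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            final int n = i;
            batch.add(() -> n * n); // placeholder work
        }
        // invokeAll returns only after every task in the batch has completed.
        List<Future<Integer>> futures = pool.invokeAll(batch);
        int sum = 0;
        for (Future<Integer> f : futures) {
            sum += f.get(); // does not block: each future is already done
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // The same pool is reused for several consecutive batches.
            for (int round = 1; round <= 3; round++) {
                System.out.println("round " + round + ": " + runBatch(pool, 10));
            }
        } finally {
            pool.shutdown(); // called once, after the last batch
        }
    }
}
```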

Solution 2

As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.

It seems to me that all you need to do is submit a batch, wait for all of its tasks to finish while ignoring interrupts on the main thread, and then submit another batch, perhaps based on the results of the first one. I believe this is just a matter of:

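    ExecutorService service = ...;

    // Result stands for your tasks' return type (placeholder name).
    // A List keeps the futures in submission order; a HashSet would not.
    List<Future<Result>> futures = new ArrayList<>();
    for (Callable<Result> callable : tasks) {
        futures.add(service.submit(callable));
    }

    for (Future<Result> future : futures) {
        try {
            future.get(); // blocks until this task has finished
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() is the original exception.
        }
    }

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.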
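The catch block above stops waiting on the current future as soon as an interrupt arrives and moves on to the next one. If the main thread really has to ignore interrupts until the whole batch is done, one option is to retry get() after an interrupt and restore the interrupt flag afterwards, so callers can still see that it happened. This is the same idea as Guava's Uninterruptibles.getUninterruptibly; the sketch below uses only the JDK, and the helper awaitAll and the placeholder tasks are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class UninterruptibleBatch {

    // Hypothetical helper: waits for every future, ignoring interrupts while
    // waiting, then re-asserts the interrupt status if one was received.
    static <V> List<V> awaitAll(List<Future<V>> futures) throws ExecutionException {
        boolean interrupted = false;
        List<V> results = new ArrayList<>(futures.size());
        try {
            for (Future<V> future : futures) {
                while (true) {
                    try {
                        results.add(future.get());
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and keep waiting
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for callers
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int n = i;
                Callable<Integer> task = () -> n + 1; // placeholder work
                futures.add(service.submit(task));
            }
            System.out.println(awaitAll(futures)); // results in submission order
        } finally {
            service.shutdown();
        }
    }
}
```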
Author: Victor P.

PhD student in operations research

Updated on June 04, 2022

Comments

  • Victor P. (almost 2 years ago)

    I am looking for a way to execute batches of tasks in Java. The idea is to have an ExecutorService based on a thread pool that lets me spread a set of Callables across different threads from a main thread. This class should provide a waitForCompletion method that puts the main thread to sleep until all tasks have been executed. The main thread should then wake up, perform some operations, and resubmit a set of tasks.

    This process will be repeated many times, so I would like to avoid ExecutorService.shutdown, since using it would require creating a new ExecutorService instance for every batch.

    Currently I have implemented it as follows, using an AtomicInteger and a Lock/Condition pair:

    public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
      private final AtomicInteger mActiveCount;
      private final Lock          mLock;
      private final Condition     mCondition;
    
      public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
        ...
        for(C task : batch){
          submit(task);
          mActiveCount.incrementAndGet();
        }
      }
    
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        mLock.lock();
        try {
          if (mActiveCount.decrementAndGet() == 0) {
            mCondition.signalAll();
          }
        } finally {
          mLock.unlock(); // always release the lock
        }
      }
    
      public void awaitBatchCompletion() throws InterruptedException {
        ...
        // Lock and wait until there is no active task
        mLock.lock();
        try {
          while (mActiveCount.get() > 0) {
            mCondition.await(); // InterruptedException propagates to the caller
          }
        } finally {
          mLock.unlock(); // released on normal return and on interruption
        }
      } 
    }
    

    Please note that I will not necessarily submit all the tasks of a batch at once, so a CountDownLatch does not seem to be an option.

    Is this a valid way to do it? Is there a more efficient/elegant way to implement that?

    Thanks

  • Victor P. (about 12 years ago)
    This was my first implementation. The problem is that the main thread might be interrupted while submitting its tasks (meaning that something may break out of the submission loop), so I cannot rely on invokeAll. I could wait on Future.get externally, but I thought it was a better design to make the executor responsible for that. I may be wrong, though ;)
  • Victor P. (about 12 years ago)
    Creating a new Executor each time would lead to approximately 240 new threads in 30 s; I just feel bad about it :) Regarding executor.getActiveCount(), the API says it only returns an approximate count, and Thread.sleep is not a good option for me because I want to be as fast as possible: I am implementing a combinatorial optimization algorithm, and the two performance metrics are solution quality and speed, so every millisecond counts!
  • Gray (about 12 years ago)
    Don't be. 240 new threads in 30 seconds is NOTHING. Try a for loop sometime that creates and destroys threads. See how many you can do in 30 seconds.
  • Gray (about 12 years ago)
    Given your speed requirements, I'd drop the custom code and just create a new executor each time. You won't regret it.
  • Victor P. (about 12 years ago)
    OK, I'll try your suggestion. I thought it was bad practice to create threads and terminate them almost immediately. Thanks
  • artbristol (about 12 years ago)
    I think this solution is the cleanest. The main thread might equally well be interrupted while sleeping. Are you actually planning to interrupt it, or has this just arisen because InterruptedException is checked?
  • Victor P. (about 12 years ago)
    I'll follow your advice and fall back to my previous implementation. @ckuetbach, you get the credit for this solution.
  • Victor P. (about 12 years ago)
    I had not noticed that invokeAll internally waits for all tasks to complete; this may be the cleanest solution. I will refactor the code related to the interruption of the main thread (I am not talking about a call to Thread.interrupt, just a conditional break in the loop that creates the tasks).
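On the point raised in the question that tasks are not all submitted at once, which rules out a CountDownLatch with a fixed count: java.util.concurrent.Phaser lets parties register dynamically, so it can track a batch whose size is not known up front. The sketch below swaps in that technique in place of the AtomicInteger/Lock/Condition code; the PhaserBatch class and its methods are hypothetical, and it assumes a single coordinating thread calls both submit and awaitBatchCompletion. The coordinating thread stays registered as one party, each task registers on submission and deregisters when it finishes, and arriveAndAwaitAdvance (which is not interruptible) returns once the current phase completes. The JDK implementation caps a Phaser at 65,535 registered parties, so this only suits batches smaller than that.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a Phaser-based alternative to the question's
// AtomicInteger/Lock/Condition approach, for a single coordinating thread.
class PhaserBatch {
    private final ExecutorService pool;
    // One party is registered up front for the coordinating (main) thread.
    private final Phaser phaser = new Phaser(1);

    PhaserBatch(ExecutorService pool) {
        this.pool = pool;
    }

    // May be called any number of times before awaitBatchCompletion().
    <V> Future<V> submit(Callable<V> task) {
        phaser.register(); // one extra party per in-flight task
        Callable<V> tracked = () -> {
            try {
                return task.call();
            } finally {
                phaser.arriveAndDeregister(); // finished, normally or not
            }
        };
        try {
            return pool.submit(tracked);
        } catch (RuntimeException e) {
            phaser.arriveAndDeregister(); // task was rejected: undo register()
            throw e;
        }
    }

    // Blocks until every task submitted since the previous call has finished.
    // The coordinating thread stays registered for the next batch.
    void awaitBatchCompletion() {
        phaser.arriveAndAwaitAdvance();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PhaserBatch batches = new PhaserBatch(pool);
        AtomicInteger done = new AtomicInteger();
        try {
            for (int round = 1; round <= 3; round++) {
                for (int i = 0; i < 8; i++) {
                    batches.submit(done::incrementAndGet); // placeholder work
                }
                batches.awaitBatchCompletion();
                System.out.println("after round " + round + ": " + done.get());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The same PhaserBatch instance is reused across rounds, and tasks of a round can be submitted in several bursts before the single awaitBatchCompletion call.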
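Gray's suggestion of creating a new executor for every batch could look like the following sketch, in which each batch is also derived from the previous batch's results. The task bodies, the pool size, and the rule for building the next batch are hypothetical placeholders. invokeAll waits for the whole batch, and the finally block shuts each pool down so its worker threads exit instead of accumulating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorPerBatch {

    // Runs one batch on a freshly created pool and shuts that pool down after.
    static <V> List<V> runBatch(List<Callable<V>> batch, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<V> results = new ArrayList<>();
            for (Future<V> future : pool.invokeAll(batch)) { // waits for all tasks
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown(); // lets the worker threads exit once they are idle
        }
    }

    public static void main(String[] args) throws Exception {
        int value = 1;
        for (int round = 0; round < 3; round++) {
            final int base = value;
            List<Callable<Integer>> batch = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                final int k = i;
                batch.add(() -> base + k); // placeholder work
            }
            List<Integer> results = runBatch(batch, 4);
            // Placeholder rule: the next batch starts from this batch's maximum.
            value = results.stream().mapToInt(Integer::intValue).max().getAsInt();
            System.out.println("round " + round + ": " + results);
        }
    }
}
```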
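The refactoring described in the last comment, a task-creation loop that may stop early, combines naturally with invokeAll: build the batch in a list first, breaking out whenever the stopping condition holds, and only then hand the list to invokeAll, which waits for every task in it. A minimal sketch; shouldStop and the task body are hypothetical placeholders for the real stopping rule and work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchWithEarlyBreak {

    // Hypothetical stopping rule for the task-creation loop.
    static boolean shouldStop(int candidate, int limit) {
        return candidate >= limit;
    }

    // Builds a batch that may be cut short, then runs it with invokeAll.
    static List<Integer> runBatch(ExecutorService pool, int maxTasks, int limit)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> batch = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            if (shouldStop(i, limit)) {
                break; // stop creating tasks; only those created so far will run
            }
            final int n = i;
            batch.add(() -> n * 2); // placeholder work
        }
        List<Integer> results = new ArrayList<>();
        for (Future<Integer> f : pool.invokeAll(batch)) { // waits for all tasks
            results.add(f.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(runBatch(pool, 10, 3));   // cut short by shouldStop
            System.out.println(runBatch(pool, 10, 100)); // no early stop
        } finally {
            pool.shutdown();
        }
    }
}
```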