Handling Exceptions for ThreadPoolExecutor

27,552

Solution 1

When you submit a task to the executor, it returns you a FutureTask instance.

FutureTask.get() will re-throw any exception thrown by the task as an ExecutorException.

So when you iterate through the List<Future> and call get on each, catch ExecutorException and invoke an orderly shutdown.

Solution 2

Since you are submitting tasks to ThreadPoolExecutor, the exceptions are getting swallowed by FutureTask.

Have a look at this code

**Inside FutureTask$Sync**

void innerRun() {
    if (!compareAndSetState(READY, RUNNING))
        return;

  runner = Thread.currentThread();
    if (getState() == RUNNING) { // recheck after setting thread
        V result;
       try {
            result = callable.call();
        } catch (Throwable ex) {
           setException(ex);
            return;
        }
       set(result);
    } else {
        releaseShared(0); // cancel
    }

}

protected void setException(Throwable t) {
   sync.innerSetException(t);
}

From above code, it is clear that setException method catching Throwable. Due to this reason, FutureTask is swallowing all exceptions if you use "submit()" method on ThreadPoolExecutor

As per java documentation, you can extend afterExecute() method in ThreadPoolExecutor

protected void afterExecute(Runnable r,
                            Throwable t) 

Sample code as per documentation:

class ExtendedExecutor extends ThreadPoolExecutor {
   // ...
   protected void afterExecute(Runnable r, Throwable t) {
     super.afterExecute(r, t);
     if (t == null && r instanceof Future<?>) {
       try {
         Object result = ((Future<?>) r).get();
       } catch (CancellationException ce) {
           t = ce;
       } catch (ExecutionException ee) {
           t = ee.getCause();
       } catch (InterruptedException ie) {
           Thread.currentThread().interrupt(); // ignore/reset
       }
     }
     if (t != null)
       System.out.println(t);
   }
 }

You can catch Exceptions in three ways

  1. Future.get() as suggested in accepted answer
  2. wrap entire run() or call() method in try{}catch{}Exceptoion{} blocks
  3. override afterExecute of ThreadPoolExecutor method as shown above

To gracefully interrupt other Threads, have a look at below SE question:

How to stop next thread from running in a ScheduledThreadPoolExecutor

How to forcefully shutdown java ExecutorService

Solution 3

Subclass ThreadPoolExecutor and override its protected afterExecute (Runnable r, Throwable t) method.

If you're creating a thread pool via the java.util.concurrent.Executors convenience class (which you're not), take at look at its source to see how it's invoking ThreadPoolExecutor.

Share:
27,552

Related videos on Youtube

jagamot
Author by

jagamot

Software Professional

Updated on July 09, 2022

Comments

  • jagamot
    jagamot almost 2 years

    I have the following code snippet that basically scans through the list of task that needs to be executed and each task is then given to the executor for execution.

    The JobExecutor in turn creates another executor (for doing db stuff...reading and writing data to queue) and completes the task.

    JobExecutor returns a Future<Boolean> for the tasks submitted. When one of the task fails, I want to gracefully interrupt all the threads and shutdown the executor by catching all the exceptions. What changes do I need to do?

    public class DataMovingClass {
        private static final AtomicInteger uniqueId = new AtomicInteger(0);
    
      private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   
    
      ThreadPoolExecutor threadPoolExecutor  = null ;
    
       private List<Source> sources = new ArrayList<Source>();
    
        private static class IDGenerator extends ThreadLocal<Integer> {
            @Override
            public Integer get() {
                return uniqueId.incrementAndGet();
            }
      }
    
      public void init(){
    
        // load sources list
    
      }
    
      public boolean execute() {
    
        boolean succcess = true ; 
        threadPoolExecutor = new ThreadPoolExecutor(10,10,
                    10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("DataMigration-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
    
         List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
    
         for (Source source : sources) {
                        result.add(threadPoolExecutor.submit(new JobExecutor(source)));
         }
    
         for (Future<Boolean> jobDone : result) {
                    try {
                        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                            // in case of successful DbWriterClass, we don't need to change
                            // it.
                            success = false;
                        }
                    } catch (Exception ex) {
                        // handle exceptions
                    }
                }
    
      }
    
      public class JobExecutor implements Callable<Boolean>  {
    
            private ThreadPoolExecutor threadPoolExecutor ;
            Source jobSource ;
            public SourceJobExecutor(Source source) {
                this.jobSource = source;
                threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                        new ThreadFactory() {
                            public Thread newThread(Runnable r) {
                                Thread t = new Thread(r);
                                t.setName("Job Executor-" + uniqueNumber.get());
                                return t;
                            }// End method
                        }, new ThreadPoolExecutor.CallerRunsPolicy());
            }
    
            public Boolean call() throws Exception {
                boolean status = true ; 
                System.out.println("Starting Job = " + jobSource.getName());
                try {
    
                            // do the specified task ; 
    
    
                }catch (InterruptedException intrEx) {
                    logger.warn("InterruptedException", intrEx);
                    status = false ;
                } catch(Exception e) {
                    logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                    status = false ;
                }
               System.out.println("Ending Job = " + jobSource.getName());
                return status ;
            }
        }
    }   
    
  • jagamot
    jagamot about 14 years
    ok..do you see any other flaws or places where I need to handle exceptions?
  • VinceStyling
    VinceStyling almost 6 years
    the 3rd approach afterExecute is fit for me, thanks
  • RS1
    RS1 about 4 years
    Does this apply to only ThreadPoolExecutor and not to any other implementations of ExecutorService?
  • Ravindra babu
    Ravindra babu about 4 years
    It applies to docs.oracle.com/javase/8/docs/api/java/util/concurrent/… and its derived classes
  • Pragalathan  M
    Pragalathan M almost 4 years
    If you are applying this logic to ScheduledThreadPoolExecutor and calling scheduleAtFixedRate or scheduleWithFixedDelay then you are in trouble as the ((Future<?>) r).get(); will never return leading to exhaustion of the threads in the pool. Simple fix could be to change the condition to, if (t == null && r instanceof Future<?> && !r.getClass().getSimpleName().equals("ScheduledFutureTask")) { . As ScheduledFutureTask is a private class, you cant check with instanceof.