Java ThreadPoolExecutor getting stuck while using ArrayBlockingQueue

12,957

In your main method, you never call mtpe.shutdown(). A ThreadPoolExecutor will attempt to keep its corePoolSize alive indefinitely. Sometimes, you get lucky and you have more than corePoolSize threads alive, so every worker thread will go into a conditional logic branch that allows it to terminate after your specified timeout period of 10 seconds. However, as you have noticed, sometimes this is not the case so every thread in the executor will block on ArrayBlockingQueue.take() and wait for a new task.

Also, please note, there is a significant difference between ExecutorService.shutdown() and ExecutorService.shutdownNow(). If you call ExecutorService.shutdownNow() as your wrapper implementation indicates, you will on occasion drop some tasks which have not been assigned for execution.

Update: Since my original answer, the ThreadPoolExecutor implementation has changed such that the program in the original post should never exit.

Share:
12,957
Ravi Rao
Author by

Ravi Rao

Updated on July 20, 2022

Comments

  • Ravi Rao
    Ravi Rao almost 2 years

    I'm working on some application and using ThreadPoolExecutor for handling various tasks. ThreadPoolExecutor is getting stuck after some duration. To simulate this in a simpler environment, I've written a simple code where I'm able to simulate the issue.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class MyThreadPoolExecutor {
        private int poolSize = 10;
        private int maxPoolSize = 50;
        private long keepAliveTime = 10;
        private ThreadPoolExecutor threadPool = null;
        private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
                100000);
    
        public MyThreadPoolExecutor() {
            threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                    keepAliveTime, TimeUnit.SECONDS, queue);
            threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    
                @Override
                public void rejectedExecution(Runnable runnable,
                        ThreadPoolExecutor threadPoolExecutor) {
                    System.out
                            .println("Execution rejected. Please try restarting the application.");
                }
    
            });
        }
    
        public void runTask(Runnable task) {
            threadPool.execute(task);
        }
    
        public void shutDown() {
            threadPool.shutdownNow();
        }
        public ThreadPoolExecutor getThreadPool() {
            return threadPool;
        }
    
        public void setThreadPool(ThreadPoolExecutor threadPool) {
            this.threadPool = threadPool;
        }
    
        public static void main(String[] args) {
            MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
            for (int i = 0; i < 1000; i++) {
                final int j = i;
                mtpe.runTask(new Runnable() {
    
                    @Override
                    public void run() {
                        System.out.println(j);
                    }
    
                });
            }
        }
    }
    

    Try executing this code a few times. It normally print outs the number on console and when all threads end, it exists. But at times, it finished all task and then is not getting terminated. The thread dump is as follows:

    MyThreadPoolExecutor [Java Application] 
        MyThreadPoolExecutor at localhost:2619 (Suspended)  
            Daemon System Thread [Attach Listener] (Suspended)  
            Daemon System Thread [Signal Dispatcher] (Suspended)    
            Daemon System Thread [Finalizer] (Suspended)    
                Object.wait(long) line: not available [native method]   
                ReferenceQueue<T>.remove(long) line: not available    
                ReferenceQueue<T>.remove() line: not available    
                Finalizer$FinalizerThread.run() line: not available 
            Daemon System Thread [Reference Handler] (Suspended)    
                Object.wait(long) line: not available [native method]   
                Reference$Lock(Object).wait() line: 485 
                Reference$ReferenceHandler.run() line: not available    
            Thread [pool-1-thread-1] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-2] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-3] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-4] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-6] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-8] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-5] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-10] (Suspended)   
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-9] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [pool-1-thread-7] (Suspended)    
                Unsafe.park(boolean, long) line: not available [native method]  
                LockSupport.park(Object) line: not available    
                AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
                ArrayBlockingQueue<E>.take() line: not available  
                ThreadPoolExecutor.getTask() line: not available    
                ThreadPoolExecutor$Worker.run() line: not available 
                Thread.run() line: not available    
            Thread [DestroyJavaVM] (Suspended)  
        C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  
    

    In my actual application,ThreadPoolExecutor threads go in this state and then it stops responding.

    Regards, Ravi Rao