ThreadPoolExecutor fixed thread pool with custom behaviour

28,731

Solution 1

If you use Executors.newFixedThreadPool(10); it queues the tasks and they wait until a thread is ready.

This method is

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

As you can see, the queue used is unbounded (which can be a problem in itself) but it means the queue will never fill and you will never get a rejection.

BTW: If you have CPU bound tasks, an optimal number of threads can be

int processors = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(processors);

A test class which might illustrate the situation

public static void main(String... args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 1000 * 1000; i++)
        es.submit(new SleepOneSecond());

    System.out.println("Queue length " + ((ThreadPoolExecutor) es).getQueue().size());
    es.shutdown();
    System.out.println("After shutdown");
    try {
        es.submit(new SleepOneSecond());
    } catch (Exception e) {
        e.printStackTrace(System.out);
    }
}

static class SleepOneSecond implements Callable<Void> {
    @Override
    public Void call() throws Exception {
        Thread.sleep(1000);
        return null;
    }
}

prints

Queue length 999998
After shutdown
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@e026161 rejected from java.util.concurrent.ThreadPoolExecutor@3e472e76[Shutting down, pool size = 2, active threads = 2, queued tasks = 999998, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2013)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at Main.main(Main.java:17)

Solution 2

It is very possible that a thread calls exit, which sets mStopped to false and shutdowns the executor, but:

  • your running thread might be in the middle of the while (!mStopped) loop and tries to submit a task to the executor which has been shutdown by exit
  • the condition in the while returns true because the change made to mStopped is not visible (you don't use any form of synchronization around that flag).

I would suggest:

  • make mStopped volatile
  • handle the case where the executor is shutdown while you are in the middle of the loop (for example by catching RejectedExecutionException, or probably better: shutdown your executor after your while loop instead of shutting it down in your exit method).

Solution 3

Building on earlier suggestions, you can use a blocking queue to construct a fixed size ThreadPoolExecutor. If you then supply your own RejectedExecutionHandler which adds tasks to the blocking queue, it will behave as described.

Here's an example of how you could construct such an executor:

int corePoolSize    = 10;
int maximumPoolSize = 10;
int keepAliveTime   =  0;
int maxWaitingTasks = 10;

ThreadPoolExecutor blockingThreadPoolExecutor = new ThreadPoolExecutor(
        corePoolSize, maximumPoolSize,
        keepAliveTime, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(maxWaitingTasks),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted while submitting task", e);
                }
            }
        });
Share:
28,731
Simone Margaritelli
Author by

Simone Margaritelli

Security researcher, hardcore C/C++ developer , wannabe reverser and coffee addicted.

Updated on October 04, 2020

Comments

  • Simone Margaritelli
    Simone Margaritelli over 3 years

    i'm new to this topic ... i'm using a ThreadPoolExecutor created with Executors.newFixedThreadPool( 10 ) and after the pool is full i'm starting to get a RejectedExecutionException . Is there a way to "force" the executor to put the new task in a "wait" status instead of rejecting it and starting it when the pool is freed ?

    Thanks

    Issue regarding this https://github.com/evilsocket/dsploit/issues/159

    Line of code involved https://github.com/evilsocket/dsploit/blob/master/src/it/evilsocket/dsploit/net/NetworkDiscovery.java#L150

  • Simone Margaritelli
    Simone Margaritelli over 11 years
    still, i'm getting the RejectedExecutionException as you can see here github.com/evilsocket/dsploit/issues/159
  • Simone Margaritelli
    Simone Margaritelli over 11 years
  • Ian Roberts
    Ian Roberts over 11 years
    @SimoneMargaritelli the only reason you should get a RejectedExecutionException when using a newFixedThreadPool would be if you are trying to submit tasks to the executor after its shutdown method has been called.
  • Vishy
    Vishy over 11 years
    Given the pool has only ever run 31 tasks, I suspect its not full. Also the corePoolSize is 0 suggesting it has been shutdown which will trigger a rejection.
  • Vishy
    Vishy over 11 years
    @SimoneMargaritelli BTW You don't need to cast to ThreadPoolExecutor, I would leave it as the interface ExecutorService.