ThreadPoolExecutor Block When Queue Is Full?


Solution 1

In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

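RejectedExecutionHandler block = new RejectedExecutionHandler() {
  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     try {
        executor.getQueue().put( r ); // blocks the caller until there is room
     } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
     }
  }
};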

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Now, this is a very bad idea for the following reasons:

  • It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time.
  • The task is not wrapped the way your Executor may expect. Lots of executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.

An almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy, which throttles your app by running the task on the thread that is calling execute().
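As a rough illustration of that policy (a sketch, not code from the original answer), the following uses a 3-thread pool with a 3-slot queue. The class name CallerRunsDemo, the method runAll, and the counter are made up for the example. When all workers and queue slots are occupied, execute() runs the task on the submitting thread, so submission slows down instead of failing with RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    /** Submits taskCount trivial tasks and returns how many of them completed. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // If the workers and the queue are all busy, this call runs the
            // task on the current thread instead of throwing.
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown(); // already-queued tasks still run after shutdown()
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(100_000));
    }
}
```

Keep in mind that while the caller is busy running a task itself, it submits nothing new, which is exactly the throttling effect described above.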

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say that holds under these conditions:

  • You only have one thread calling execute()
  • You have to (or want to) have a very small queue length
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.

So, as I say, it's rarely needed and can be dangerous, but there you go.

Good Luck.

Solution 2

What you need to do is wrap your ThreadPoolExecutor in an Executor that explicitly limits the number of operations executing concurrently inside it:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
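        } catch (InterruptedException e) {
            // Restore the interrupt flag and reject the task instead of dropping it silently.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
        }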

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

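        try {
            delegate.execute(wrapped);
        } catch (RejectedExecutionException e) {
            // The task will never run, so return its permit.
            semaphore.release();
            throw e;
        }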

    }
}

You can set concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor, and it will pretty much solve your problem.
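One caveat on the sizing, offered as my reading of how ThreadPoolExecutor behaves rather than something stated in the answer: the permit is released inside the task, a moment before the worker thread goes back to the queue for more work. With a limit of exactly threadPoolSize + queueSize, a caller can grab that permit while the queue is still full and get a rejection. The sketch below assumes the more conservative choice of capping in-flight tasks at the queue capacity. The names BoundedSubmitDemo, bounded and runAll are hypothetical, and acquireUninterruptibly() is used only to keep the example short.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo {

    /** Hypothetical helper: wraps an executor so execute() blocks once 'limit' tasks are in flight. */
    static Executor bounded(Executor delegate, int limit) {
        Semaphore permits = new Semaphore(limit);
        return command -> {
            permits.acquireUninterruptibly(); // caller waits here when the limit is reached
            try {
                delegate.execute(() -> {
                    try {
                        command.run();
                    } finally {
                        permits.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                permits.release(); // the task never ran, so return its permit
                throw e;
            }
        };
    }

    /** Submits taskCount trivial tasks through the gate and returns how many ran. */
    static int runAll(int taskCount) {
        int queueCapacity = 6;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS, new ArrayBlockingQueue<>(queueCapacity));
        // Assumption: limit == queue capacity, so the queue always has a free
        // slot for whichever task holds the newest permit.
        Executor gated = bounded(pool, queueCapacity);

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            gated.execute(completed::incrementAndGet);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(100_000));
    }
}
```

The trade-off is that a smaller limit leaves less work buffered, so workers may occasionally wait for the submitting thread.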

Solution 3

You could use a semaphore to block threads from going into the pool.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

  • Only use this pattern with a fixed thread pool. ThreadPoolExecutor creates threads beyond the core size only when the queue is full, and with the semaphore in place the queue is unlikely to be full often, so those extra threads won't be created. See the Javadoc for ThreadPoolExecutor for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html There is a way around this, but it is out of scope for this answer.
  • Queue size should be higher than the number of core threads. If we were to make the queue size 3, what would end up happening is:

    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: Thread 1 finishes, releases a permit.
    • T2: Thread 1 polls the queue for new work, finds none, and waits.
    • T3: Main thread submits work into the pool, thread 1 starts work.

    In the example above, thread 1 sits idle while it waits for the main thread to submit more work. It may seem like a short wait, but multiply it by its frequency over days and months, and those short waits add up to a large amount of wasted time.

Solution 4

This is what I ended up doing:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}
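To check what this arrangement guarantees, here is a small sketch (my own, with hypothetical names PeakConcurrencyDemo and peakConcurrency) that records the largest number of tasks running at the same moment. Since a task can only start while holding a permit, the peak cannot exceed the permit count. The cached pool itself may briefly hold an extra thread, because a worker releases its permit slightly before it becomes available for reuse.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeakConcurrencyDemo {

    /** Runs taskCount tasks behind a semaphore and returns the peak number running at once. */
    static int peakConcurrency(int permits, int taskCount) {
        Semaphore lock = new Semaphore(permits);
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            lock.acquireUninterruptibly(); // blocks the submitter when all permits are taken
            pool.execute(() -> {
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                    Thread.yield();                        // stand-in for the task logic
                    running.decrementAndGet();
                } finally {
                    lock.release();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent tasks: " + peakConcurrency(6, 100_000));
    }
}
```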

Solution 5

A fairly straightforward option is to wrap your BlockingQueue with an implementation that calls put(..) when offer(..) is being invoked:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(T o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

This works because by default put(..) waits until there is capacity in the queue when it is full, see:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

No catching of RejectedExecutionException or complicated locking necessary.
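For a version that can be run as-is, here is a shorter variant of the same idea. Note that it subclasses ArrayBlockingQueue rather than wrapping a delegate, which is a substitution on my part to avoid writing out every delegating method. BlockingOfferDemo, BlockOnOfferQueue and runAll are hypothetical names. It also assumes corePoolSize equals maximumPoolSize: since offer() no longer reports a full queue, the executor never gets a reason to add threads beyond the core size.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingOfferDemo {

    /** Hypothetical queue whose offer() waits for space, the way put() does. */
    static class BlockOnOfferQueue<T> extends ArrayBlockingQueue<T> {
        BlockOnOfferQueue(int capacity) {
            super(capacity);
        }

        @Override
        public boolean offer(T o) {
            try {
                put(o); // waits until a slot is free
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // the executor treats this as a rejection
            }
        }
    }

    /** Submits taskCount trivial tasks and returns how many of them completed. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS, new BlockOnOfferQueue<>(3));

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // execute() calls offer() on the work queue, so the caller
            // blocks here whenever the queue is full.
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(100_000));
    }
}
```

If the submitting thread is interrupted while waiting, offer() returns false and the task goes to the pool's rejection handler, so a RejectedExecutionException is still possible in that one case.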

Author: ghempton

Hacker and self-proclaimed designer. Founder of GroupTalent.

Updated on July 05, 2022

Comments

  • ghempton almost 2 years

    I am trying to execute lots of tasks using a ThreadPoolExecutor. Below is a hypothetical example:

    def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
    def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
    for(int i = 0; i < 100000; i++)
        threadPoolExecutor.execute(runnable)
    

    The problem is that I quickly get a java.util.concurrent.RejectedExecutionException since the number of tasks exceeds the size of the work queue. However, the desired behavior I am looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?

  • skelly over 9 years
    A very well thought-out response. I take minor issue with your condition that "You have to (or want to) have a very small queue length." You may not be able to predict how many tasks a given job will queue. Maybe you're running a daily job that processes data from some DB, and on Monday there are 500 records to process but on Tuesday there are 50,000. You've gotta set an upper bound on your queue such that you won't blow your heap when a big job comes through. In that case there's no harm in waiting for some tasks to complete before queueing more.
  • Tim Pote over 8 years
    "It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time." Can't deadlock be avoided completely by setting the min pool size to a value greater than zero? Every other reason is fallout of Java not having built-in support for blocking puts to executor queues. Which is interesting, because it seems to be a pretty reasonable strategy. I wonder what the rationale is.
  • Adrian Baker about 6 years
    Perhaps another condition for a blocking strategy is when order of execution is important. CallerRunsPolicy will mean that the rejected task will likely be executed before other pending items in the executor.
  • asgs over 5 years
    Thread 1 is already blocked at time T2 when it finds the queue to be empty. I'm not sure I understood your point on main thread blocking that thread.
  • devkaoru over 5 years
    @asgs "Thread 1 is already blocked at time T2 when it finds the queue to be empty." Right, and because it's the main thread's responsibility to put work into the queue, you can deduce that the main thread is blocking Thread 1.
  • Bastian Voigt about 4 years
    Nice and smooth. Thank you!
  • madcolonel10 almost 4 years
    @TimPote the current implementation of execute() as of Java 8 takes care of that condition also: "If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none." Darren, do you see any issues with this approach with Java 8 also?
  • Adrian about 2 years
    This doesn't take into account the void execute(Runnable command) method which will go to the queue unblocked.