How to pause/resume all threads in an ExecutorService in Java?
Solution 1
To answer my own question, I found an example of a PausableThreadPoolExecutor
in the javadocs of ThreadPoolExecutor
itself. Here is my version using Guava's Monitors:
import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class PausableExecutor extends ScheduledThreadPoolExecutor {
private boolean isPaused;
private final Monitor monitor = new Monitor();
private final Monitor.Guard paused = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return isPaused;
}
};
private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return !isPaused;
}
};
public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
monitor.enterWhenUninterruptibly(notPaused);
try {
monitor.waitForUninterruptibly(notPaused);
} finally {
monitor.leave();
}
}
public void pause() {
monitor.enterIf(notPaused);
try {
isPaused = true;
} finally {
monitor.leave();
}
}
public void resume() {
monitor.enterIf(paused);
try {
isPaused = false;
} finally {
monitor.leave();
}
}
}
Solution 2
I made some criticisms on your accepted answer, but they weren't very constructive... So here's my solution. I would use a class like this one and then call checkIn
wherever/whenever I want pause functionality. Find it on GitHub!
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Provides a mechanism to pause multiple threads.
* If wish your thread to participate, then it must regularly check in with an instance of this object.
*
* @author Corin Lawson <[email protected]>
*/
public class Continue {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public void checkIn() throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} finally {
pauseLock.unlock();
}
}
}
public void checkInUntil(Date deadline) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUntil(deadline);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long nanosTimeout) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitNanos(nanosTimeout);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long time, TimeUnit unit) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await(time, unit);
} finally {
pauseLock.unlock();
}
}
}
public void checkInUninterruptibly() {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUninterruptibly();
} finally {
pauseLock.unlock();
}
}
}
public boolean isPaused() {
return isPaused;
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
if (isPaused) {
isPaused = false;
unpaused.signalAll();
}
} finally {
pauseLock.unlock();
}
}
}
For example:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class PausableExecutor extends ScheduledThreadPoolExecutor {
private Continue cont;
public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
super(corePoolSize, threadFactory);
cont = c;
}
protected void beforeExecute(Thread t, Runnable r) {
cont.checkIn();
super.beforeExecute(t, r);
}
}
This has the added benefit that you can pause many threads with a single call to Continue
's pause
.
Solution 3
I was looking for pause/resume functionality in executor, but with additional ability to await for any currently being processed tasks. Below is variant of other great implementations from this SO with addition of await functions. I was testing it on executor with single thread. So basic usage is:
executor.pause();
executor.await(10000); // blocks till current tasks processing ends
class code:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private Latch activeTasksLatch = new Latch();
private class Latch {
private final Object synchObj = new Object();
private int count;
public boolean awaitZero(long waitMS) throws InterruptedException {
long startTime = System.currentTimeMillis();
synchronized (synchObj) {
while (count > 0) {
if ( waitMS != 0) {
synchObj.wait(waitMS);
long curTime = System.currentTimeMillis();
if ( (curTime - startTime) > waitMS ) {
return count <= 0;
}
}
else
synchObj.wait();
}
return count <= 0;
}
}
public void countDown() {
synchronized (synchObj) {
if (--count <= 0) {
// assert count >= 0;
synchObj.notifyAll();
}
}
}
public void countUp() {
synchronized (synchObj) {
count++;
}
}
}
/**
* Default constructor for a simple fixed threadpool
*/
public PausableScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
/**
* Executed before a task is assigned to a thread.
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
activeTasksLatch.countUp();
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
super.afterExecute(r, t);
}
finally {
activeTasksLatch.countDown();
}
}
/**
* Pause the threadpool. Running tasks will continue running, but new tasks
* will not start untill the threadpool is resumed.
*/
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
/**
* Wait for all active tasks to end.
*/
public boolean await(long timeoutMS) {
// assert isPaused;
try {
return activeTasksLatch.awaitZero(timeoutMS);
} catch (InterruptedException e) {
// log e, or rethrow maybe
}
return false;
}
/**
* Resume the threadpool.
*/
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
Solution 4
The problem is that the Runnable/Callable themselves need to check when to pause/resume. That being said there and many ways to do this, and it depends on your requirements on how best to do this. Whatever your solution you need to make the waiting interruptable, so the thread can be shutdown cleanly.
Related videos on Youtube
pathikrit
Experienced in developing scalable solutions for complex problems. I enjoy working full-stack - from architecting schema and data-flows, implementing algorithms, designing APIs to crafting innovative UIs. My professional interests include algorithms, functional programming, finance, data analytics and visualization.
Updated on June 19, 2020Comments
-
pathikrit almost 4 years
I submitted bunch of jobs to an executorservice in Java and I somehow want to temporarily pause all these jobs. What's the best way to do this? How can I resume? Or am I doing this completely wrong? Should I follow some other pattern for what I want to achieve (i.e. ability to pause/resume execution services)?
-
Jon Skeet about 12 yearsDo you mean prevent new jobs from running, or pause the already running jobs?
-
pathikrit about 12 yearsPause already running jobs. The pause/resume might be called after
shutdown
-
Jon Skeet about 12 yearsIn that case, how you started the jobs is pretty much irrelevant. You'll need to code for pausing - for example, each task might want to check a "should I pause" flag periodically. It still won't be instantaneous, of course.
-
pathikrit about 12 yearsHmm, I guess I can create my own special kind of
Runnables
that can understand global pause/resume flag. I was hoping there was some cleaner way of doing it using either the list ofFutures
I have or through theExecutorService
itself
-
-
Corin about 11 yearsThere are some major differences with your solution and the example in the javadocs... (1) you've used two
Guard
s as opposed to the oneCondition
in the javadocs; (2) you've usedenterIf
outside of an if (which is plain wrong); (3)Monitor
'sleave
usessignal
notsignalAll
(which is what is really needed here); lastly (4) why wait onnotPaused
if you've already entered theMonitor
based onnotPaused
(just leave it)? All up, I don't think Monitor is a good choice here... -
pathikrit about 11 years1) I find Guava's Monitor/Guard cleaner abstraction than Condition. Just personal prefs here. 2)Do you mean outside try instead of outside if? I used the idiom documented in the Guava docs for Guard 3) Why signalAll? This Executor only pertains to only the threads that it contains and it doesn't matter for them if we use signal or signalAll 4) If you see the Monitor docs - docs.guava-libraries.googlecode.com/git/javadoc/com/google/… - Google themselves recommend to use to separate monitors even if one is the boolean opposite of the other.
-
Paul Taylor almost 11 yearsThankyou I just used your example to implement this functionality but I have a couple of comments, beforeExecute has to catch InterruptedException to compile. Wasn't clear you don't need to subclass ScheduledThreadPoolExecutor, you can just use ThreadPoolExecutor which is what I was using. PausableExcecutor will only pause execution of tasks that have been submitted but not started, to pause tasks already started you need to call checkIn in the task code itself, I used checkInInterruptably() for this but not sure if thats a good idea.
-
Adam Rabung over 10 yearsthanks for sharing - the first of many approaches i've tried that worked.
-
Barn over 10 yearsShould
boolean isPaused
be volatile? Or does theReentrantLock
act as a memory barrier? I am thinking of e.g. thread A callingpause()
orresume()
, thread B callingcheckIn()
, and thread C callingisPaused()
. -
Mike over 9 yearsThis looks good. Have you or anyone tested this more thoroughly? Are there any revisions or fixes? I'm going to use this now, because this one doesn't introduce yet another library.
-
marcinj over 9 yearsIts in use in quite large application, no issues so far. If there are any bugs in this code, I am also willing to hear
-
ProgrAmmar about 7 years@marcinj I'm trying your executor code. Works well for pause and resume. But I notice that when I call shutDownNow() on this when it is paused, it resumes and runs few tasks before it is actually shutdown. Any way to prevent that?
-
marcinj about 7 years@ProgrAmmar I tried to reproduce it with this code: melpon.org/wandbox/permlink/XHa9NwmI7n1WAr3F , but I failed - can you see if that is whats causing problem? To my understanding, "test 4" "test 5" "test 6" should be written to console. Those are outputs from tasks which should not get executed, but they are now written.
-
ProgrAmmar about 7 years@marcinj I couldn't get your link to work. So I created my own example here: pastebin.com/AY6r1zGD. I have created a FixedThreadPoolExecutor from your code. You can see that when you run it, some tasks are called after shutDownNow().
-
sssvrock about 6 yearsHi, I have used the same ThreadPoolExecutor and adding a Runnable Thread to it. But I am unable to pause and resume of Runnable threads. Can you please guide me how to achieve.
-
sssvrock about 6 yearsHi I have tried with @pathiikrit and Corin solutions to achieve pause and resume of Runnable threads using thread pool manager. But it is not at all working in my case.
-
robross0606 almost 6 yearsThis prevents scheduled jobs from executing, but it does not prevent paused jobs from building up in the queue. So, for example, if you have something scheduled at a fixed rate every second and then pause for five seconds, when you unpause, your runnable will fire five times.