How to pause/resume all threads in an ExecutorService in Java?

26,121

Solution 1

To answer my own question, I found an example of a PausableThreadPoolExecutor in the javadocs of ThreadPoolExecutor itself. Here is my version using Guava's Monitors:

import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {

    private boolean isPaused;

    private final Monitor monitor = new Monitor();
    private final Monitor.Guard paused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return isPaused;
        }
    };

    private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return !isPaused;
        }
    };

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        monitor.enterWhenUninterruptibly(notPaused);
        try {
            monitor.waitForUninterruptibly(notPaused);
        } finally {
            monitor.leave();
        }
    }

    public void pause() {
        monitor.enterIf(notPaused);
        try {
            isPaused = true;
        } finally {
            monitor.leave();
        }
    }

    public void resume() {
        monitor.enterIf(paused);
        try {
            isPaused = false;
        } finally {
            monitor.leave();
        }
    }
}

Solution 2

I made some criticisms on your accepted answer, but they weren't very constructive... So here's my solution. I would use a class like this one and then call checkIn wherever/whenever I want pause functionality. Find it on GitHub!

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Provides a mechanism to pause multiple threads.
 * If wish your thread to participate, then it must regularly check in with an instance of this object.
 * 
 * @author Corin Lawson <[email protected]>
 */
public class Continue {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public void checkIn() throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUntil(Date deadline) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUntil(deadline);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long nanosTimeout) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitNanos(nanosTimeout);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long time, TimeUnit unit) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await(time, unit);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUninterruptibly() {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUninterruptibly();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public boolean isPaused() {
        return isPaused;
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            if (isPaused) {
                isPaused = false;
                unpaused.signalAll();
            }
        } finally {
            pauseLock.unlock();
        }
    }
}

For example:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {
    private Continue cont;

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
        super(corePoolSize, threadFactory);
        cont = c;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        cont.checkIn();
        super.beforeExecute(t, r);
    }
}

This has the added benefit that you can pause many threads with a single call to Continue's pause.

Solution 3

I was looking for pause/resume functionality in executor, but with additional ability to await for any currently being processed tasks. Below is variant of other great implementations from this SO with addition of await functions. I was testing it on executor with single thread. So basic usage is:

executor.pause();
executor.await(10000); // blocks till current tasks processing ends

class code:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {      
  public boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();
  private Latch activeTasksLatch = new Latch();

  private class Latch {
    private final Object synchObj = new Object();
    private int count;

    public boolean awaitZero(long waitMS) throws InterruptedException {
      long startTime = System.currentTimeMillis();
      synchronized (synchObj) {
        while (count > 0) {
          if ( waitMS != 0) {
            synchObj.wait(waitMS);
            long curTime = System.currentTimeMillis();
            if ( (curTime - startTime) > waitMS ) {                
              return count <= 0;
            }
          }
          else
            synchObj.wait();
        }
        return count <= 0; 
      }
    }
    public void countDown() {
      synchronized (synchObj) {
        if (--count <= 0) {
          // assert count >= 0;              
          synchObj.notifyAll();
        }
      }
    }
    public void countUp() {
      synchronized (synchObj) {
        count++;
      }
    }    
  }

  /**
   * Default constructor for a simple fixed threadpool
   */
  public PausableScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize);
  }

  /**
   * Executed before a task is assigned to a thread.
   */
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    pauseLock.lock();
    try {
      while (isPaused)
        unpaused.await();
    } catch (InterruptedException ie) {
      t.interrupt();
    } finally {
      pauseLock.unlock();
    }

    activeTasksLatch.countUp();
    super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    try {
      super.afterExecute(r, t);
    }
    finally {
      activeTasksLatch.countDown();
    }
  }

  /**
   * Pause the threadpool. Running tasks will continue running, but new tasks
   * will not start untill the threadpool is resumed.
   */
  public void pause() {
    pauseLock.lock();
    try {
      isPaused = true;
    } finally {
      pauseLock.unlock();
    }
  }

  /**
   * Wait for all active tasks to end.
   */ 
  public boolean await(long timeoutMS) {
    // assert isPaused;
    try {
      return activeTasksLatch.awaitZero(timeoutMS);
    } catch (InterruptedException e) {
      // log e, or rethrow maybe
    }
    return false;
  }

  /**
   * Resume the threadpool.
   */
  public void resume() {
    pauseLock.lock();
    try {
      isPaused = false;
      unpaused.signalAll();
    } finally {
      pauseLock.unlock();
    }
  }

}

Solution 4

The problem is that the Runnable/Callable themselves need to check when to pause/resume. That being said there and many ways to do this, and it depends on your requirements on how best to do this. Whatever your solution you need to make the waiting interruptable, so the thread can be shutdown cleanly.

Share:
26,121

Related videos on Youtube

pathikrit
Author by

pathikrit

Experienced in developing scalable solutions for complex problems. I enjoy working full-stack - from architecting schema and data-flows, implementing algorithms, designing APIs to crafting innovative UIs. My professional interests include algorithms, functional programming, finance, data analytics and visualization.

Updated on June 19, 2020

Comments

  • pathikrit
    pathikrit almost 4 years

    I submitted bunch of jobs to an executorservice in Java and I somehow want to temporarily pause all these jobs. What's the best way to do this? How can I resume? Or am I doing this completely wrong? Should I follow some other pattern for what I want to achieve (i.e. ability to pause/resume execution services)?

    • Jon Skeet
      Jon Skeet about 12 years
      Do you mean prevent new jobs from running, or pause the already running jobs?
    • pathikrit
      pathikrit about 12 years
      Pause already running jobs. The pause/resume might be called after shutdown
    • Jon Skeet
      Jon Skeet about 12 years
      In that case, how you started the jobs is pretty much irrelevant. You'll need to code for pausing - for example, each task might want to check a "should I pause" flag periodically. It still won't be instantaneous, of course.
    • pathikrit
      pathikrit about 12 years
      Hmm, I guess I can create my own special kind of Runnables that can understand global pause/resume flag. I was hoping there was some cleaner way of doing it using either the list of Futures I have or through the ExecutorService itself
  • Corin
    Corin about 11 years
    There are some major differences with your solution and the example in the javadocs... (1) you've used two Guards as opposed to the one Condition in the javadocs; (2) you've used enterIf outside of an if (which is plain wrong); (3) Monitor's leave uses signal not signalAll (which is what is really needed here); lastly (4) why wait on notPaused if you've already entered the Monitor based on notPaused (just leave it)? All up, I don't think Monitor is a good choice here...
  • pathikrit
    pathikrit about 11 years
    1) I find Guava's Monitor/Guard cleaner abstraction than Condition. Just personal prefs here. 2)Do you mean outside try instead of outside if? I used the idiom documented in the Guava docs for Guard 3) Why signalAll? This Executor only pertains to only the threads that it contains and it doesn't matter for them if we use signal or signalAll 4) If you see the Monitor docs - docs.guava-libraries.googlecode.com/git/javadoc/com/google/… - Google themselves recommend to use to separate monitors even if one is the boolean opposite of the other.
  • Paul Taylor
    Paul Taylor almost 11 years
    Thankyou I just used your example to implement this functionality but I have a couple of comments, beforeExecute has to catch InterruptedException to compile. Wasn't clear you don't need to subclass ScheduledThreadPoolExecutor, you can just use ThreadPoolExecutor which is what I was using. PausableExcecutor will only pause execution of tasks that have been submitted but not started, to pause tasks already started you need to call checkIn in the task code itself, I used checkInInterruptably() for this but not sure if thats a good idea.
  • Adam Rabung
    Adam Rabung over 10 years
    thanks for sharing - the first of many approaches i've tried that worked.
  • Barn
    Barn over 10 years
    Should boolean isPaused be volatile? Or does the ReentrantLock act as a memory barrier? I am thinking of e.g. thread A calling pause() or resume(), thread B calling checkIn(), and thread C calling isPaused().
  • Mike
    Mike over 9 years
    This looks good. Have you or anyone tested this more thoroughly? Are there any revisions or fixes? I'm going to use this now, because this one doesn't introduce yet another library.
  • marcinj
    marcinj over 9 years
    Its in use in quite large application, no issues so far. If there are any bugs in this code, I am also willing to hear
  • ProgrAmmar
    ProgrAmmar about 7 years
    @marcinj I'm trying your executor code. Works well for pause and resume. But I notice that when I call shutDownNow() on this when it is paused, it resumes and runs few tasks before it is actually shutdown. Any way to prevent that?
  • marcinj
    marcinj about 7 years
    @ProgrAmmar I tried to reproduce it with this code: melpon.org/wandbox/permlink/XHa9NwmI7n1WAr3F , but I failed - can you see if that is whats causing problem? To my understanding, "test 4" "test 5" "test 6" should be written to console. Those are outputs from tasks which should not get executed, but they are now written.
  • ProgrAmmar
    ProgrAmmar about 7 years
    @marcinj I couldn't get your link to work. So I created my own example here: pastebin.com/AY6r1zGD. I have created a FixedThreadPoolExecutor from your code. You can see that when you run it, some tasks are called after shutDownNow().
  • sssvrock
    sssvrock about 6 years
    Hi, I have used the same ThreadPoolExecutor and adding a Runnable Thread to it. But I am unable to pause and resume of Runnable threads. Can you please guide me how to achieve.
  • sssvrock
    sssvrock about 6 years
    Hi I have tried with @pathiikrit and Corin solutions to achieve pause and resume of Runnable threads using thread pool manager. But it is not at all working in my case.
  • robross0606
    robross0606 almost 6 years
    This prevents scheduled jobs from executing, but it does not prevent paused jobs from building up in the queue. So, for example, if you have something scheduled at a fixed rate every second and then pause for five seconds, when you unpause, your runnable will fire five times.