Java - Define a timeout for Callable within an ExecutorCompletionService


I suggest splitting your problem into two separate ones:

  1. run on multiple threads
  2. use a timeout for each operation

For the first (multithreading), you already use an executor service that manages this on 2 threads: Executors.newFixedThreadPool(2). If you apply the timeout there, it covers the run of all tasks together, but you need a timeout for each job.

For the timeout issue, you can manage it with a new executor service per job, wrapped in a class: JobManager.

package com.stackoverflow.q24473796;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobManager implements Callable<Integer> {

    protected long timeout;
    protected TimeUnit timeUnit;
    protected Callable<Integer> job;

    public JobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.job = job;
    }

    @Override
    public Integer call() {
        Integer result = -1; // default returned on timeout or failure; this could be adapted
        // One dedicated single-thread executor per job, so the timeout applies to this job only.
        ExecutorService exec = Executors.newSingleThreadExecutor();

        try {
            // Wait at most `timeout` for the wrapped job to finish.
            result = exec.submit(job).get(timeout, timeUnit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // Whatever you want
            if (e instanceof TimeoutException) {
                System.out.println("Timeout get for " + job.toString());
            } else {
                System.out.println("exception get for " + job.toString() + " : " + e.getMessage());
            }
        }
        // shutdown() only stops the executor from accepting new tasks;
        // it does not interrupt a job that is still running.
        exec.shutdown();
        return result;
    }
}
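
One caveat about JobManager: exec.shutdown() does not interrupt a job that is already running, so a job that timed out keeps running in the background until it finishes by itself. If the job should really be stopped, as the question asks, one possible variant is to cancel the Future and call shutdownNow(). The following is only a sketch under assumptions: the class name CancellingJobManager and the constant NO_RESULT are made up for illustration, the lambdas need Java 8 or later, and interruption only helps if the job reacts to it (Thread.sleep does; an endless loop has to check Thread.currentThread().isInterrupted() itself).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical variant of JobManager that interrupts the job when it times out.
class CancellingJobManager implements Callable<Integer> {

    static final int NO_RESULT = -1; // assumed sentinel, same idea as in JobManager

    private final long timeout;
    private final TimeUnit timeUnit;
    private final Callable<Integer> job;

    CancellingJobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.job = job;
    }

    @Override
    public Integer call() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Integer> future = exec.submit(job);
        try {
            return future.get(timeout, timeUnit);
        } catch (TimeoutException e) {
            System.out.println("Timeout for " + job);
            return NO_RESULT;
        } catch (ExecutionException e) {
            System.out.println("Job " + job + " failed: " + e.getCause());
            return NO_RESULT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return NO_RESULT;
        } finally {
            // cancel(true) interrupts the worker thread if the job is still running;
            // it has no effect when the job has already completed.
            future.cancel(true);
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Callable<Integer> fast = () -> { Thread.sleep(50); return 1; };
        Callable<Integer> slow = () -> { Thread.sleep(5_000); return 2; };
        System.out.println(new CancellingJobManager(500, TimeUnit.MILLISECONDS, fast).call());
        System.out.println(new CancellingJobManager(500, TimeUnit.MILLISECONDS, slow).call());
    }
}
```

It can be submitted to the fixed thread pool exactly like JobManager. With this variant a timed-out Job from the question would print "interrupted" instead of "finished".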

Then you can call the tasks from your main thread as follows:

    Job job = new Job(i * 1000, i);
    Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));

I adapted your CallableTest:

package com.stackoverflow.q24473796;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableTest {

    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (int i = 10; i > 0; i--) {
            Job job = new Job(i * 1000, i);
            Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));
            futures.add(future);
        }

        ArrayList<Integer> results = new ArrayList<Integer>();
        for (Future<Integer> future : futures) {
            Integer result = -1; // sentinel: no valid result
            try {
                result = future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            if (result != -1) {
                results.add(result);
            }
        }

        newFixedThreadPool.shutdown();

        try {
            newFixedThreadPool.awaitTermination(60, TimeUnit.SECONDS); //Global Timeout
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(new Date() + " results:");
        for (int j : results) {
            System.out.println(new Date() + " " + j);
        }
    }
}

And you'll get the following output:

Wed Apr 29 10:51:02 CEST 2015 10 started
Wed Apr 29 10:51:02 CEST 2015 9 started
Timeout get for com.stackoverflow.q24473796.Job@249fe45c
Timeout get for com.stackoverflow.q24473796.Job@249fe45c
Wed Apr 29 10:51:07 CEST 2015 8 started
Wed Apr 29 10:51:07 CEST 2015 7 started
Wed Apr 29 10:51:11 CEST 2015 9 finished
Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
Wed Apr 29 10:51:12 CEST 2015 6 started
Wed Apr 29 10:51:12 CEST 2015 5 started
Wed Apr 29 10:51:12 CEST 2015 10 finished
Wed Apr 29 10:51:14 CEST 2015 7 finished
Wed Apr 29 10:51:15 CEST 2015 8 finished
Wed Apr 29 10:51:17 CEST 2015 5 finished
Wed Apr 29 10:51:17 CEST 2015 4 started
Timeout get for com.stackoverflow.q24473796.Job@2a0fded2
Wed Apr 29 10:51:17 CEST 2015 3 started
Wed Apr 29 10:51:18 CEST 2015 6 finished
Wed Apr 29 10:51:20 CEST 2015 3 finished
Wed Apr 29 10:51:20 CEST 2015 2 started
Wed Apr 29 10:51:21 CEST 2015 4 finished
Wed Apr 29 10:51:21 CEST 2015 1 started
Wed Apr 29 10:51:22 CEST 2015 1 finished
Wed Apr 29 10:51:22 CEST 2015 2 finished
Wed Apr 29 10:51:22 CEST 2015 results:
Wed Apr 29 10:51:22 CEST 2015 5
Wed Apr 29 10:51:22 CEST 2015 4
Wed Apr 29 10:51:22 CEST 2015 3
Wed Apr 29 10:51:22 CEST 2015 2
Wed Apr 29 10:51:22 CEST 2015 1
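
Since the question uses an ExecutorCompletionService and also asks which Callable was killed, the same wrapper idea can be combined with a completion service. The sketch below is an illustration under assumptions, not the code from this answer: the names CompletionServiceTimeoutDemo, Outcome, withTimeout and runAll are hypothetical, a null value stands for "timed out or failed", and Java 8 or later is assumed for the lambdas. Each wrapper carries its job id, so the outcome can be matched to its job even though take() returns results in completion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CompletionServiceTimeoutDemo {

    // Hypothetical result holder: the job's id plus its value (null if it timed out or failed).
    static final class Outcome {
        final int jobId;
        final Integer value;

        Outcome(int jobId, Integer value) {
            this.jobId = jobId;
            this.value = value;
        }
    }

    // Wraps a job so that it gets at most timeoutMillis, counted from when the wrapper starts running.
    static Callable<Outcome> withTimeout(int jobId, Callable<Integer> job, long timeoutMillis) {
        return () -> {
            ExecutorService single = Executors.newSingleThreadExecutor();
            try {
                Integer value = single.submit(job).get(timeoutMillis, TimeUnit.MILLISECONDS);
                return new Outcome(jobId, value);
            } catch (TimeoutException | ExecutionException e) {
                return new Outcome(jobId, null);
            } finally {
                single.shutdownNow(); // interrupts the job if it is still running
            }
        };
    }

    // Runs one sleeping job per entry of sleepMillis on two pool threads and
    // returns the outcomes in completion order.
    static List<Outcome> runAll(long[] sleepMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Outcome> completion = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < sleepMillis.length; i++) {
                final int id = i;
                final long millis = sleepMillis[i];
                Callable<Integer> job = () -> { Thread.sleep(millis); return id; };
                completion.submit(withTimeout(id, job, timeoutMillis));
            }
            List<Outcome> outcomes = new ArrayList<>();
            for (int i = 0; i < sleepMillis.length; i++) {
                outcomes.add(completion.take().get());
            }
            return outcomes;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (Outcome o : runAll(new long[] {2_000, 50, 3_000, 100}, 400)) {
            System.out.println("job " + o.jobId + ": "
                    + (o.value == null ? "no result (timed out or failed)" : "result " + o.value));
        }
    }
}
```

The trade-off is the same as with JobManager: every job costs one extra thread while it runs, in exchange for a timeout that starts when the job starts rather than when it was submitted.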
Author: user3787186

Updated on June 04, 2022

Comments

  • user3787186
    user3787186 almost 2 years

    I've got the following problem using ExecutorCompletionService. I want to call a lot of Callables in different threads. These Callables don't share any information with each other. I need to define a timeout for each Callable, e.g. do not run longer than 5 seconds. Each Callable can take a different amount of time, which I do not know when starting it. After the timeout the thread should be stopped/killed, and the result is no longer of interest to me. The other, normally running threads should not be influenced.

    So let's take the following example with a simple Callable and my current Java code.

    import java.util.Date;
    import java.util.concurrent.Callable;
    
    public class Job implements Callable<Integer> {
    
        int returnValue = 0;
        long millis = 0;
    
        public Job(long millis, int value) {
            this.millis = millis;
            this.returnValue = value;
        }
    
        @Override
        public Integer call() throws Exception, InterruptedException {
            try {
                System.out.println(new Date() + " " + returnValue + " started");
                Thread.sleep(millis);
                System.out.println(new Date() + " " + returnValue + " finished");
                return returnValue;
            } catch (InterruptedException e) {
                System.out.println(new Date() + " " + returnValue + " interrupted");
                throw e;
            }        
        }
    }
    

    And the other Class where the Callable is used.

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.concurrent.*;
    
    public class CallableTest {
    
        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
            CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(newFixedThreadPool);
            
            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                pool.submit(job);
            }
            
            ArrayList<Integer> results = new ArrayList<Integer>();
            for (int i = 1; i < 11; ++i) {
                try {
                    Future<Integer> future = pool.take();
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            newFixedThreadPool.shutdownNow();
    
            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }
    

    The output is something like:

    Sun Jun 29 08:01:00 CEST 2014 10 started
    Sun Jun 29 08:01:00 CEST 2014 9 started
    Sun Jun 29 08:01:09 CEST 2014 9 finished
    Sun Jun 29 08:01:09 CEST 2014 added 9
    Sun Jun 29 08:01:09 CEST 2014 8 started
    Sun Jun 29 08:01:10 CEST 2014 10 finished
    Sun Jun 29 08:01:10 CEST 2014 7 started
    Sun Jun 29 08:01:10 CEST 2014 added 10
    Sun Jun 29 08:01:17 CEST 2014 7 finished
    Sun Jun 29 08:01:17 CEST 2014 6 started
    Sun Jun 29 08:01:17 CEST 2014 added 7
    Sun Jun 29 08:01:17 CEST 2014 8 finished
    Sun Jun 29 08:01:17 CEST 2014 added 8
    Sun Jun 29 08:01:17 CEST 2014 5 started
    Sun Jun 29 08:01:22 CEST 2014 5 finished
    Sun Jun 29 08:01:22 CEST 2014 added 5
    Sun Jun 29 08:01:22 CEST 2014 4 started
    Sun Jun 29 08:01:23 CEST 2014 6 finished
    Sun Jun 29 08:01:23 CEST 2014 3 started
    Sun Jun 29 08:01:23 CEST 2014 added 6
    Sun Jun 29 08:01:26 CEST 2014 3 finished
    Sun Jun 29 08:01:26 CEST 2014 2 started
    Sun Jun 29 08:01:26 CEST 2014 added 3
    Sun Jun 29 08:01:26 CEST 2014 4 finished
    Sun Jun 29 08:01:26 CEST 2014 1 started
    Sun Jun 29 08:01:26 CEST 2014 added 4
    Sun Jun 29 08:01:27 CEST 2014 1 finished
    Sun Jun 29 08:01:27 CEST 2014 added 1
    Sun Jun 29 08:01:28 CEST 2014 2 finished
    Sun Jun 29 08:01:28 CEST 2014 added 2
    Sun Jun 29 08:01:28 CEST 2014 results:
    Sun Jun 29 08:01:28 CEST 2014 9
    Sun Jun 29 08:01:28 CEST 2014 10
    Sun Jun 29 08:01:28 CEST 2014 7
    Sun Jun 29 08:01:28 CEST 2014 8
    Sun Jun 29 08:01:28 CEST 2014 5
    Sun Jun 29 08:01:28 CEST 2014 6
    Sun Jun 29 08:01:28 CEST 2014 3
    Sun Jun 29 08:01:28 CEST 2014 4
    Sun Jun 29 08:01:28 CEST 2014 1
    Sun Jun 29 08:01:28 CEST 2014 2 
    

    That does not work the way I would like. I want each Callable that runs longer than 5 seconds to be terminated/ended/interrupted, and only the Callables running less than 5 seconds should give me a valid result.

    I also tried it without the ExecutorCompletionService:

    public class CallableTest2 {
        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            
            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                futures.add(newFixedThreadPool.submit(job));
            }
            
            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future: futures) {
                try {
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            newFixedThreadPool.shutdownNow();
    
            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }
    

    With the results:

    Sun Jun 29 08:33:19 CEST 2014 9 started
    Sun Jun 29 08:33:19 CEST 2014 10 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:28 CEST 2014 9 finished
    Sun Jun 29 08:33:28 CEST 2014 8 started
    Sun Jun 29 08:33:28 CEST 2014 added 9
    Sun Jun 29 08:33:29 CEST 2014 10 finished
    Sun Jun 29 08:33:29 CEST 2014 7 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:36 CEST 2014 7 finished
    Sun Jun 29 08:33:36 CEST 2014 added 7
    Sun Jun 29 08:33:36 CEST 2014 6 started
    Sun Jun 29 08:33:36 CEST 2014 8 finished
    Sun Jun 29 08:33:36 CEST 2014 5 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(Sun Jun 29 08:33:41 CEST 2014 5 finished
    FutureTask.java:228)
    Sun Jun 29 08:33:41 CEST 2014 added 5
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    Sun Jun 29 08:33:41 CEST 2014 4 started
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:42 CEST 2014 6 finished
    Sun Jun 29 08:33:42 CEST 2014 3 started
    Sun Jun 29 08:33:45 CEST 2014 3 finished
    Sun Jun 29 08:33:45 CEST 2014 2 started
    Sun Jun 29 08:33:45 CEST 2014 4 finished
    Sun Jun 29 08:33:45 CEST 2014 added 4
    Sun Jun 29 08:33:45 CEST 2014 added 3
    Sun Jun 29 08:33:45 CEST 2014 1 started
    Sun Jun 29 08:33:46 CEST 2014 1 finished
    Sun Jun 29 08:33:47 CEST 2014 2 finished
    Sun Jun 29 08:33:47 CEST 2014 added 2
    Sun Jun 29 08:33:47 CEST 2014 added 1
    Sun Jun 29 08:33:47 CEST 2014 results:
    Sun Jun 29 08:33:47 CEST 2014 9
    Sun Jun 29 08:33:47 CEST 2014 7
    Sun Jun 29 08:33:47 CEST 2014 5
    Sun Jun 29 08:33:47 CEST 2014 4
    Sun Jun 29 08:33:47 CEST 2014 3
    Sun Jun 29 08:33:47 CEST 2014 2
    Sun Jun 29 08:33:47 CEST 2014 1
    

    Now I get some TimeoutExceptions, but not where I expect them. E.g. the Callables running 9 and 7 seconds do not throw an exception!

    What do I have to change in the code to get only the results of the short-running threads and kill the long-running ones? In the example, that means only the results 1-5, without 6-10.

    I've tested a lot of things but I can't get it to work. Please help.


    This is a reply to the post of bstar55 using a ScheduledExecutorService.

    I changed my code according to your hint to:

    public class CallableTest3 {
    
        public static void main(String[] args) {
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            
            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                final Future<Integer> handler = executor.submit(job);
                final int x = i;
                executor.schedule(new Runnable() {
    
                    public void run() {
                        boolean cancel = handler.cancel(true);
                        if(cancel){
                            System.out.println(new Date() + " job " + x + " cancelled");
                        }else{
                            System.out.println(new Date() + " job " + x + " not cancelled");
                        }
                    }
                }, 5000, TimeUnit.MILLISECONDS);
                futures.add(handler);
            }
    
            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                try {
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            executor.shutdown();
    
            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " --- " + j);
            }
        }
    }
    

    But this also does not work as expected. Result:

    Sun Jun 29 10:27:41 CEST 2014 9 started
    Sun Jun 29 10:27:41 CEST 2014 10 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:27:50 CEST 2014 9 finished
    Sun Jun 29 10:27:50 CEST 2014 added 9
    Sun Jun 29 10:27:50 CEST 2014 8 started
    Sun Jun 29 10:27:51 CEST 2014 10 finished
    Sun Jun 29 10:27:51 CEST 2014 7 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:27:58 CEST 2014 8 finished
    Sun Jun 29 10:27:58 CEST 2014 6 started
    Sun Jun 29 10:27:58 CEST 2014 7 finished
    Sun Jun 29 10:27:58 CEST 2014 5 started
    Sun Jun 29 10:27:58 CEST 2014 added 7
    Sun Jun 29 10:28:03 CEST 2014 5 finished
    Sun Jun 29 10:28:03 CEST 2014 4 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    Sun Jun 29 10:28:03 CEST 2014 added 5
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:28:04 CEST 2014 6 finished
    Sun Jun 29 10:28:04 CEST 2014 3 started
    Sun Jun 29 10:28:07 CEST 2014 3 finished
    Sun Jun 29 10:28:07 CEST 2014 2 started
    Sun Jun 29 10:28:07 CEST 2014 4 finished
    Sun Jun 29 10:28:07 CEST 2014 added 4
    Sun Jun 29 10:28:07 CEST 2014 added 3
    Sun Jun 29 10:28:07 CEST 2014 1 started
    java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:230)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:28:08 CEST 2014 1 finished
    Sun Jun 29 10:28:08 CEST 2014 job 10 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 9 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 8 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 7 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 6 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 5 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 4 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 3 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 2 interrupted
    Sun Jun 29 10:28:08 CEST 2014 job 1 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 added 1
    Sun Jun 29 10:28:08 CEST 2014 results:
    Sun Jun 29 10:28:08 CEST 2014 --- 9
    Sun Jun 29 10:28:08 CEST 2014 --- 7
    Sun Jun 29 10:28:08 CEST 2014 --- 5
    Sun Jun 29 10:28:08 CEST 2014 --- 4
    Sun Jun 29 10:28:08 CEST 2014 --- 3
    Sun Jun 29 10:28:08 CEST 2014 --- 1
    Sun Jun 29 10:28:08 CEST 2014 job 2 cancelled
    

    But instead job 2 was cancelled!


  • user3787186
    user3787186 almost 10 years
    Yes, I also thought about checking the time within the Callable. The problem is that in the real application there is a big code block instead of the simple Thread.sleep(x); I do not know how to check this code block every second without creating a new thread for each thread!
  • Warren Dew
    Warren Dew almost 10 years
    Do you want the threads to stop after 5 seconds to save processing time, or is it okay if they continue operating and you just have the option to ignore the result if they have operated longer than 5 seconds without completing?
  • user3787186
    user3787186 almost 10 years
    The threads should be stopped. In the real application the Callables sometimes run into endless loops, so I also need to kill these threads after my timeout. The real timeout is about half an hour, so it's not a real-time problem. The result of a killed Callable is no longer of interest to me, but it would be interesting to know which Callable was killed.
  • user3787186
    user3787186 almost 10 years
    I think the idea is good. How can I save the starting time within the Callable, so I can use it in the calling main thread for comparison? But it will not work for the ExecutorCompletionService, because the returned Future objects are not in the same order as submitted to the pool. So I don't know which Future object belongs to which Callable when comparing the timings.
  • bstar55
    bstar55 almost 10 years
    Yeah, I also don't see a way to get the Job associated with the Future. I posted another answer that might prove more useful.
  • user3787186
    user3787186 almost 10 years
    Damn, due to my low reputation I can't answer my own post within 8 hours here. So I will add my changes to my original post.
  • Warren Dew
    Warren Dew almost 10 years
    There is a way to do it, but it is highly dangerous. There's also an alternative using processes instead of threads. See the edits to my answer for more detail.
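
Regarding the CallableTest3 attempt in the question: the cancel tasks there are scheduled on the same two-thread pool that runs the jobs, and their 5-second delay is counted from submission. A likely reading of the posted output is that both threads stay busy with jobs, the queued jobs (delay 0) are picked before the cancel tasks, and so the cancel tasks only run at the very end, when just job 2 is still running. One way around this, sketched here under assumptions, is to use a separate timer thread and to start the clock when the job actually begins to run. The class name TimedTask is hypothetical, and Java 8 or later is assumed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a FutureTask that cancels itself if it runs longer than the timeout.
class TimedTask<T> extends FutureTask<T> {

    private final ScheduledExecutorService timer;
    private final long timeout;
    private final TimeUnit unit;

    TimedTask(Callable<T> job, ScheduledExecutorService timer, long timeout, TimeUnit unit) {
        super(job);
        this.timer = timer;
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void run() {
        // The clock starts here, when a worker thread picks the task up,
        // not when the task was submitted.
        ScheduledFuture<?> killer = timer.schedule(() -> { cancel(true); }, timeout, unit);
        try {
            super.run();
        } finally {
            killer.cancel(false); // job is done: drop the pending cancellation
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        // Separate thread for the timeouts, so they never wait behind the jobs.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        Callable<Integer> fastJob = () -> { Thread.sleep(50); return 1; };
        Callable<Integer> slowJob = () -> { Thread.sleep(5_000); return 2; };
        TimedTask<Integer> fast = new TimedTask<>(fastJob, timer, 500, TimeUnit.MILLISECONDS);
        TimedTask<Integer> slow = new TimedTask<>(slowJob, timer, 500, TimeUnit.MILLISECONDS);
        workers.execute(fast);
        workers.execute(slow);

        System.out.println("fast: " + fast.get());
        try {
            slow.get();
        } catch (CancellationException e) {
            System.out.println("slow: cancelled");
        }
        workers.shutdown();
        timer.shutdown();
    }
}
```

Because each TimedTask is itself the Future, the caller can keep a map from task to job and see which one was cancelled via isCancelled().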