CompletableFuture with timeout

If the wait times out, you can still collect the values from the futures that have already completed.

It can be something like this:

public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
  CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
  try {
    allFuturesResult.get(timeout, unit);
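  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
  } catch (ExecutionException | TimeoutException e) {
    // some futures failed or are still running; you may log it
  }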
  return futuresList
    .stream()
    .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
    .map(CompletableFuture::join) // get the value from the completed future
    .collect(Collectors.<T>toList()); // collect as a list
}

Here is a complete working example. I just replaced the body of doReq with a sleep because I don't have your web service:

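import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// DurationFormatUtils comes from Apache Commons Lang 3
import org.apache.commons.lang3.time.DurationFormatUtils;

public class MainTest {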

    private Instant start;

    public static void main(String[] args) {

        MainTest main = new MainTest();
        main.start();
    }

    public void start() {
        String req1 = "http://localhost:8080/testing";
        String req2 = "http://127.0.0.1:8095/testing2";

        ExecutorService exec = Executors.newCachedThreadPool();

        start = Instant.now();
        CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
        CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);

        List<CompletableFuture<String>> completables = List.of(comp1, comp2);

        System.out.println("Waiting completables");

        List<String> r = getAllCompleted(completables, 3, TimeUnit.SECONDS);
        Instant end = Instant.now();
        System.out.println(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));

        System.out.println(r.size());
        r.forEach(System.out::println);
        exec.shutdown();
    }

    public String doReq(String request) {
        if (request.contains("localhost")) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "response1";
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "response2";
    }

    public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
        CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        try {
            allFuturesResult.get(timeout, unit);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return futuresList.stream()
            .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
            .map(CompletableFuture::join) // get the value from the completed future
            .collect(Collectors.<T>toList()); // collect as a list
    }
}
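
As a possible alternative on Java 9 or later, each future can be given its own deadline with completeOnTimeout instead of waiting on allOf. The sketch below is only an illustration of that idea, not part of the original answer: the class name TimeoutFallbackSketch and the helpers slowCall and collectWithin are hypothetical, and it assumes that null is never a legitimate result, because null is used to mark "failed or too late".

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class TimeoutFallbackSketch {

    // Hypothetical stand-in for a slow request: sleeps, then returns a label.
    static String slowCall(String label, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return label;
    }

    // Returns the values of the futures that finish normally within the timeout.
    // Assumption: null is never a real result, so it can mark "failed or too late".
    static <T> List<T> collectWithin(List<CompletableFuture<T>> futures, long timeout, TimeUnit unit) {
        // Derive one guarded future per input first, so that all deadlines start together.
        // exceptionally() returns a new future, so the caller's futures are not modified.
        List<CompletableFuture<T>> guarded = futures.stream()
                .map(f -> f.exceptionally(ex -> null)                 // failed -> null
                           .completeOnTimeout(null, timeout, unit))   // too late -> null (Java 9+)
                .collect(Collectors.toList());

        return guarded.stream()
                .map(CompletableFuture::join)   // blocks at most until the shared deadline
                .filter(Objects::nonNull)       // drop failed and timed-out entries
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> slowCall("fast", 100));
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> slowCall("slow", 3000));

        List<String> results = collectWithin(List.of(fast, slow), 1, TimeUnit.SECONDS);
        System.out.println(results);
    }
}
```

With these delays the call is expected to return after about one second with only the fast value. Note that the slow task itself keeps running in the background; neither this sketch nor getAllCompleted stops it.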
Author: Vml11

Updated on June 04, 2022

Comments

  • Vml11
    Vml11 almost 2 years

    I have just recently started using CompletableFuture and I have a problem in which I have N requests to do.

    Each request should be sent to 2 different endpoints and the results, as JSON, should be compared. Since I have tons of requests to do and I don't know how much time each request could take, I want to limit the time spent waiting for the result to 3 seconds or so.

    So I wrote this testing code:

    public class MainTest {
    
       private static final Logger logger = LoggerFactory.getLogger(MainTest.class);
       private Instant start;
    
       public static void main(String[] args) {
    
           MainTest main = new MainTest();
           main.start();
       }
    
       public void start(){
           String req1 = "http://localhost:8080/testing";
           String req2 = "http://127.0.0.1:8095/testing2";
    
           ExecutorService exec = Executors.newCachedThreadPool();
    
           start = Instant.now();
           CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
           CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);
    
    
           List<CompletableFuture<String>> completables = List.of(comp1,comp2);
    
           logger.info("Waiting completables");
    
           CompletableFuture<List<String>> a = allOf(completables);
    
    
           List<String> r = new ArrayList<>();
           try {
               r = a.get(3, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               e.printStackTrace();
           } catch (ExecutionException e) {
               e.printStackTrace();
           } catch (TimeoutException e) {
               e.printStackTrace();
           }finally {
               Instant end = Instant.now();
               logger.info(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));
    
               System.out.println(r.size());
               r.forEach(System.out::println);
           }
           exec.shutdown();
       }
    
       public String doReq(String request){
           AtomicReference<String> response = new AtomicReference<>("default");
           try{
               logger.info("Sending request: {}", request);
               Unirest.get(request).asJson()
                       .ifSuccess(r -> {
                           response.set(r.getBody().toString());
                       })
                       .ifFailure(r -> {
                           logger.error("Oh No! Status" + r.getStatus());
                           r.getParsingError().ifPresent(e -> {
                               logger.error("Parsing Exception: ", e);
                               logger.error("Original body: " + e.getOriginalBody());
                           });
                       });
           } catch (Exception e) {
               logger.error("Error on request! {}", e.getMessage());
    
           }
          return response.get();
       }
    
    
       public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
           CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
           return allFuturesResult.thenApply(v ->
                           futuresList.stream().
                           map(CompletableFuture::join).
                           collect(Collectors.<T>toList())
           );
       }
    }
    

    The problem comes when either one of the requests takes more than 3 seconds. I want to have the results of the ones that had time to finish. I deliberately put a 7-second delay on one of my requests in my web service and I get the following output. One of them had time, but its result is not in the List:

    2020-12-09T17:05:03,878 [pool-2-thread-2] INFO (MainTest:85) - Sending request: http://127.0.0.1:8095/testing2
    2020-12-09T17:05:03,878 [pool-2-thread-1] INFO (MainTest:85) - Sending request: http://localhost:8080/testing
    2020-12-09T17:05:03,878 [main] INFO (MainTest:53) - Waiting completables
    java.util.concurrent.TimeoutException
        at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
        at me.testing.MainTest.start(MainTest.java:60)
        at me.testing.MainTest.main(MainTest.java:31)
    2020-12-09T17:05:06,889 [main] INFO (MainTest:69) -  Took: 00:00:03.009
    0
    
  • Vml11
    Vml11 over 3 years
    Hi @Zinc, the IDE is forcing me to make the method return either an Object or Void.
  • Zinc
    Zinc over 3 years
    Hi, I edited my answer with a complete example. Is it OK?
  • Vml11
    Vml11 over 3 years
    Hi @Zinc, that's perfect! Thank you so much!