CompletableFuture - Run multiple rest calls in parallel and get different result

Solution 1

AccountDetails accountDetails = new AccountDetails();

// One future per REST call. Each block comment is a placeholder for the real call.
CompletableFuture.allOf(
        CompletableFuture
                .supplyAsync(() -> /* call mortgage info REST */, executor)
                .thenAccept(x -> accountDetails.setMortgageAccountId(x.getReqdField()))
                .handle((ignored, error) -> null), // handle the failure gracefully here
        CompletableFuture
                .supplyAsync(() -> /* call some other REST */, executor)
                .thenAccept(x -> accountDetails.setNoOfTrans(x.getReqdField()))
                .handle((ignored, error) -> null), // handle the failure gracefully here
        CompletableFuture
                .supplyAsync(() -> /* call some info REST */, executor)
                .thenAccept(x -> accountDetails.setAddressLine(x.getReqdField()))
                .handle((ignored, error) -> null), // handle the failure gracefully here
        CompletableFuture
                .supplyAsync(() -> /* call some other REST */, executor)
                .thenAccept(x -> accountDetails.setExternalLink(x.getReqdField()))
                .handle((ignored, error) -> null)  // handle the failure gracefully here
).join();
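
To make the pattern above concrete, here is a minimal runnable sketch with two of the four calls. The two fetch methods are hypothetical stand-ins for the real REST clients (one of them always fails), and the class and field names are assumptions made for this illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelFieldUpdate {

    static class AccountDetails {
        String mortgageAccountId;
        Integer noOfTrans;
    }

    // Hypothetical stand-in for the mortgage REST call.
    static String fetchMortgageAccountId(String bankAccountId) {
        return "M-" + bankAccountId;
    }

    // Hypothetical stand-in for the transaction REST call; it always fails.
    static Integer fetchNoOfTrans(String bankAccountId) {
        throw new IllegalStateException("transaction service unavailable");
    }

    static AccountDetails load(String bankAccountId, ExecutorService executor) {
        AccountDetails details = new AccountDetails();
        CompletableFuture.allOf(
                CompletableFuture
                        .supplyAsync(() -> fetchMortgageAccountId(bankAccountId), executor)
                        .thenAccept(id -> details.mortgageAccountId = id)
                        .handle((ignored, error) -> {
                            if (error != null) {
                                System.err.println("mortgage call failed: " + error);
                            }
                            return null;
                        }),
                CompletableFuture
                        .supplyAsync(() -> fetchNoOfTrans(bankAccountId), executor)
                        .thenAccept(n -> details.noOfTrans = n)
                        .handle((ignored, error) -> {
                            if (error != null) {
                                System.err.println("transaction call failed: " + error);
                            }
                            return null;
                        })
        ).join(); // never throws here: every future was already handled
        return details;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            AccountDetails details = load("42", executor);
            // The failed call leaves its field unset.
            System.out.println(details.mortgageAccountId + " / " + details.noOfTrans);
        } finally {
            executor.shutdown();
        }
    }
}
```

Because each future ends in handle(...), a failing call only leaves its own field unset, and the other updates still go through.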

Solution 2

If I simplify your account class a bit as:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}

Then you can use CompletableFuture#allOf(...) to wait for the results of all the completable futures, one per field update, and then retrieve the result from each future individually. We cannot use the result of allOf itself because it returns a CompletableFuture<Void>, which carries no value.

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout

We can use join() inside thenApply because all the completable futures are done at this stage. You can modify the code block above to fit your logic, such as updating fields instead of creating a new object. Note that if any of the futures completes exceptionally, the future returned by allOf(...) also completes exceptionally, so the thenApply step is skipped and the final join() throws a CompletionException. You can either change your completable futures to handle() failures before passing them to allOf(...), or check isCompletedExceptionally() on each future before calling join() on it:

CompletableFuture.allOf(cfA, cfB, cfC)
    // handle(...) runs even when one of the futures failed; thenRun(...) would be skipped
    .handle((ignored, error) -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
      return null;
    }).join(); // or get(...) with timeout

The benefit of updating fields inside one completion stage is that these operations are done in the same thread so you don't have to worry about concurrent modification.
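
The other option mentioned above, handling failures before the futures reach allOf(...), could look roughly like the sketch below. The withFallback helper, the "unknown" fallback value and the two-field Account are assumptions made for this illustration; exceptionally(...) is used here as a shorter alternative to handle(...).

```java
import java.util.concurrent.CompletableFuture;

class FallbackBeforeAllOf {

    static class Account {
        final String fieldA;
        final String fieldB;

        Account(String fieldA, String fieldB) {
            this.fieldA = fieldA;
            this.fieldB = fieldB;
        }
    }

    // Hypothetical helper: turns a failed future into one that completes
    // normally with the given fallback value.
    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> future, T fallback) {
        return future.exceptionally(error -> fallback);
    }

    static Account combine(CompletableFuture<String> cfA, CompletableFuture<String> cfB) {
        CompletableFuture<String> safeA = withFallback(cfA, "unknown");
        CompletableFuture<String> safeB = withFallback(cfB, "unknown");
        return CompletableFuture.allOf(safeA, safeB)
                // Both futures are done and cannot be failed, so join() is safe.
                .thenApply(ignored -> new Account(safeA.join(), safeB.join()))
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> "value-a");
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("service down");
        });
        Account account = combine(ok, failed);
        System.out.println(account.fieldA + ", " + account.fieldB); // prints "value-a, unknown"
    }
}
```

With this approach the combining step needs no isCompletedExceptionally() checks, at the cost of choosing a fallback value for every field up front.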

Solution 3

Since you have tagged the question spring-boot, I assume you use it and that your services are written with the Spring framework, so my answer is Spring-specific.

First of all, I created an interface for implementing the REST calls asynchronously.

public interface AsyncRestCall<T> {
   /** this is a hypothetical method with hypothetical params!*/
   CompletableFuture<T> call(String bankAccountId); 
   String type();
}

Then you can implement it for each of your services like this. In this implementation, MortgageRest represents the REST client for the mortgage service.

 @Service
 public class MortgageService implements AsyncRestCall<MortgageInfo> {

   private final MortgageRest mortgageRest;

   @Autowired
   public MortgageService(MortgageRest mortgageRest) {
       this.mortgageRest = mortgageRest;
   }

   @Override
   public CompletableFuture<MortgageInfo> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
    }

   @Override
   public String type() {
      return "mortgage";
   } 
} 

MortgageRest:

@Service
public class MortgageRest {
  private RestTemplate restTemplate;
  public MortgageRest(RestTemplate restTemplate) {
     this.restTemplate = restTemplate;
  }
  public MortgageInfo service(String bankAccountId) {
     return new MortgageInfo("123455" + bankAccountId);
  }
}

Do the same for the other REST services.

@Service
public class TransactionService implements AsyncRestCall<Transactions> {

   private final TransactionRest transactionRest;

   public TransactionService(TransactionRest transactionRest) {
      this.transactionRest = transactionRest;
   } 

   @Override
   public CompletableFuture<Transactions> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(transactionRest::service);
   }

   @Override
   public String type() {
       return "transactions";
   } 
} 

TransactionRest:

 @Service
 public class TransactionRest {

   public Transactions service() {
       return new Transactions(12);
   }
 }

Now you need access to all AsyncRestCall implementations. For this purpose you can declare a class like this:

@Service
public class RestCallHolder {

  private final List<AsyncRestCall> asyncRestCalls;

  public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
      this.asyncRestCalls = asyncRestCalls;
  }

  public List<AsyncRestCall> getAsyncRestCalls() {
      return asyncRestCalls;
  }
}

AccountDetailService (you can name it whatever you like) uses CompletableFuture to call the REST services in parallel.

In this service, the responses for each bankAccountId are stored in a Map<String, Map<String, Object>> result = new HashMap<>();. The outer map is keyed by bankAccountId, and its value is an inner map that holds the REST responses for that account, keyed by type. At the end, a loop over accountDetails builds the updated objects from this map.

@Service
public class AccountDetailService {

  private final RestCallHolder restCallHolder;

  public AccountDetailService(RestCallHolder restCallHolder) {
      this.restCallHolder = restCallHolder;
  }

  public List<AccountDetail> update(List<AccountDetail> accountDetails) {
     Map<String, Map<String, Object>> result = new HashMap<>();
     List<AccountDetail> finalAccountDetails = new ArrayList<>();

     accountDetails.forEach(accountDetail -> {
          // Start every REST call exactly once and remember each future by its type.
          Map<String, CompletableFuture<?>> futures = new HashMap<>();
          for (AsyncRestCall<?> rest : restCallHolder.getAsyncRestCalls()) {
              futures.put(rest.type(), rest.call(accountDetail.getBankAccountId()));
          }

          CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                  .thenAccept(aVoid -> {
                      // All futures are complete here, so join() does not block.
                      Map<String, Object> res = new HashMap<>();
                      futures.forEach((type, future) -> res.put(type, future.join()));
                      result.put(accountDetail.getBankAccountId(), res);
                  })
                  .handle((aVoid, throwable) -> {
                      return null; // handle the exception here
                  }).join();
     });

      accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
             .bankAccountId(accountDetail.getBankAccountId())
             .mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
             .noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
             .build()));
     return finalAccountDetails;
   }
 }
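
Note that update(...) above calls join() inside the forEach, so the accounts are handled one after another and only the calls for a single account overlap. If all accounts should run in parallel, one possible approach is to start the futures for every account first and join them only afterwards. The sketch below illustrates the idea outside of Spring. The two static methods are hypothetical stand-ins for the REST clients, and an explicit executor is passed because supplyAsync without one runs on the shared ForkJoinPool.commonPool().

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AllAccountsInParallel {

    // Hypothetical stand-ins for the mortgage and transaction REST calls.
    static String mortgageFor(String bankAccountId) {
        return "M-" + bankAccountId;
    }

    static int transactionsFor(String bankAccountId) {
        return bankAccountId.length();
    }

    static Map<String, String> summarize(List<String> bankAccountIds, ExecutorService executor) {
        // Phase 1: start the calls for every account without waiting for any of them.
        Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
        for (String id : bankAccountIds) {
            CompletableFuture<String> mortgage =
                    CompletableFuture.supplyAsync(() -> mortgageFor(id), executor);
            CompletableFuture<Integer> transactions =
                    CompletableFuture.supplyAsync(() -> transactionsFor(id), executor);
            // thenCombine merges two results of different types.
            pending.put(id, mortgage.thenCombine(transactions,
                    (m, n) -> m + " with " + n + " transactions"));
        }

        // Phase 2: collect the results once everything has been submitted.
        Map<String, String> result = new LinkedHashMap<>();
        pending.forEach((id, future) -> result.put(id, future.join()));
        return result;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(summarize(Arrays.asList("1001", "77"), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Error handling is left out here to keep the two phases visible. In real code each future would still need a handle(...) or exceptionally(...) step as in the service above.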

Solution 4

I would give the responsibility to fetch the field values to the model object itself.
Here are three alternative solutions, using parallel streams, streams and an executor, and a for loop and an executor.

Solution 1:

accounts.parallelStream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(RestRequest::run);

Solution 2:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(executor::execute);

Solution 3:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
    execute(executor, account::updateMortgage);
    execute(executor, account::updateNoOfTrans);
    execute(executor, account::updateAddressLine);
    execute(executor, account::updateExternalLink);
}

private static void execute(Executor executor, Runnable task) {
    executor.execute(new RestRequest(task));
}

Common code:

class RestRequest implements Runnable {
    private final Runnable task;

    RestRequest(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // A request failed. Others will not be canceled.
        }
    }
}

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;

    // Method names match the account::update... references used in the solutions above.
    void updateMortgage() {
        mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
    }

    void updateNoOfTrans() {
        noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
    }

    void updateAddressLine() {
        addressLine = AddressService.getAddress(bankAccountId).getLine();
    }

    void updateExternalLink() {
        externalLink = LinkService.getLinks(bankAccountId).getExternal();
    }
}
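
The executor-based variants above submit the tasks but do not wait for them, so the caller cannot tell when the fields have been filled in. One way to wait, sketched here under the assumption that the pool is created per batch and typed as ExecutorService rather than Executor, is to call shutdown() followed by awaitTermination(...). The runAndWait helper and its parameters are hypothetical names for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitRestRequests {

    // Runs every task on a fixed pool and blocks until all have finished
    // or the timeout elapses. Returns true if everything finished in time.
    static boolean runAndWait(int parallelism, long timeoutSeconds, Runnable... tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A request failed. Others will not be canceled.
                }
            });
        }
        executor.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger updated = new AtomicInteger();
        boolean finished = runAndWait(2, 10,
                updated::incrementAndGet,
                () -> { throw new IllegalStateException("service down"); },
                updated::incrementAndGet);
        System.out.println(finished + " " + updated.get()); // prints "true 2"
    }
}
```

In the solutions above, the tasks passed in would be the account::updateMortgage style method references.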

Author: Kevin Rave

Updated on July 29, 2022

Comments

  • Kevin Rave (almost 2 years ago)

    I have a rather common or unique requirement. For example, I have the following AccountDetails list:

    List<AccountDetails>

    class AccountDetails {
        String bankAccountId;
        String mortgageAccountId;
        Integer noOfTrans;
        String addressLine;
        String externalLink;   
    }
    

    All the above fields, except bankAccountId, are pulled from external REST service calls. I want to call all the REST services in parallel and update each object in the list.

    So, it looks like below:

    For each accountDetails

    • Call mortgage REST service and update mortgageAccountId field (REST returns MortgageInfo object)
    • Call transaction REST service and update noOfTrans field (REST returns Transactions object)
    • Call address REST service and update addressLine field (REST returns Address object)
    • Call link REST service and update externalLink field. (REST returns Links object)

    I want all the above calls to run in parallel, for each AccountDetails object in the list. If there is an exception, I want to handle it gracefully. Note that each of the above REST services returns a different custom object.

    I am confused about how to achieve this with CompletableFuture chaining. I am not sure whether I should use allOf, thenCombine (which only takes two), or thenCompose, and how to put all of these together.

    Any examples/ideas?