CompletableFuture - Run multiple rest calls in parallel and get different result
Solution 1
// callMortgageRest(), callTransactionRest(), callAddressRest() and callLinkRest() are
// placeholders for the actual REST clients; each returns that service's response object.
AccountDetails accountDetails = new AccountDetails();
CompletableFuture.allOf(
    CompletableFuture
        .supplyAsync(() -> callMortgageRest(), executor)
        .thenAccept(x -> accountDetails.setMortgageAccountId(x.getReqdField()))
        .handle((ignored, ex) -> null), // handle gracefully: ex is non-null on failure
    CompletableFuture
        .supplyAsync(() -> callTransactionRest(), executor)
        .thenAccept(x -> accountDetails.setNoOfTrans(x.getReqdField()))
        .handle((ignored, ex) -> null), // handle gracefully
    CompletableFuture
        .supplyAsync(() -> callAddressRest(), executor)
        .thenAccept(x -> accountDetails.setAddressLine(x.getReqdField()))
        .handle((ignored, ex) -> null), // handle gracefully
    CompletableFuture
        .supplyAsync(() -> callLinkRest(), executor)
        .thenAccept(x -> accountDetails.setExternalLink(x.getReqdField()))
        .handle((ignored, ex) -> null)  // handle gracefully
).join();
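For reference, here is a self-contained sketch of the pattern above with the REST calls replaced by local stubs. The class name ParallelFieldUpdate, the fetch* methods, the failures list and the recordFailure helper are assumptions made for illustration; they are not part of the original answer.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelFieldUpdate {
    // Simplified stand-in for the AccountDetails class from the question.
    static class AccountDetails {
        String mortgageAccountId;
        Integer noOfTrans;
        String addressLine;
        // Names of the calls that failed (illustrative addition).
        final List<String> failures = new CopyOnWriteArrayList<>();
    }

    // Hypothetical stubs; real code would call the remote services here.
    static String fetchMortgageId(String id) { return "M-" + id; }
    static Integer fetchNoOfTrans(String id) { return 12; }
    static String fetchAddressLine(String id) {
        throw new IllegalStateException("address service down");
    }

    // Records a failed call and lets the stage complete normally.
    static Void recordFailure(AccountDetails d, String name, Throwable ex) {
        if (ex != null) {
            d.failures.add(name);
        }
        return null;
    }

    static AccountDetails load(String bankAccountId) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AccountDetails d = new AccountDetails();
        try {
            CompletableFuture.allOf(
                CompletableFuture.supplyAsync(() -> fetchMortgageId(bankAccountId), executor)
                    .thenAccept(v -> d.mortgageAccountId = v)
                    .handle((ignored, ex) -> recordFailure(d, "mortgage", ex)),
                CompletableFuture.supplyAsync(() -> fetchNoOfTrans(bankAccountId), executor)
                    .thenAccept(v -> d.noOfTrans = v)
                    .handle((ignored, ex) -> recordFailure(d, "transactions", ex)),
                CompletableFuture.supplyAsync(() -> fetchAddressLine(bankAccountId), executor)
                    .thenAccept(v -> d.addressLine = v)
                    .handle((ignored, ex) -> recordFailure(d, "address", ex))
            ).join(); // does not throw: every branch ends in handle()
        } finally {
            executor.shutdown();
        }
        return d;
    }
}
```

Because every branch ends in handle(), a failing call leaves its field unset instead of failing the whole allOf(...).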
Solution 2
If I simplify your account class a bit to:
class Account {
String fieldA;
String fieldB;
String fieldC;
Account(String fieldA, String fieldB, String fieldC) {
this.fieldA = fieldA;
this.fieldB = fieldB;
this.fieldC = fieldC;
}
}
Then you can use CompletableFuture#allOf(...) to wait for all the completable futures, one per field update, and then retrieve the result from each future individually. We cannot use the result of allOf because it returns a CompletableFuture<Void>, which carries no values.
Account account = CompletableFuture.allOf(cfA, cfB, cfC)
.thenApply(ignored -> {
String a = cfA.join();
String b = cfB.join();
String c = cfC.join();
return new Account(a, b, c);
}).join(); // or get(...) with timeout
We can use join() in thenApply because all the completable futures are done at this stage. You can modify the code block above to adapt it to your logic, such as updating fields instead of creating a new object. Note that the join() above throws an exception when the completable future completed exceptionally. In that case the future returned by allOf(...) also completes exceptionally, so dependent stages such as thenApply or thenRun are skipped. You can either change each completable future to handle() the failure before passing it to allOf(...), or recover from the failure of allOf(...) and check isCompletedExceptionally() before calling join():
CompletableFuture.allOf(cfA, cfB, cfC)
.exceptionally(ex -> null) // recover, otherwise thenRun is skipped when any future failed
.thenRun(() -> {
if (!cfA.isCompletedExceptionally()) {
account.fieldA = cfA.join();
}
if (!cfB.isCompletedExceptionally()) {
account.fieldB = cfB.join();
}
if (!cfC.isCompletedExceptionally()) {
account.fieldC = cfC.join();
}
}).join(); // or get(...) with timeout
The benefit of updating fields inside one completion stage is that these operations are done in the same thread so you don't have to worry about concurrent modification.
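The first option mentioned above, handling the failure before the future reaches allOf(...), might look like the following sketch. It uses exceptionally(...) to substitute a fallback value; the class name FallbackBeforeAllOf, the fallback string and the simulated failure are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class FallbackBeforeAllOf {
    static String combine() {
        CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "a");
        // Simulated failing call; exceptionally() turns the failure into a value,
        // so allOf(...) below never sees an exceptional completion.
        CompletableFuture<String> cfB = CompletableFuture
            .<String>supplyAsync(() -> { throw new IllegalStateException("service B down"); })
            .exceptionally(ex -> "fallback-b");
        CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> "c");

        return CompletableFuture.allOf(cfA, cfB, cfC)
            // All three futures are complete here, so join() does not block.
            .thenApply(ignored -> cfA.join() + "," + cfB.join() + "," + cfC.join())
            .join();
    }
}
```

With this approach the combining stage needs no isCompletedExceptionally() checks, at the cost of deciding on a fallback value per call.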
Solution 3
Since you have tagged spring-boot, I assume you use it and that your services are written with the Spring Framework, so my answer is specific to Spring.
First of all, I created an interface for implementing the REST API calls asynchronously.
public interface AsyncRestCall<T> {
/** this is a hypothetical method with hypothetical params!*/
CompletableFuture<T> call(String bankAccountId);
String type();
}
Then you can have an implementation for your service such as the one below. As you can see, this implementation uses MortgageRest, which represents the REST service for Mortgage.
@Service
public class MortgageService implements AsyncRestCall<MortgageInfo> {
private final MortgageRest mortgageRest;
@Autowired
public MortgageService(MortgageRest mortgageRest) {
this.mortgageRest = mortgageRest;
}
@Override
public CompletableFuture<MortgageInfo> call(String bankAccountId) {
return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
}
@Override
public String type() {
return "mortgage";
}
}
MortgageRest:
@Service
public class MortgageRest {
private RestTemplate restTemplate;
public MortgageRest(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
public MortgageInfo service(String bankAccountId) {
return new MortgageInfo("123455" + bankAccountId);
}
}
Do the same for the other REST services.
@Service
public class TransactionService implements AsyncRestCall<Transactions> {
private final TransactionRest transactionRest;
public TransactionService(TransactionRest transactionRest) {
this.transactionRest = transactionRest;
}
@Override
public CompletableFuture<Transactions> call(String bankAccountId) {
return CompletableFuture.supplyAsync(transactionRest::service);
}
@Override
public String type() {
return "transactions";
}
}
TransactionRest:
@Service
public class TransactionRest {
public Transactions service() {
return new Transactions(12);
}
}
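Both service implementations above call supplyAsync without an executor, which by default runs the task on the common ForkJoinPool. For blocking REST calls you may prefer a dedicated pool. The following is a minimal plain-Java sketch of that variation, not part of the original answer: ExecutorBackedCall, blockingRestCall and the pool size of 4 are assumptions, and in a Spring application the Executor would typically be injected as a bean.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorBackedCall {
    private final Executor executor;

    ExecutorBackedCall(Executor executor) {
        this.executor = executor;
    }

    // Stand-in for a blocking call such as mortgageRest.service(bankAccountId).
    private String blockingRestCall(String bankAccountId) {
        return "123455" + bankAccountId;
    }

    // The two-argument supplyAsync runs the supplier on the given executor.
    CompletableFuture<String> call(String bankAccountId) {
        return CompletableFuture.supplyAsync(() -> blockingRestCall(bankAccountId), executor);
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return new ExecutorBackedCall(pool).call("42").join();
        } finally {
            pool.shutdown();
        }
    }
}
```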
Now you need access to all the AsyncRestCall implementations. For this purpose you can declare a class like this:
@Service
public class RestCallHolder {
private final List<AsyncRestCall> asyncRestCalls;
public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
this.asyncRestCalls = asyncRestCalls;
}
public List<AsyncRestCall> getAsyncRestCalls() {
return asyncRestCalls;
}
}
AccountDetailService (you can name it whatever you like) uses CompletableFuture to call the REST services in parallel.
In this service, the REST call results for each bankAccountId are stored in a Map<String, Map<String, Object>> result = new HashMap<>();. The outer map is keyed by bankAccountId, and its value is an inner map holding the REST responses: the key is the service type and the value is the REST call response. At the end, a loop over accountDetails updates each object's properties.
@Service
public class AccountDetailService {
private final RestCallHolder restCallHolder;
public AccountDetailService(RestCallHolder restCallHolder) {
this.restCallHolder = restCallHolder;
}
public List<AccountDetail> update(List<AccountDetail> accountDetails) {
Map<String, Map<String, Object>> result = new HashMap<>();
List<AccountDetail> finalAccountDetails = new ArrayList<>();
accountDetails.forEach(accountDetail -> {
    // Start each REST call exactly once and keep its future under the call's type.
    Map<String, CompletableFuture<?>> futures = new HashMap<>();
    for (AsyncRestCall<?> rest : restCallHolder.getAsyncRestCalls()) {
        futures.put(rest.type(), rest.call(accountDetail.getBankAccountId()));
    }
    CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
        .thenAccept(aVoid -> {
            // All futures are complete here, so join() returns immediately.
            Map<String, Object> res = new HashMap<>();
            futures.forEach((type, future) -> res.put(type, future.join()));
            result.put(accountDetail.getBankAccountId(), res);
        })
        .handle((aVoid, throwable) -> {
            return null; // handle the exception here
        }).join();
});
accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
.bankAccountId(accountDetail.getBankAccountId())
.mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
.noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
.build()));
return finalAccountDetails;
}
}
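Note that the update method above joins inside the loop, so accounts are processed one after another and only the calls for a single account overlap. If the calls for all accounts should overlap, one option is to start every future first and wait once at the end. The sketch below shows the idea in plain Java without Spring; the CALLS map stands in for the AsyncRestCall beans, and all names in it are assumptions. Error handling is left out, so a failing call makes join() throw.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AllAccountsInParallel {
    // Hypothetical async calls keyed by type, standing in for the AsyncRestCall beans.
    static final Map<String, Function<String, CompletableFuture<Object>>> CALLS = new LinkedHashMap<>();
    static {
        CALLS.put("mortgage", id -> CompletableFuture.supplyAsync(() -> "M-" + id));
        CALLS.put("transactions", id -> CompletableFuture.supplyAsync(() -> 12));
    }

    static Map<String, Map<String, Object>> fetchAll(List<String> bankAccountIds) {
        // Start every call for every account; nothing blocks in this loop.
        Map<String, Map<String, CompletableFuture<Object>>> pending = new LinkedHashMap<>();
        for (String id : bankAccountIds) {
            Map<String, CompletableFuture<Object>> perAccount = new LinkedHashMap<>();
            CALLS.forEach((type, call) -> perAccount.put(type, call.apply(id)));
            pending.put(id, perAccount);
        }

        // Wait once for all futures of all accounts.
        CompletableFuture<?>[] all = pending.values().stream()
            .flatMap(m -> m.values().stream())
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(all).join();

        // Every future is complete now, so join() only reads the stored value.
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        pending.forEach((id, perAccount) -> {
            Map<String, Object> values = new LinkedHashMap<>();
            perAccount.forEach((type, future) -> values.put(type, future.join()));
            result.put(id, values);
        });
        return result;
    }
}
```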
Solution 4
I would give the responsibility to fetch the field values to the model object itself.
Here are three alternative solutions, using parallel streams, streams and an executor, and a for loop and an executor.
Solution 1:
accounts.parallelStream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(RestRequest::run);
Solution 2:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
.<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
account::updateAddressLine, account::updateExternalLink))
.map(RestRequest::new)
.forEach(executor::execute);
Solution 3:
Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
execute(executor, account::updateMortgage);
execute(executor, account::updateNoOfTrans);
execute(executor, account::updateAddressLine);
execute(executor, account::updateExternalLink);
}
private static void execute(Executor executor, Runnable task) {
executor.execute(new RestRequest(task));
}
Common code:
class RestRequest implements Runnable {
private final Runnable task;
RestRequest(Runnable task) {
this.task = task;
}
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
// A request failed. Others will not be canceled.
}
}
}
class AccountDetails {
String bankAccountId;
String mortgageAccountId;
Integer noOfTrans;
String addressLine;
String externalLink;
void updateMortgage() {
mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
}
void updateNoOfTrans() {
noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
}
void updateAddressLine() {
addressLine = AddressService.getAddress(bankAccountId).getLine();
}
void updateExternalLink() {
externalLink = LinkService.getLinks(bankAccountId).getExternal();
}
}
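One thing to keep in mind with the two executor-based variants: executor.execute(...) returns as soon as the task is submitted, so the calling code does not wait for the requests to finish (the parallel stream variant does block until all tasks are done). A possible way to wait is shutdown() followed by awaitTermination(...). The sketch below assumes a pool of 4 threads and a 30 second timeout; the class name AwaitAllRequests and the completed counter exist only for the example.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitAllRequests {
    // Runs all tasks on a pool, waits for them, and returns how many succeeded.
    static int runAll(List<Runnable> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                    completed.incrementAndGet();
                } catch (RuntimeException e) {
                    // A request failed. Others will not be canceled.
                }
            });
        }
        executor.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return completed.get();
    }
}
```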
Kevin Rave
Updated on July 29, 2022
I have a rather common or unique requirement. For example, I have the following list of AccountDetails (List<AccountDetails>):
class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;
}
All the above fields, except bankAccountId, are pulled from external REST service calls. I want to call all the REST services in parallel and update each object in the list. So, it looks like below:
For each accountDetails:
- Call mortgage REST service and update mortgageAccountId field (REST returns MortgageInfo object)
- Call transaction REST service and update noOfTrans field (REST returns Transactions object)
- Call address REST service and update addressLine field (REST returns Address object)
- Call link REST service and update externalLink field (REST returns Links object)
I want all the above calls in parallel, and for each AccountDetails object in the list. If there is an exception, I want to handle it gracefully. Note that each of the above REST services returns a different custom object.
I am confused about how to achieve this with CompletableFuture chaining. Not sure whether allOf, thenCombine (which only takes two), or thenCompose should be used, and how to put all of these together.
Any examples/ideas?