CompletableFuture vs Spring Transactions
Solution 1
As noted above, the cause of your problem is that the transaction ends as soon as process(..) returns, which is usually before the futures have completed.
What you can do is create the transaction manually, which gives you full control over when it starts and ends.
Remove @Transactional.
Autowire the PlatformTransactionManager, then in process(..):
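// Types come from org.springframework.transaction and
// org.springframework.transaction.support (DefaultTransactionDefinition).
TransactionDefinition txDef = new DefaultTransactionDefinition();
TransactionStatus txStatus = transactionManager.getTransaction(txDef);
try {
    // do your stuff here, for example:
    // commit only once the asynchronous work has finished
    // (the callback runs on whichever thread completes the future)
    doWhateverAsync().thenRun(() -> transactionManager.commit(txStatus));
} catch (Exception e) {
    // only exceptions thrown synchronously reach this block
    transactionManager.rollback(txStatus);
    throw e;
}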
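Why the thread matters can be seen without Spring at all. The following is a minimal plain-Java sketch, assuming only that transactional state is bound to the calling thread through a ThreadLocal (which is how Spring's thread-bound transaction management works). The class name ThreadBoundContextDemo and the CURRENT_TX variable are hypothetical stand-ins for illustration, not Spring APIs. It shows that a callback running on an executor thread does not see what the calling thread bound.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContextDemo {

    // Hypothetical stand-in for the thread-bound state a transaction manager keeps.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns true if the callback on the worker thread can see the caller's context. */
    static boolean workerSeesCallerContext() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // "begin" on the calling thread
        try {
            return CompletableFuture
                    .supplyAsync(() -> "data", executor)
                    // thenApplyAsync with an executor always runs on a worker thread
                    .thenApplyAsync(data -> CURRENT_TX.get() != null, executor)
                    .join();
        } finally {
            CURRENT_TX.remove(); // "end" on the calling thread
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "worker sees caller's context: false"
        System.out.println("worker sees caller's context: " + workerSeesCallerContext());
    }
}
```

This is the background for the caveat raised in the comments: work that relies on the caller's transaction either has to run on the caller's thread, or the context has to be handed over to the worker explicitly.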
Solution 2
In a Spring Boot application, you need the following configuration:
The main application class should be annotated with @EnableAsync.
The @Async annotation should be placed on the method that carries the @Transactional annotation. This indicates that the processing takes place in a separate thread.
Vaelyr
Updated on September 14, 2022
Comments
-
Vaelyr over 1 year
Idea
I have a processing method which takes in a list of items and processes them asynchronously using an external web service. The processing steps also persist data along the way. At the end of the whole process, I want to persist the process itself along with each processed result.
Problem
I convert each item in the list into a CompletableFuture, run a processing task on each, and collect them into a list of futures. I then use its allOf method (in the sequence method) to get a future that completes when all the submitted tasks are completed, and return another CompletableFuture which holds the result.
When I want to get that result, I call .whenComplete(..) and want to set the returned result on my entity as data and then persist it to the database. However, the repository save call does nothing: the threads just keep running and execution never gets past the save call.

@Transactional
public void process(List<Item> items) {
    List<Item> savedItems = itemRepository.save(items);

    final Process process = createNewProcess();

    final List<CompletableFuture<ProcessData>> futures = savedItems.stream()
        .map(item -> CompletableFuture.supplyAsync(() -> doProcess(item, process), executor))
        .collect(Collectors.toList());

    sequence(futures).whenComplete((data, throwable) -> {
        process.setData(data);
        processRepository.save(process); // <-- transaction lost?
        log.debug("Process DONE"); // <-- never reached
    });
}
Sequence method
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
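For reference, here is a small self-contained sketch of how the sequence helper above behaves on its own, outside of any transaction. The class name SequenceDemo and the squares method are hypothetical and exist only for illustration. The array is sized with `new CompletableFuture[0]`, which is equivalent here. The point is that the combined future yields the results in the same order as the input list, because each join is called in list order after allOf has completed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SequenceDemo {

    // Same technique as the sequence method above: wait for all, then join each in order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDone.thenApply(v ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    // Hypothetical workload: square each number asynchronously on the common pool.
    static List<Integer> squares(List<Integer> inputs) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * n))
                .collect(Collectors.toList());
        return sequence(futures).join(); // blocks the caller until every task is done
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3))); // prints [1, 4, 9]
    }
}
```

Note that join() blocks the calling thread, whereas whenComplete(..) in process(..) only registers a callback and returns immediately.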
What is happening? Why does the persist call not go through? Is the thread that started the transaction unable to commit it, or where does the transaction get lost? All the processed data comes back fine. I've tried different transaction strategies, but how can I control which thread finishes the transaction, if that is the issue?
Any advice?
-
vishr over 7 years
It is important to note that doWhateverAsync must operate in the same transactional context that was created. Care should be taken when doing operations in other threads, since the same context may not be available.
-
pannu over 6 years
@vishr Just adding to that: will any work happening in the CompletableFuture be part of the transaction, even if we manually close the transaction after the future completes? As I understand it, the CompletableFuture would be running on a new thread (or a thread from an executor pool), not the thread on which our request was being served, and the new thread might not have the same context.
-
Aliaksandr Arashkevich over 5 years
How do you autowire the transaction?
-
moldovean over 4 years
Just adding the @Async annotation failed to persist anything in my case, and I don't see any inserts in the debug session.
-
potame over 3 years
This might not be the only reason; it seems that the transaction is not propagated to threads created by the @Transactional-annotated method: github.com/spring-projects/spring-framework/issues/25439