Apache Camel: multicast with aggregation - AggregationStrategy called too often
I have tried to reproduce the problem, but without success. This is what I did:
The route:
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
// Assuming Camel 2.x; in Camel 3.x the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;
import com.google.common.collect.Lists;

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        // The strategy is passed directly to multicast(...)
        from("direct:multicast").routeId("multicastRoute")
            .multicast(myAggregationStrategy)
            .to("direct:A", "direct:B")
            .end();
        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    // Only logs each call and keeps the newest exchange
    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = "
                + (oldExchange == null ? "null" : oldExchange.getIn().getBody().toString())
                + ", newExchange = " + newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}
I then created a simple test that just runs the route.
The test:
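import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class MulticastRouteTest extends CamelTestSupport {
    @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        // The body is irrelevant here: direct:A and direct:B overwrite it
        template.sendBody("direct:multicast", null);
    }
}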
This prints:
Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]
This is what we would expect. I hope this helps. I cannot see any difference from the way you do things, but hopefully you will spot one.
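The call sequence in that output can also be modelled without Camel. The following is only a plain-Java sketch under the assumption that the multicast hands each reply to the strategy in order, starting with a null "old" value; the class MulticastFoldSketch and its methods fold and trace are hypothetical names for illustration, not part of Camel's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class MulticastFoldSketch {

    // Hypothetical stand-in for the multicast: feeds each reply to the
    // strategy in order, starting with a null "old" value.
    static <T> T fold(List<T> replies, BinaryOperator<T> strategy) {
        T aggregated = null;
        for (T reply : replies) {
            aggregated = strategy.apply(aggregated, reply);
        }
        return aggregated;
    }

    // Runs the fold with a logging strategy and returns one line per call.
    static List<String> trace(List<String> replies) {
        List<String> calls = new ArrayList<>();
        fold(replies, (oldBody, newBody) -> {
            calls.add("old = " + oldBody + ", new = " + newBody);
            return newBody; // like MyAggregationStrategy: keep the newest
        });
        return calls;
    }

    public static void main(String[] args) {
        // Two replies, so the strategy is invoked exactly twice.
        trace(Arrays.asList("[A]", "[B]")).forEach(System.out::println);
    }
}
```

With two replies the model records exactly two calls, which matches the output above. A third call with the original message as the new value would therefore have to come from something outside this simple fold.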
fricke
Updated on September 15, 2022
I have the following strange (or at least unclear to me) behaviour for a multi-cast + aggregation. Consider the following route:
from("direct:multicaster")
    .multicast()
    .to("direct:A", "direct:B")
    .aggregationStrategy(new AggregationStrategy() {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                List firstResult = newExchange.getIn().getBody(List.class);
                newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                return newExchange;
            } else {
                List oldResults = oldExchange.getIn().getBody(List.class);
                List newResults = newExchange.getIn().getBody(List.class);
                ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                oldExchange.getIn().setBody(aggResult);
                return oldExchange;
            }
        }
    })
    .end()
    // .to("log:bla")
Essentially, this route takes an input, sends it to direct:A and direct:B, expects lists from these two endpoints and concatenates them (the comment in the last line is there for a reason I will explain later).
Now assume that these two endpoints "return" the lists [A] and [B], respectively. If I send the message M to direct:multicaster, then the aggregator is called once with oldExchange = null and newExchange.in.body=[A], then with oldExchange.in.body=[A] and newExchange.out.body=[B] (as it is supposed to do).
All good up to this point. But the aggregator is called once more with oldExchange.in.body=[A,B] and newExchange.in=M (M is the initial message). This looks similar to an included enrichment pattern.
You can get the expected behaviour by uncommenting the last line, i.e. simply adding a dummy to("log:bla"). With this, everything behaves as expected.
Update: Trying (cf. the hint provided by Claus)
.multicast()
    .aggregationStrategy(aggStrategy)
    .to("direct:A", "direct:B")
.end()
and
.multicast(aggStrategy)
    .to("direct:A", "direct:B")
.end()
both result in the same behaviour.
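To make the expected result concrete, here is a plain-Java sketch of the concatenating aggregation applied to the two replies only. It assumes String elements and uses Collections.unmodifiableList as a stand-in for Guava's ImmutableList.copyOf; the class ConcatAggregationSketch is a hypothetical name and does not involve Camel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ConcatAggregationSketch {

    // Plain-Java counterpart of the aggregate(...) logic in the question:
    // oldBody is null on the first call, afterwards it holds the result so far.
    static List<String> aggregate(List<String> oldBody, List<String> newBody) {
        List<String> combined = new ArrayList<>();
        if (oldBody != null) {
            combined.addAll(oldBody);
        }
        combined.addAll(newBody);
        // Stand-in for Guava's ImmutableList.copyOf (an assumption of this sketch)
        return Collections.unmodifiableList(combined);
    }

    public static void main(String[] args) {
        List<String> afterA = aggregate(null, Arrays.asList("A"));
        List<String> afterB = aggregate(afterA, Arrays.asList("B"));
        System.out.println(afterB); // prints [A, B]
    }
}
```

Two calls are enough to produce [A, B]; the third call described above is the one that is not accounted for.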
What is happening here? What did I get wrong?
Thanks in advance, Markus