Apache Camel: multicast with aggregation - AggregationStrategy called too often

12,026

I have tried to reproduce the problem, but without success. This is what I did:

The route:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

Created a simple test just to run the route.

The test:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

This prints:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

This is what we would expect. Hope this will help you. I can not see any difference in the way I do things, but hopefully you will spot it.

Share:
12,026

Related videos on Youtube

fricke
Author by

fricke

Trained Mathematician (Computational Logic), currently working to support medical modelling for diagnostics at Ada Health. Topics of Interest: Modelling, Bayesian Inference, Java, Python, Algorithms, Logics

Updated on September 15, 2022

Comments

  • fricke
    fricke over 1 year

    I have the following strange (or at least unclear to me) behaviour for a multi-cast + aggregation. Consider the following route:

        from("direct:multicaster")
                    .multicast()
                    .to("direct:A", "direct:B")
                    .aggregationStrategy(new AggregationStrategy() {
                        @Override
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                List firstResult = newExchange.getIn().getBody(List.class);
                                newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                                return newExchange;
                            } else {
                                List oldResults = oldExchange.getIn().getBody(List.class);
                                List newResults = newExchange.getIn().getBody(List.class);
                                ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                                oldExchange.getIn().setBody(aggResult);
                                return oldExchange;
                            }
                        }
                    })
                    .end()
    //                .to("log:bla")
    

    Essentially, this route takes an input, sends it to direct:A and direct:B, expects lists from these two endpoints and concatenates them (the comment in the last line is there for a reason I will explain later).

    Now assume that these two endpoints "return" the lists [A] and [B], respectively. If I send the message M to direct:multicaster, then the aggregator is called once with oldExchange = null and newExchange.in.body=[A], then with oldExchange.in.body=[A] and newExchange.out.body=[B] (as it is supposed to do).

    All good up to this point. But the aggregator is called once more with oldExchange.in.body=[A,B] and newExchange.in=M (M is the initial message). This looks similar to an included enrichment pattern.

    You can get the expected behaviour by removing the comment in the last line, i.e. simply adding a dummy to("log:bla"). With this everthing behaves as expected.

    Update: Trying (cf. the hint provided by Claus)

                .multicast()
                .aggregationStrategy(aggStrategy)
                .to("direct:A", "direct:B")
                .end()
    

    and

                .multicast(aggStrategy)
                .to("direct:A", "direct:B")
                .end()
    

    both result in the same behaviour.

    What is happening here - what did I get wrong?

    thanks in advance markus