Apache Camel - Split and aggregate - Old Exchange is always null


I think I figured out how to aggregate the messages using the Aggregator. I added a header named id and used it as my correlation ID.

<camel:route>
  <camel:from uri="file:src/data/catask/test?noop=true"/>
  <camel:log message="${body}"></camel:log>
  <camel:split>
    <camel:xpath>//te:Orders/*</camel:xpath>
    <camel:to uri="direct:addHeaders"/>
    <camel:to uri="direct:aggegateQueries"/>
  </camel:split>
</camel:route>

<camel:route>
  <camel:from uri="direct:addHeaders"/>
  <camel:setHeader headerName="id">
    <camel:constant>order</camel:constant>
  </camel:setHeader>
</camel:route>

<camel:route>
  <camel:from uri="direct:aggegateQueries"/>
  <camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
    <camel:correlationExpression>
      <camel:simple>${header.id}</camel:simple>
    </camel:correlationExpression>
    <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
    <camel:log message="MERGED:: \n ${body}"></camel:log>
  </camel:aggregate>
</camel:route>  

This aggregates my messages. However, I am still not sure why Camel treated each message as a different kind of message (a separate group) when I used what I believe is a correct XPath as the correlation expression.

Copying Claus's explanation from the Camel forums: "Looks like its your correlation expression that is a new group for each message, eg each xpath result is different. If you want to split and join the same messages then see this eip http://camel.apache.org/composed-message-processor.html And see the example using only splitter. That is much easier to do."

I tested the XPath expression with an XPath evaluator tool and also printed the results of the correlation expression. All my messages match //Order in the same way. For example:

Group 1: 
<Order>  
  <orderNum>1</orderNum>  
</Order>  

Group 2: 
<Order>  
  <orderNum>2</orderNum>  
</Order> 
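
The effect Claus describes can be illustrated without Camel. The sketch below is a plain-Java simulation, not Camel's actual implementation: the class CorrelationSketch and its methods are hypothetical names, and messages are reduced to strings. It assumes the aggregator keeps one accumulated value per correlation key. Under that assumption, a key that changes with every message (such as the Order element itself, whose orderNum differs between Group 1 and Group 2) always starts a new group, so the strategy always sees a null old value. A constant key, like the id header set to "order", puts every message in one group.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch (no Camel dependency) of grouping messages by correlation key.
final class CorrelationSketch {

    // Mirrors the strategy in the question: the first message of a group has no old value.
    static String merge(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "+" + newBody;
    }

    // Keeps one accumulated body per key returned by the (hypothetical) correlation function.
    static Map<String, String> aggregate(List<String> bodies,
                                         Function<String, String> correlationKey) {
        Map<String, String> groups = new LinkedHashMap<>();
        for (String body : bodies) {
            String key = correlationKey.apply(body);
            // groups.get(key) is null the first time a key is seen.
            groups.put(key, merge(groups.get(key), body));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> parts = List.of(
            "<Order><orderNum>1</orderNum></Order>",
            "<Order><orderNum>2</orderNum></Order>");

        // Key derived from the message content: every part forms its own group.
        System.out.println(aggregate(parts, body -> body).size());    // prints 2
        // Constant key, like the id header set to "order": one shared group.
        System.out.println(aggregate(parts, body -> "order").size()); // prints 1
    }
}
```

With the content-derived key, merge is only ever called with a null old value, which matches the "oldExchange is always null" symptom.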
Author by

Dinesh Arora

Updated on June 04, 2022

Comments

  • Dinesh Arora
    Dinesh Arora almost 2 years

    I see that this question has been asked a number of times, but none of the posts helped or offered a conclusive solution. I am splitting a message and then aggregating the parts using Aggregator2. The code was throwing an exception because oldExchange was always null, so I wrote a small test case.

    I read an orders.xml file, which looks like this:

    <Orders xmlns="http://some/schema/Order">
        <Order>
            <orderNum>1</orderNum>
        </Order>
        <Order>
            <orderNum>2</orderNum>
        </Order>
        <Order>
            <orderNum>3</orderNum>
        </Order>
        <Order>
            <orderNum>5</orderNum>
        </Order>
        <Order>
            <orderNum>6</orderNum>
        </Order>
    </Orders>

    My Camel context looks like this:

    <camel:route>
      <camel:from uri="file:src/data/catask/test?noop=true"/>
      <camel:log message="${body}"></camel:log>
      <camel:split>
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:logQueries"/>
        <camel:to uri="direct:aggegateQueries"/>
      </camel:split>
    </camel:route>

    <camel:route>
      <camel:from uri="direct:logQueries"/>
      <camel:log message="After the call : \n ${body}"></camel:log>
    </camel:route>

    <camel:route>
      <camel:from uri="direct:aggegateQueries"/>
      <camel:aggregate strategyRef="aggrTask" completionInterval="8000">
        <camel:correlationExpression>
          <camel:xpath>//te:Order</camel:xpath>
        </camel:correlationExpression>
        <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
      </camel:aggregate>
    </camel:route>
    

    My aggregation strategy class looks like this:

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First message of a group: there is nothing to merge with yet.
        if (oldExchange == null) {
            System.out.println("Returning new exchange");
            return newExchange;
        }

        // Append the new body to the body accumulated so far.
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
    

    The problem is that the aggregated result saved in output.xml contains only the last record read from Orders.xml.

    i.e.

    <Order xmlns="http://some/schema/Order">
        <orderNum>6</orderNum>
    </Order>
    

    I looked into it further and found that this happens because oldExchange is always null, although after the first call it should hold a value. I suspect that because everything is read from a single file and then split, there is only one exchange.

    Any suggestions?

    UPDATE 1: Per Claus, I can solve this issue using the Splitter alone. I did that and was able to join all the messages successfully. However, I am still looking for a way to use Aggregator2. Here is how I did it using only the Splitter:

    <camel:route>
      <camel:from uri="file:src/data/catask/test?noop=true"/>
      <camel:log message="${body}"></camel:log>
      <camel:split strategyRef="aggrTask">
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:logQueries"/>
      </camel:split>
    </camel:route>

    <camel:route>
      <camel:from uri="direct:logQueries"/>
      <camel:log message="After the call : \n ${body}"></camel:log>
    </camel:route>
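
The splitter-only flow can be sketched in plain Java with the JDK's XPath support. This is not Camel code. The class SplitJoinSketch and its methods are hypothetical, the te prefix is assumed to be bound to the namespace shown in the sample file, and each part is reduced to its text content for brevity (the real strategy concatenates the full message bodies). The point is that every split part is folded into one running result, so no correlation key is involved.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Plain-Java sketch of "split with XPath, then join with the same strategy".
final class SplitJoinSketch {

    // Assumption: the "te" prefix maps to the namespace of the sample Orders file.
    static final String ORDER_NS = "http://some/schema/Order";

    // Splits the document into the text content of each child of te:Orders.
    static List<String> split(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for prefixed XPath steps
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                @Override public String getNamespaceURI(String prefix) {
                    return "te".equals(prefix) ? ORDER_NS : XMLConstants.NULL_NS_URI;
                }
                @Override public String getPrefix(String uri) { return null; }
                @Override public Iterator<String> getPrefixes(String uri) { return null; }
            });

            NodeList nodes = (NodeList) xpath.evaluate(
                    "//te:Orders/*", doc, XPathConstants.NODESET);
            List<String> parts = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                parts.add(nodes.item(i).getTextContent().trim());
            }
            return parts;
        } catch (Exception e) {
            throw new IllegalStateException("Could not split document", e);
        }
    }

    // Folds the parts the way the strategy does: null first, then old + "+" + new.
    static String join(List<String> parts) {
        String result = null;
        for (String part : parts) {
            result = (result == null) ? part : result + "+" + part;
        }
        return result;
    }

    public static void main(String[] args) {
        String xml = "<Orders xmlns=\"" + ORDER_NS + "\">"
                + "<Order><orderNum>1</orderNum></Order>"
                + "<Order><orderNum>2</orderNum></Order>"
                + "</Orders>";
        System.out.println(join(split(xml))); // prints 1+2
    }
}
```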