How to make sure that message from JMS queue is delivered to external WebService (CXF)?

10,767

Solution 1

Consuming JMS messages in a transaction is a must for the solution to work as expected: if an exception occurs in the CXF outbound phase, the JMS message will end-up rolled back, then redelivered, triggering a new CXF call.

You must carefully configure the redelivery policy for your ActiveMQ client in order to retry enough times and maybe not too fast (exponential back-off for example). You also want to handle the DLQ appropriately. ActiveMQ's client configuration with Spring Beans in Mule is shown: http://www.mulesoft.org/mule-activemq-integration-examples

Also be sure to refer to the right broker URL in your configuration factory. With your broker name of esb-amq-broker, your configuration factory should be:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...

Solution 2

I don't know if I will help you much but this is a couple of suggestions regarding your problem:

  • have you tried to use another transaction manager than the one provided with Jboss, I would suggest to use Atomikos for such tests
  • like David suggested Transactions seem to be the best approach , but another approach would be to use explicit acknowledgment policy .... It may be tricky to set up but an interceptor like approach could watch for connections to some specific endpoints and send the ack back to your JMS server, difficult may be but it would definitely ensure that the message has been correctly delivered ....

Good luck HTH jerome

Solution 3

Not sure if this consideration helps, but what about acknowledgement modes? Could it be possible that the message has been delivered already (in auto acknowledge mode) but was not yet properly processed by the consuming service endpoint?

No idea how to configure explicit acknowledgement in this scenario, but maybe worth to investigate further.

Share:
10,767
ŁukaszBachman
Author by

ŁukaszBachman

Humble Java developer.

Updated on June 16, 2022

Comments

  • ŁukaszBachman
    ŁukaszBachman almost 2 years

    The question

    How one should configure ActiveMQ and <flow> in Mule ESB 3.2, in order to make sure that message pulled from queue ends up properly handled by external CXF service?

    Scenario

    I have an CXF endpoint, which should take incoming message and transfer it to three external services as soon as possible. Let's call them EX1, EX2, EX3. This is fairly easy, thanks to the <all> component introduced in Mule 3.x.

    The most important requirement of the whole solution, is to make sure that each received message ends up being delivered to all three CXF services. So we ended up with the idea, to put each incoming message into Persistent JMS queues (Q1, Q2, Q3). After message is being read from queue Qn, it's transfered directly to corresponding EXn endpoint, and thus - external service.

    Config

    (I can provide full config upon request)

    We have configured ActiveMQ broker as described here and wired it up with our <flow> config. Everything seems to work as expected, I have JConsole connected to my application so I can see that messages are of type PERSISTENT and they end up in proper queues. If everything goes smoothly - messages are received by all three services EXn.

    Tests

    The problem arrises when we turn off one of the services, let's say EX2, and restart the whole server simulating failure. The message ends up being lost (I guess it's not that persistent, huh?). The most curious thing is - If we sent 10 messages when the EX2 is down, after server restart 9 of them are being properly redelivered! So I'm thinking that maybe, just maybe, 9 of those 10 messages were properly enqueued, while the one was being constantly redelivered when the server failed down.

    This causes me to think, that CXF endpoint is not being handled with transaction support, which I cannot understand, to be honest. After all I can see the message being in the queue when it's trying to be redelivered, so it should be persisted. It's clearly not, but why?

    My own attempts I have tried a number of things, none of which have worked. Always one message gets lost.

    1. Not to use any <jms:transaction /> tags within the flows - didn't work
    2. Starting jms transaction upon message receive, joining while sending to <cxf:jaxws-client />
    3. Using XA with JBoss and <xa-transaction /> - didn't work
    4. Providing <default-exception-strategy> config - If I recall it made things worst

    Any help is appreciated, thanks.

    CONFIG

    ACTIVE MQ CONFIGURATION

    <spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
        <spring:property name="queue" value="queue.*"/>
        <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
    </spring:bean>
    
    <spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
        <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
    </spring:bean>
    
    <spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
        <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
        <spring:property name="redeliveryPolicy">
            <spring:bean class="org.apache.activemq.RedeliveryPolicy">
                <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
                <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
                <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
                <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
            </spring:bean>
        </spring:property>
    </spring:bean>
    
    <spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
        <spring:property name="directory" value="/home/bachman/activemq"/>
    </spring:bean>
    
    <spring:bean name="AmqBroker"
                 class="org.apache.activemq.broker.BrokerService"
                 init-method="start"
                 destroy-method="stop">
        <spring:property name="brokerName" value="esb-amq-broker"/>
        <spring:property name="persistent" value="true"/>
        <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
        <spring:property name="useJmx" value="true"/>
        <spring:property name="useShutdownHook" value="false"/>
        <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
        <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
    </spring:bean>
    
    <jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                            numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                            connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                            disableTemporaryReplyToDestinations="true"/>
    

    FLOW - dispatch incoming message to 3 queues Qn

    <flow name="dispatch-to-queues">
            <inbound-endpoint ref="incoming-cxf"/>
    
            <!-- Each received message ends up to be sent to all destinations -->
            <all>
                <jms:outbound-endpoint name="queue.q1"
                    queue="queue.q1" disableTransportTransformer="false"
                    disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                    doc:name="JMS" doc:description="Receive messages on Q1"
                        connector-ref="PersistentJMSConnector"/>
    
                <jms:outbound-endpoint name="queue.q2"
                    queue="queue.q2" disableTransportTransformer="false"
                    disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                    doc:name="JMS" doc:description="Receive messages on q2"
                    connector-ref="PersistentJMSConnector" />
    
                <jms:outbound-endpoint name="queue.q3"
                    queue="queue.q3" disableTransportTransformer="false"
                    disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                    doc:name="JMS" doc:description="Receive messages on Q3"
                    connector-ref="PersistentJMSConnector" />
    
            </all>
            <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
        </flow>
    

    FLOW - handle delivery from Qn to EXn

    <flow name="from-q1-to-ex1">
            <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Pull from q1."
                connector-ref="PersistentJMSConnector">
                    <jms:transaction action="ALWAYS_BEGIN" />
            </jms:inbound-endpoint>
            <logger message="Sending message to EX-1" level="INFO" />
    
            <!-- Handle errors at this point in flow
            <custom-processor class="pl.exception.lookup.Component">
                <spring:property name="targetModuleName" value="Not-important"/>
            </custom-processor>
            -->
    
    
            <outbound-endpoint ref="ex1-cxf-endpoint">
                <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
            </outbound-endpoint>
        </flow>
    

    ENDPOINTS - declaration of referred endpoints

    <endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
            <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
        </endpoint> 
    
    <endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
            <cxf:jaxws-client
                    clientClass="com.mycompany.services.Ex1"
                    wsdlLocation="classpath:wsdl/ex1.wsdl"
                    operation="someOperation"
                    port="SomePort"/>
        </endpoint>
    
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Dear @David, I have seen your previous answers and I was hoping that You will reply :) I will post all necessary config first thing tomorrow when I arrive to work. Thanks for support, this is very important issue for me, since my most valuable customer is awaiting working solution.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Ok, I have added config. If you would be curious about pl.exception.lookup.Component, it's just a component that tries to foresee if exception will be thrown in next processor in flow. I have also tried to test solution without it, since I thought it might shadow underlying exceptions.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Ok, I will adjust my config and post the results here. One question though - is it ok that my config lacks of <default-exception-strategy> and explicit rollbacks? I assumed that transaction will be rolled back automatically after CXF endpoint indicates an exception, or am I wrong?
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Ok, so I have changed all JMS endpoints to one-way exchange pattern. That didn't help. So I have added jms.prefetchPolicy.all=1 to my connection broker URL. Didn't help as well. I have commented out the whole exception lookup component so we can throw it out of our heads - that didn't help as well. Note updated config for reference. I have then tried to provide <default-exception-strategy>, forced from-q1-to-ex1 flow to be synchronous and even declared cxf:jaxws-client within the flow (to be sure that ref= doesn't mess up anything. None of those have worked, so I'm still in a trouble.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Please let me also know: CXF should be configured as one-way or request-response? I think r-r, since it needs to notify flow about the exception, right?
  • David Dossot
    David Dossot over 12 years
    Yes, the outbound HTTP endpoint should be request-response. When you say it still doesn't work, you're saying that redelivery happens for most messages but some still get lost? If you test "from-q1-to-ex1" independently, does the same occur?
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    My testcase looks like following: 1. I launch clean mule EE and upload my app via web-console, 2. I turn off one inbound mocking EX1 via web-console, 3. I send two SOAP requests to my dispatch-to-queues endpoint and monitor logs - EX2 and EX3 logs delivered messages, EX1 is not (as expected), 4. I run ps -elf | grep mule and kill all mule processes, simulating cluster failure, 5. I run Mule EE again and monitor logs. When the application starts I can see JUST ONE LOG OUTPUT FROM EX1 (meaning that only one message was delivered instead of two).
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    I know this manual test is not the best and should be automatic, but to be honest I'm in rush and don't have that much of an experience with Unit or Integration testing of Mule. Next important question might be - is <jms:transaction/> allowed in cxf endpoint? I referred to Mule in Action chapter about Transactions, and you are showing an example of JMS-to-JDBC ran on XA. Maybe I should stick to XA then?
  • David Dossot
    David Dossot over 12 years
    If you check the JMS queue after killing Mule, are there 2 pending messages waiting for EX1 to consume them? XA won't fly: HTTP can't enroll in it, you're doing the right thing. Also you say you're using EE: you'd rather open a support ticket with Mule since you have their enterprise-level support.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    I happen to be testing management console, so I have EE trial for about two more weeks. I think my company will buy licence in a couple of months, but we are developing core concepts using CE right now. Thanks for clearing out XA question, that leaves me one less thing to worry about ;)
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    You have also mentioned checking out ActiveMQ contents when Mule is down. How can I do that? I'm using JConsole to monitor queues, which of course goes down along with Mule process. So how could I have a look inside queues when the server is down? I was trying to monitor persistent store contents, but since it's binary content, I gave up right away.
  • David Dossot
    David Dossot over 12 years
    Duh, sorry man, I just realized you run AMQ embedded in Mule. See my updated response.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Wooow, that is a surprise :) I thought that persistent queue means that the message is persisted right after it has been received by queue and it stays that way even though the broker goes down. If I understood you correctly, the message currently being processed by queue might not be persistent at that time. Well, I'll run some tests and see what I came up with. I need to have a solid solution for my use case, so any other ideas are appreciated.
  • David Dossot
    David Dossot over 12 years
    Yes you are right, theoretically this is how things should work. Now in real life... Can you try to temporarily use an external AMQ broker? This will allow to test the behavior of Mule in independence of the JMS provider. This will allow checking the queue content after killing Mule too.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Yes, I will try to do that. Also I have learned that MMC gives you an insight on AMQ queues as well, so I will compare it to the output of JConsole. I assume that results will be the same, since both are using JMX, but maybe I'll come up with something. Anyways, I will definitelly come back here with the result of using external JMS.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Ok, I have configured externall ActiveMq instance and performed two tests. 1 - kill mule, kill active-mq, restart both; 2 - kill active-mq, try send message, start active-mq. In first case all messages are properly redelivered. In second test the mule connector became unresponsive. I had to restart mule, then the messages have been redelivered. This proves me that the whole config might be OK, but solution working altogether is not :( Unfortunately, adding another dependency to my setup (external JMS, which has to be monitored onside) is not great for me, especially that depending on test case
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    ... solution might require hard reset of Mule instance. Managing two those entities separately introduce more complexity, which I would like to avoid. Also - I forget to mention, that I couldn't make it work on VM, I had to use TCP. I'm not an expert to judge if that has any other impact on the whole solution. This is getting more and more complicated.
  • David Dossot
    David Dossot over 12 years
    Indeed, the VM connector only works for in memory AMQ brokers. I'm not suggesting using a remote AMQ broker as your deployment target (it brings a whole new class of issues to deal with), really just to see if killing Mule (not AMQ) impacts message redelivery. Do not kill AMQ otherwise you'll have to deal with Mule retry policies for reconnection. Really just kill Mule and see if the 2 messages for EX1 are there and ensure they are picked up after Mule restart.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    Yes, I can confirm that in case of killing just a Mule instance and bringing it back to life - the messages were redelivered, I could see them all in external ActiveMq. But still, I don't know where the problem with embedded JMS lies. Also, I have noticed that in my config I was using brokerName which has not been the same as brokerUrl in connectionFactory. I have changed that appropriately, but it didn't help. Maybe there is still something that is causing odd behaviour. @David, did you managed to make similar scenario ever work? Maybe following some exisitng examples would help me.
  • David Dossot
    David Dossot over 12 years
    OK, so I think Mule is doing the right thing. When I want strong guarantee of no JMS message loss, I run my JMS provider not embedded (ie standalone) and in HA mode (clustered). Now for your particular case the broker name mismatch is maybe the issue: you're potentially starting a second in memory broker from the connection factory.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
    ok, HA is good option for me as well, since we will be targeting it anyhow with our EE licence. So, is there any reference where I could wire up my standalone provider to be ready for HA? I know that Mule's support will help us to set up and deploy our app properly, but I could do necessary arrangements right now. Unfortunately, Mule doesn't seem to provide much details about HA in documentation.
  • David Dossot
    David Dossot over 12 years
    I was referring to ActiveMQ's HA options, not Mule's. If the JMS broker is HA, it's fine if Mule is not.
  • ŁukaszBachman
    ŁukaszBachman over 12 years
  • ŁukaszBachman
    ŁukaszBachman about 12 years
    I'm not using Jboss, but Mule ESB. It comes with a built in server and default transaction manager. I have also tried to use JBoss transaction manager, but withour any luck (it worked exactly the same). I will think about the second idea.
  • ŁukaszBachman
    ŁukaszBachman about 12 years
    I have tried ack in two scenarios: AUTO and CLIENT. Didn't help. Also, I want to point out that the message is still in the queue and redelivery is being attempted (I can see that Mule is constantly trying to send the message to failure endpoint), but just gets lost on server hard reset.