How to listen to a message queue from a web application? (Tomcat, ActiveMQ)

Solution 1

If you already have code that can consume messages from the queue (which it appears you do), then I think your problem comes down to how to get that piece of code to run.

It appears you aren't using any frameworks, so the approach I would take is to run the code that retrieves messages from the queue in a separate thread in the application server. Start that thread when the application server starts, and have it tidy up after itself when the application server shuts down.

The easiest way to start the thread on app server startup is to introduce a ServletContextListener. In the context listener, start your queue-listening code in a separate thread.
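The lifecycle described above can be sketched with the JDK alone. This is only an illustration under stated assumptions: `QueueWorker` and all of its methods are hypothetical names, and a `BlockingQueue` stands in for the real JMS consumer so the example runs without a broker. In a web application, `contextInitialized` would call `start()` and `contextDestroyed` would call `stop()`.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the queue-listening code.
// A BlockingQueue plays the role of the JMS queue (assumption, not real JMS).
final class QueueWorker {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean running;
    private Thread thread;

    // Would be called from contextInitialized(): returns immediately,
    // the receive loop runs in its own thread.
    synchronized void start() {
        if (thread != null) {
            return;
        }
        running = true;
        thread = new Thread(this::loop, "queue-worker");
        thread.setDaemon(true);
        thread.start();
    }

    // Would be called from contextDestroyed(): signal the loop, then wait briefly for it to exit.
    synchronized void stop() {
        running = false;
        if (thread != null) {
            thread.interrupt();
            try {
                thread.join(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            thread = null;
        }
    }

    synchronized boolean isAlive() {
        return thread != null && thread.isAlive();
    }

    void send(String message) {
        queue.add(message);
    }

    List<String> handled() {
        return handled;
    }

    // Waits until at least 'count' messages were handled, or the timeout expires.
    boolean awaitHandled(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (handled.size() < count) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    private void loop() {
        while (running) {
            try {
                // Poll with a timeout so the loop re-checks the running flag regularly.
                String message = queue.poll(200, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handled.add(message); // "do message stuff" would go here
                }
            } catch (InterruptedException e) {
                return; // interrupted by stop()
            }
        }
    }

    public static void main(String[] args) {
        QueueWorker worker = new QueueWorker();
        worker.start();
        worker.send("hello");
        worker.awaitHandled(1, 5000);
        worker.stop();
        System.out.println("handled: " + worker.handled());
    }
}
```

Polling with a timeout rather than blocking indefinitely is a design choice in this sketch: it lets the loop notice the shutdown flag even if the interrupt is missed.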

EDIT: I used this proposed solution and added the code above to the question.

Solution 2

  1. Configure the JMS queue in Tomcat: https://martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/

  2. Put activemq-all-5.xx.jar into $TOMCAT_HOME/lib

  3. Listen in a loop in a Thread started from a @WebListener

  4. Stop it in contextDestroyed:

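    // Assumes the javax.* namespaces (JMS and Servlet 3.x+), Lombok and ActiveMQ 5.x on the classpath.
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.ObjectMessage;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.naming.InitialContext;
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    import javax.servlet.annotation.WebListener;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.activemq.ActiveMQConnectionFactory;

    @WebListener
    @Slf4j
    public class JmsMailListener implements ServletContextListener {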
        private Thread listenerThread = null;
        private QueueConnection connection;
    
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            try {
                InitialContext initCtx = new InitialContext();
                ActiveMQConnectionFactory connectionFactory =
                        (ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory");
                connectionFactory.setTrustAllPackages(true);
                connection = connectionFactory.createQueueConnection();
                QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue");
                QueueReceiver receiver = queueSession.createReceiver(queue);
                connection.start();
                log.info("Listen JMS messages ...");
                listenerThread = new Thread(() -> {
                    try {
                        while (!Thread.interrupted()) {
                            Message m = receiver.receive();
                            if (m == null) {
                                // receive() returns null once the consumer has been closed
                                break;
                            }
                            if (m instanceof ObjectMessage) {
                                ObjectMessage om = (ObjectMessage) m;
                                MyObject myObject = (MyObject) om.getObject();
                                log.info("Received MyObject {}", myObject);
                                ...
                            }
                        }
                    } catch (Exception e) {
                        log.error("Receiving messages failed: " + e.getMessage(), e);
                    }
                });
                listenerThread.start();
            } catch (Exception e) {
                log.error("JMS failed: " + e.getMessage(), e);
            }
        }
    
        @Override
        public void contextDestroyed(ServletContextEvent sce) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                    log.warn("Couldn't close JMSConnection: ", ex);
                }
            }
            if (listenerThread != null) {
                listenerThread.interrupt();
            }
        }
    }
    

Solution 3

I am using ActiveMQ in my Spring MVC web application with JmsTemplate, using the following approach.

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>

<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="REQUEST_QUEUE" />
</bean> 
<bean id="messageListener" class="xyz.MessageListener">
    <property name="listenerId" value="1" />
</bean> 

<jms:listener-container connection-factory="connectionFactory">     
    <jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>

The sender and listener class implementations are given below.

public class MessageSender 
{

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage()
    {

        jmsTemplate.send(new MessageCreator()
        {
            public Message createMessage(Session session) throws JMSException
            {
                MapMessage message = session.createMapMessage();
                message.setString("messageType", XXX);
                message.setString("jsonMessage", XXXX);
                return message;
            }
        });
    }
}


public class MessageListener
{
    private int listenerId;

    public void messageReceived(Map<String, Object> message) throws Exception
    {        
        //put your logic here 
    }

    public int getListenerId()
    {
        return listenerId;
    }

    public void setListenerId(int listenerId)
    {
        this.listenerId = listenerId;
    }
}

Solution 4

I am using Spring to listen to a queue. The listener is defined like this:

<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
    <jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>

The jmsConnectionFactoryLocal bean must be created according to your MQ provider. In my case it was IBM WebSphere MQ, so jmsConnectionFactoryLocal is defined like this:

<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    <property name="hostName">
        <value>THE_MQ_SERVER_IP</value>
    </property>
    <property name="port">
        <value>MQ_PORT</value>
    </property>
    <property name="queueManager">
        <value>QUEUE_MANAGER_NAME</value>
    </property>
    <property name="transportType">
        <value>1</value>
    </property>
</bean>
<bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
    <property name="username" value="USER_NAME"/>
    <property name="password" value="PASSWORD"/>
</bean>

You have to find the right ConnectionFactory implementation for ActiveMQ and use it. The listener and jmsConnectionFactory definitions stay the same and are independent of the MQ provider.

Author by

Dimitri Dewaele

Senior software engineer in a wide variety of business applications. Particularly interested in Java development, cloud computing, server performance and stability. Also passionate about android technology as official android publisher. Always interested in new technologies, as well as close interaction with the Open Source community.

Updated on July 20, 2022

Comments

  • Dimitri Dewaele
    Dimitri Dewaele almost 2 years

    I'm happily improving my web application, which runs on Apache Tomcat. An ActiveMQ JMS server has been added to send and receive messages.

    I already can send and receive messages, but need help on the receiver side.

    How should my web app continuously listen to one queue to receive messages?

    New messages arrive and the server should act on them, e.g. add data to the DB or send a message back.

    I can already send messages. This is the code:

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection connection = factory.createConnection();
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("clientQueue");
    MessageProducer publisher = session.createProducer(queue);
    connection.start();
    
    Message message = null;
    message = session.createTextMessage("Text Message");
    publisher.send(message);
    

    I can already receive a message after a request (a click ;-))

    connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    destination = session.createQueue("serverQueue");
    consumer = session.createConsumer(destination);
    
    while (true) {
        Message message = consumer.receive(300000);
        //Do message stuff
    }
    

    How should I let the web application listen to the queue continuously? What is the advised way?

    All help is warmly appreciated. Thanks.

    EDIT - SOLUTION

    Current working solution with the proposals from DaveH

    I have added a ServletContextListener to listen for messages continuously.

    web.xml

    <listener>
        <listener-class>com.test.JMSContextListener</listener-class>
    </listener>
    

    The listener:

    public class JMSContextListener implements ServletContextListener {
        @Override
        public void contextInitialized(ServletContextEvent arg0) {
            Thread thread = new Thread(new JMSConnector());
            thread.start();
        }
    
        @Override
        public void contextDestroyed(ServletContextEvent arg0) {
            //Nothing
        }
    }
    

    The Connection:

    public class JMSConnector implements Runnable {
        public void run() {
            try {
                Context context = new InitialContext();
                QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");            
                Connection connection = factory.createConnection();
                Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
                Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    
                MessageConsumer consumer = session.createConsumer(queue);
    
                // This MessageListener will do stuff with the message
                MessageListenerImpl messageListener = new MessageListenerImpl();
                consumer.setMessageListener(messageListener);
    
                // Start the connection or nothing will happen!
                connection.start();
            } catch (JMSException ex) {
                //TODO
            } catch (NamingException ex) {
                //TODO
            }
        }
    }
    

    Is this an advised way or should this be improved?

    All help is warmly appreciated. Thanks.

    • Dimitri Dewaele
      Dimitri Dewaele almost 9 years
      FYI: I'm not using Spring...
    • Shogg
      Shogg over 7 years
      For Servlet 3.x containers, you can annotate the listener with @WebListener; there is no need to declare it in web.xml. (Mkyong)
  • Dimitri Dewaele
    Dimitri Dewaele almost 9 years
    This looks promising. I can receive messages when I 'click' a button. Let me try this approach and come back to the forum with a solution.
  • DaveH
    DaveH almost 9 years
    Your code isn't quite what I meant. You need to start your listener in a separate thread - I think what you have there will cause the app server to hang. I'd suggest making your JMSConnector implement the Runnable interface and starting that in a separate thread from your ServletContextListener
  • Dimitri Dewaele
    Dimitri Dewaele almost 9 years
    A thread has been added and this seems to work. If you have more advice, it is more than welcome!
  • DaveH
    DaveH almost 9 years
    That looks better. You'd probably want to implement the contextDestroyed method in the listener as well so you can close the queue connection tidily.
  • Jing Ma
    Jing Ma over 4 years
    How come the app server hangs if you don't start the listener in a separate thread?
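
One way to see the point about hanging is the JDK-only sketch below. It is an illustration under assumptions, not container code: `StartupBlockingDemo`, `listenForever` and `startupCompletes` are hypothetical names, a `BlockingQueue` stands in for the JMS consumer, and a single-threaded executor plays the role of the thread on which the container calls `contextInitialized`. The concern applies to a blocking receive loop such as the `while (true)` loop in the question; if that loop runs directly in the startup callback, the callback never returns and startup cannot continue. A `setMessageListener` registration does not block in this way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StartupBlockingDemo {

    // Stand-in for a blocking consumer loop such as: while (true) { consumer.receive(); }
    static void listenForever(BlockingQueue<String> queue) {
        try {
            while (true) {
                queue.take(); // blocks until a message arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates the container invoking a startup callback on its own thread.
    // Returns true if the callback returned within the timeout.
    static boolean startupCompletes(Runnable contextInitialized, long timeoutMillis) {
        ExecutorService container = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "container-startup");
            t.setDaemon(true);
            return t;
        });
        Future<?> result = container.submit(contextInitialized);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the callback is still blocked
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            container.shutdownNow(); // interrupts a callback that is still blocked
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Blocking loop called directly from the startup callback.
        boolean direct = startupCompletes(() -> listenForever(queue), 1000);

        // Same loop, but handed to a separate thread; the callback returns at once.
        boolean threaded = startupCompletes(() -> {
            Thread listener = new Thread(() -> listenForever(queue), "listener");
            listener.setDaemon(true);
            listener.start();
        }, 1000);

        System.out.println("direct call returns: " + direct);
        System.out.println("separate thread returns: " + threaded);
    }
}
```

The direct call should report that the callback did not return in time, while the threaded variant should return almost immediately.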