DefaultJmsListenerContainerFactory and Concurrent Connections not shutting down


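Try setting maxMessagesPerTask to a value greater than 0, so that each listener task ends after a bounded number of receive attempts and the container gets a chance to scale back down: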

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory());
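    // Bounded tasks: each task ends after one receive attempt, so an idle consumer need not be rescheduled.
    factory.setMaxMessagesPerTask(1);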
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
    return factory;
}

See the Javadoc for setMaxMessagesPerTask: http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setMaxMessagesPerTask-int-
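To illustrate why a positive maxMessagesPerTask matters, here is a minimal plain-Java model, not Spring's actual implementation. It assumes the container has already scaled up to its maximum, that every consumer runs one task per round, and that a task which receives nothing is simply not rescheduled while the container is above its minimum. The class and method names are hypothetical, and details such as receiveTimeout and idleTaskExecutionLimit are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of task-based scale-down; not Spring code.
final class ScaleDownSketch {

    /** Runs one task: up to maxMessagesPerTask receive attempts. True if any message arrived. */
    static boolean runTask(Queue<String> queue, int maxMessagesPerTask) {
        boolean received = false;
        for (int attempt = 0; attempt < maxMessagesPerTask; attempt++) {
            if (queue.poll() != null) {
                received = true;
            }
        }
        return received;
    }

    /** Every active consumer runs one task; idle tasks above the minimum are not rescheduled. */
    static int runRound(Queue<String> queue, int active, int minConsumers, int maxMessagesPerTask) {
        int next = active;
        for (int i = 0; i < active; i++) {
            boolean received = runTask(queue, maxMessagesPerTask);
            if (!received && next > minConsumers) {
                next--;
            }
        }
        return next;
    }

    /** Returns how many consumers remain once the backlog has been drained. */
    static int consumersAfterDrain(int minConsumers, int maxConsumers,
                                   int messages, int maxMessagesPerTask) {
        if (maxMessagesPerTask == 0 || maxConsumers < 1) {
            throw new IllegalArgumentException("need maxMessagesPerTask != 0 and maxConsumers >= 1");
        }
        if (maxMessagesPerTask < 0) {
            // Unlimited: in this model a task never returns, so there is no
            // point at which it could be dropped instead of rescheduled.
            return maxConsumers;
        }
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 0; i < messages; i++) {
            queue.add("message-" + i);
        }
        int active = maxConsumers; // assumption: load has already scaled us up
        while (!queue.isEmpty()) {
            active = runRound(queue, active, minConsumers, maxMessagesPerTask);
        }
        // One more round on the empty queue: every remaining task is idle.
        return runRound(queue, active, minConsumers, maxMessagesPerTask);
    }

    public static void main(String[] args) {
        System.out.println(consumersAfterDrain(1, 3, 10, 1));  // prints 1
        System.out.println(consumersAfterDrain(1, 3, 10, -1)); // prints 3
    }
}
```

In this model, concurrency 1-3 with maxMessagesPerTask=1 falls back to one consumer once the queue is empty, while an unlimited setting leaves all three waiting, which matches the symptom described in the question.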

With jms.connections.prefetch=1000, if 1000 messages are waiting on the queue, a single consumer can prefetch all of them, so only one thread ends up processing those 1000 messages.

With jms.connections.prefetch=1, for example, messages are dispatched evenly across all available threads. With that setting, however, it is better to set maxMessagesPerTask < 0, because long-lived tasks avoid frequent thread context switches. See http://activemq.apache.org/what-is-the-prefetch-limit-for.html
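The effect of the prefetch limit can be sketched with a small plain-Java model. It is a deliberate simplification rather than ActiveMQ's real dispatch logic: it assumes a backlog is already on the queue, that consumers subscribe one after another, and that nothing is acknowledged in the meantime. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical, simplified model of prefetch buffering; not ActiveMQ code.
final class PrefetchSketch {

    /**
     * Consumers subscribe in order to a queue holding `messages` pending messages.
     * Each consumer is handed pending messages until its prefetch buffer is full.
     * Returns how many messages each consumer ends up holding.
     */
    static int[] dispatch(int messages, int consumers, int prefetch) {
        int[] buffered = new int[consumers];
        int remaining = messages;
        for (int c = 0; c < consumers && remaining > 0; c++) {
            int batch = Math.min(prefetch, remaining);
            buffered[c] = batch;
            remaining -= batch;
        }
        return buffered;
    }

    public static void main(String[] args) {
        // The first consumer buffers the whole backlog; the others get nothing.
        System.out.println(Arrays.toString(dispatch(1000, 3, 1000))); // prints [1000, 0, 0]
        // Each consumer holds one message; the other 997 stay on the broker
        // for whichever consumer becomes free next.
        System.out.println(Arrays.toString(dispatch(1000, 3, 1)));    // prints [1, 1, 1]
    }
}
```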

Author: jcb

Updated on June 04, 2022

Comments

  • jcb, almost 2 years ago

    I am using Spring 4.x's DefaultJmsListenerContainerFactory in order to connect to an ActiveMQ Queue, process the messages from that queue using a @JmsListener and then push the message to a topic on the same ActiveMQ broker.

    I am using a single CachingConnectionFactory for both the consumer/listener and the producer, with consumer caching set to false so that the producer is cached but the consumer is not. I also set the concurrency to 1-3, expecting a minimum of 1 consumer on the queue at application startup, rising to 3 as the message volume ramps up. As the message volume dwindles, I expected the number of consumers to drop back to 1 as well. However, when I look at the threads (defaultmessagelistenercontainer-2/3), they are in a waiting state and do not shut down. Isn't the expected behaviour that the extra consumers shut down once the load subsides? Please see my configuration below, and let me know whether this behaviour is not provided out of the box and I need to add something to get it working as described.

    ApplicationContext.java

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
        factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
        return factory;
    }
    
    @Bean
    public CachingConnectionFactory connectionFactory(){
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
        redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
        redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
        redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
        redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));
    
        ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
        activeMQ.setRedeliveryPolicy(redeliveryPolicy);
        activeMQ.setPrefetchPolicy(prefetchPolicy());
    
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
        cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
        cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
        return cachingConnectionFactory;
    }
    
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(){
        ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));
    
        JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
        jmsMessagingTemplate.setDefaultDestination(activeMQ);
    
        return jmsMessagingTemplate;
    }
    

    application.properties

    jms.connections.concurrent=1-3
    jms.connections.prefetch=1000
    jms.connections.transacted=true
    jms.connections.cache.consumers=false
    jms.redelivery.initial-delay=1000
    jms.redelivery.delay=1000
    jms.redelivery.maximum=5
    jms.redelivery.use-exponential-back-off=true
    jms.redelivery.back-off-multiplier=2
    jms.cache.size=3
    jms.queue.in=in.queue
    jms.queue.out=out.queue
    jms.broker.endpoint=failover:(tcp://localhost:61616)
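
For reference, the jms.connections.concurrent=1-3 value is handed to setConcurrency, which accepts either a "lower-upper" range or a single upper limit (with the lower limit then taken as 1). The plain-Java sketch below mirrors that documented behaviour as I understand it. It is not copied from Spring, and the class and method names are hypothetical.

```java
// Hypothetical helper illustrating how a concurrency string is interpreted; not Spring code.
final class ConcurrencySketch {

    /** Returns {minConsumers, maxConsumers} for a "lower-upper" or "upper" string. */
    static int[] parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, separator));
            int upper = Integer.parseInt(concurrency.substring(separator + 1));
            return new int[] {lower, upper};
        }
        // A single number is treated as the upper limit, with a lower limit of 1.
        return new int[] {1, Integer.parseInt(concurrency)};
    }

    public static void main(String[] args) {
        int[] limits = parse("1-3");
        System.out.println("min=" + limits[0] + ", max=" + limits[1]); // prints min=1, max=3
    }
}
```

So "1-3" means the container starts with 1 consumer and may grow to 3. Whether it shrinks again depends on the task settings discussed in the answer, not on the range itself.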