How is concurrency in Spring AMQP Listener Container implemented?

13,990

Yes, to use concurrency, your listener has to be thread-safe. There is one listener instance per container. However, the <rabbit:listener-container/> namespace element is actually just a convenience for adding "shared" attributes, each listener element gets its own container.

It's generally best to use stateless objects (no fields that are written to), but that's not always possible.

If your listener is not thread-safe, you can use...

<rabbit:listener-container
    connection-factory="myConnectionFactory"
    acknowledge="none"
    requeue-rejected="false">
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    <rabbit:listener ref="myListener" queues="myQueue"/>
    ...
</rabbit:listener-container>

...and add @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE). You will then get a container for each listener and a different instance of the listener will be injected into each.

You will also need prototype scope for any non-thread-safe dependencies injected into the listener.

Share:
13,990
Parobay
Author by

Parobay

Updated on June 11, 2022

Comments

  • Parobay
    Parobay almost 2 years

    My container XML config:

    <rabbit:listener-container
            connection-factory="myConnectionFactory"
            acknowledge="none"
            concurrency="10"
            requeue-rejected="false">
        <rabbit:listener ref="myListener" queues="myQueue"/>
    </rabbit:listener-container>
    

    and myListener is just a class

    @Component("myListener")
    public class MyListener implements MessageListener {
        @Autowired
        SomeDependency dependency;
        ....
    }
    

    I've specified concurrency="10" in my XML. What does this mean exactly?


    I've found some docs. They are not that helpful stating:

    Specify the number of concurrent consumers to create. Default is 1.


    What I am interested in is whether MyListener has to be thread safe i.e.

    • are there many instances created or single instance used by many threads?
    • can I access instance fields w/o synchronization?
    • is SomeDependency dependency instantiated once or for each thread/instance?
    • does dependency need to be thread safe?
  • Parobay
    Parobay about 10 years
    I want to be sure I understand correctly: in my example there are 10 listener-containers, each holding the one and only MyListener instance, correct? In your example there are 10 containers, each with their own listener? Is it somehow possible to keep my approach (using concurrency) and make Dependency separate for each listener-container ?
  • Gary Russell
    Gary Russell about 10 years
    No; in your example there is 1 listener container with 10 consumer threads; in mine there are 10 containers each with one thread. Using prototype scope means each container gets its own instance; so your listener doesn't need to be thread-safe. Without prototype scope, each container would get a reference to the same instance - effectively no different (functionally) to your original case. Remember to make all downstream dependencies prototype scope too. Like I said, it's generally best to try to make your listener, and its dependencies, stateless to avoid these issues.
  • Parobay
    Parobay about 10 years
    Thank you, it's clear now. Stateless classes are really pleasure to work with, but I am afraid I'll have a pretty statefull one here :-( Maybe a ThreadLocal will be the easiest way out
  • Fab
    Fab over 8 years
    How would you this configuration using annotations in spring-boot?
  • Gary Russell
    Gary Russell over 8 years
    It's better to ask a new question and reference this one rather than using comments. I answered it here.