RabbitMQ: fast producer and slow consumer

Solution 1

"Will this cause the message queue to overflow?"

Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.

"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"

You have 2 options:

  1. Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
  2. Increase the QOS value (the prefetch count) of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the latency of individual messages; if 5 messages are buffered, the 5th message will not complete until messages 1...5 have all been processed.
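
To make the buffering cost in option 2 concrete, here is a small worked sketch in Java. It assumes a single consumer that handles its prefetched messages one after another, each taking the same time; `PrefetchLatency` and `completionTimesMs` are hypothetical names, not part of the RabbitMQ client. In the RabbitMQ Java client the prefetch limit is set with `Channel.basicQos`, and it only applies to consumers that acknowledge manually, whereas the consumer in the question passes `true` (auto-ack) to `basicConsume`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of any RabbitMQ API: models how long each
// prefetched message waits when one consumer handles the buffer in order.
class PrefetchLatency {

    // Returns the completion time (in ms) of each buffered message, assuming
    // every message takes perMessageMs and they are processed sequentially.
    static List<Long> completionTimesMs(int prefetchCount, long perMessageMs) {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= prefetchCount; i++) {
            times.add(i * perMessageMs);
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(completionTimesMs(5, 2000));
    }
}
```

With a prefetch of 5 and 2 seconds per message, the fifth message finishes at 10000 ms because it waits for the four ahead of it.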

"Should I use multithreading in the consumer part to speed up the consumption rate?"

Not unless you have a well-designed solution. Adding parallelism to an application adds a lot of overhead on the consumer side. You may end up exhausting the thread pool or putting heavy pressure on memory.
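
If you do go multi-threaded, one way to limit the risks named above is to bound both the number of threads and the number of queued tasks. The following is a minimal sketch using only `java.util.concurrent`; `BoundedWorkerPool`, `processAll` and `simulateSlowWrite` are hypothetical names, and the sleep stands in for the slow `dao.saveToDB` call from the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedWorkerPool {

    // Fixed number of threads plus a bounded task queue. When the queue is
    // full, CallerRunsPolicy makes the submitting thread run the task itself,
    // which slows intake instead of letting pending work pile up in memory.
    static ThreadPoolExecutor newPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits `messages` simulated writes and returns how many completed.
    static int processAll(int messages, int threads, int queueCapacity) {
        ThreadPoolExecutor pool = newPool(threads, queueCapacity);
        AtomicInteger saved = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                simulateSlowWrite();
                saved.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return saved.get();
    }

    // Stand-in for the slow database write (assumption for illustration).
    private static void simulateSlowWrite() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The design choice here is back-pressure: a full queue slows the thread that hands out work rather than growing without limit.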

When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?

If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue length does not grow enough to trigger flow control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.
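
The two-stage idea can be sketched as follows. This is only an in-process illustration: a `LinkedBlockingQueue` stands in for the Redis collection and an `ArrayList` stands in for the database, and `StagedWriter` with its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StagedWriter {

    // Stand-in for the external cache (the answer suggests Redis); a real
    // deployment would use a store that is shared between processes.
    private final BlockingQueue<String> staging = new LinkedBlockingQueue<>();
    // Stand-in for the database table.
    private final List<String> database = new ArrayList<>();

    // Called by the message consumer: cheap, so the broker queue drains fast.
    void onMessage(String caseId) {
        staging.add(caseId);
    }

    // Called by the second process: writes staged entries one at a time,
    // in arrival order, and returns how many were written.
    int drainToDatabase() {
        int written = 0;
        String caseId;
        while ((caseId = staging.poll()) != null) {
            database.add(caseId); // placeholder for the slow DB insert
            written++;
        }
        return written;
    }

    List<String> savedRows() {
        return database;
    }
}
```

The consumer only ever calls `onMessage`, so its speed no longer depends on how long a database write takes.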

Solution 2

You have a lot of ways to increase your performance.

  1. You can create a worker queue with more consumers; in this way you create a simple load-balancing system. Don't use exchange ---> queue, but only a queue. Read this post: RabbitMQ Non-Round Robin Dispatching

  2. When you get a message you can create a thread pool to insert the data into your database, but in this case you have to manage failures.

But I think the principal problem is the database and not RabbitMQ. With good tuning, multi-threading and a worker queue, you can have a scalable and fast solution.

Let me know

Solution 3

While it is true that adding more consumers may speed things up, the real issue will be saving to the database.

There are already many answers here that talk about adding consumers (threads and/or machines) and changing the QoS, so I'm not going to reiterate that. Instead you should seriously consider using the Aggregator pattern to aggregate the messages into a group of messages and then batch insert the group into your database in one shot.

Your current code for each message probably opens up a connection, inserts the data, and then closes that connection (or returns it to the pool). Worse, it may even be using transactions.

By using the aggregator pattern you're essentially buffering the data before you flush.

Now, writing a good aggregator is tricky. You will need to decide how you want to buffer (i.e. each worker has its own buffer or a central buffer like Redis). Spring Integration has an aggregator, I believe.
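
As a rough illustration of the per-worker buffer variant, here is a minimal size-based aggregator in plain Java. `BatchAggregator` is a hypothetical class, not the Spring Integration one, and the `flusher` callback is assumed to perform a single batch insert for the whole group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchAggregator<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher; // e.g. one batch INSERT per group
    private final List<T> buffer = new ArrayList<>();

    BatchAggregator(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Buffers one item and flushes as soon as the batch is full.
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands the buffered items to the flusher as one group. Call this on
    // shutdown (or from a timer) so a partial batch is not left behind.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        flusher.accept(batch);
    }
}
```

Feeding 25 items into an aggregator with a batch size of 10 and then calling `flush()` produces groups of 10, 10 and 5. One caveat under this design: anything still in the buffer is lost if the process dies, so with RabbitMQ you would likely want manual acknowledgements sent only after a group has been flushed successfully.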

Solution 4

"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue?" The answer is to use multiple consumers to consume the incoming messages simultaneously. Use multi-threading to run these consumers in parallel, following the shared-nothing principle: http://www.eaipatterns.com/CompetingConsumers.html

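The competing-consumers idea can be sketched in plain Java as below. This is a simulation under stated assumptions: a `LinkedBlockingQueue` stands in for the RabbitMQ queue, and `CompetingConsumers`, `run` and the `STOP` marker are hypothetical names used only for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CompetingConsumers {

    private static final String STOP = "__stop__"; // hypothetical shutdown marker

    // Starts `workers` consumers on one shared queue; each message is taken
    // by exactly one of them. Returns the set of case IDs that were handled.
    static Set<String> run(int workers, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> handled = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    // Workers share only the queue they read from and the
                    // result set they report to; all other state is local.
                    for (String id = queue.take(); !id.equals(STOP); id = queue.take()) {
                        handled.add(id); // placeholder for dao.saveToDB(id)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 0; i < messages; i++) {
            queue.add("case-" + i);
        }
        for (int w = 0; w < workers; w++) {
            queue.add(STOP); // one marker per worker, queued after all messages
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

For example, `CompetingConsumers.run(4, 100)` returns a set containing all 100 case IDs, with the work spread across the four workers.
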
Author: tonga

Updated on October 31, 2020

Comments

  • tonga
    tonga over 3 years

    I have an application that uses RabbitMQ as the message queue to send/receive messages between two components: sender and receiver. The sender sends messages very quickly. The receiver receives a message and then does some very time-consuming task (mainly database writing for very large data sizes). Since the receiver takes a very long time to finish the task before retrieving the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?

    The message consumer looks like the following:

    public void onMessage() throws IOException, InterruptedException {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
    
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
    
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
    
            JSONObject json = new JSONObject(message);
            String caseID = json.getString("caseID");
            //following takes very long time            
            dao.saveToDB(caseID);
        }
    }
    

    Each message received by the consumer contains a caseID. For each caseID, it will save a large amount of data to the database, which takes a very long time. Currently only one consumer is set up for RabbitMQ, since the producer and consumer use the same queue for the publish/subscribe of caseID. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming messages simultaneously? Or is there any asynchronous way to let the consumer consume a message without waiting for it to finish? Any suggestions are welcome.

  • tonga
    tonga over 9 years
    From RabbitMQ's documentation, there are two approaches here: Worker queue, publish/subscribe. I'm using pub/sub model right now. Should I use worker queue instead for multiple consumers?
  • voutrin
    voutrin over 9 years
    For what you need it should be worker queue. This is how it can be implemented github.com/victorpictor/Hotel/blob/master/Infrastructure/…
  • tonga
    tonga over 9 years
    But what if I want to use several queues for different purposes? Right now there is only one queue for caseID message. There may be more data other than caseID. So I may need to use publish/subscribe model to have multiple queues.
  • voutrin
    voutrin over 9 years
    In that case you can still have competing consumers for the long-running message consumption which you have now, and another consumer type for the other queue. You'll achieve this by changing your exchange type.
  • tonga
    tonga over 9 years
    Thanks Paul. This is a really good suggestion. My data does not need to be persisted in the DB immediately. The DB persistence part takes a very long time because it involves data parsing for each case and then saving a large amount of data (~10000 rows) in one DB insert. So using Redis is a good idea since it's an in-memory cache. But eventually I still need to persist the data to the DB. So how can I use Redis to accomplish the DB write task after the message consumer takes the message and saves it to Redis? If the DB insert is very slow, will the consumer overflow the Redis cache size limit?
  • Paul Mooney
    Paul Mooney over 9 years
    I would consume each message from either a single, or multiple processes, purging the message from Redis once it's committed to DB. There is no cache limit in Redis - you're limited by the amount of RAM on the host machine. 1,000,000 relatively small keys is roughly 200 MB. If you're worried about running out of memory, check this out: redis.io/topics/memory-optimization
  • Paul Mooney
    Paul Mooney over 9 years
    I've added a post, outlining the approaches to scaling out AMQP, and the associated rewards and drawbacks: insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers
  • Michiel Borkent
    Michiel Borkent over 7 years
    @PaulMooney RabbitMQ newbie here. What is the benefit of storing messages in Redis vs. consuming them slowly by setting qos + manual ack instead of auto ack?
  • Paul Mooney
    Paul Mooney over 7 years
    The two aren't necessarily related; storing in Redis offers durability, so that failed reads can be retried, and minimal message-loss can be achieved in cases of unforeseen shutdown. Applying custom QOS and manual ACK allows for a greater degree of granularity in your design, as well as potentially balancing a cluster based on traffic volume.