How to retract a message in RabbitMQ?

11,000

Solution 1

I would solve this scenario by having the worker check some sort of authoritative data source to determine if the the job should proceed or not. For example, the worker would check the job's status in a database to see if the job was canceled already.

For scenarios where the speed of processing jobs may be faster than the speed with which the authoritative store can be updated and read, a less guaranteed data store that trades speed for other characteristics may be useful.

An example of this would be to use Redis as the store for canceling processing of a message instead of a relational DB like MySQL. Redis is very fast, but makes fewer guarantees regarding the data it holds, whereas MySQL is much slower, but offers more guarantees about the data it holds.

In the end, the concept of checking with another source for whether or not to process a message is the same, but the way you implement that depends on your particular scenario.

Solution 2

RabbitMQ doesn't let you modify or delete messages after they've been enqueued. For that, you want some kind of database to hold the state of each job, and to use RabbitMQ to notify interested parties of changes in that state.

For lowish volumes, you can kludge it together with a queue per job. Create the queue, post the job description to the queue, announce the name of the queue to the workers. If the job needs to be cancelled before it is processed, deleted the job's queue; when the workers come to fetch the job description, they'll notice the queue has vanished.

Lighterweight and generally better would be to use redis or another key/value store to hold the job state (with a deleted or absent record meaning a cancelled or nonexistent job) and to use rabbitmq to notify about new/removed/changed records in the key/value store.

Solution 3

At least two ways to achieve your target:

Solution 4

You need to subscribe to all the queues to which messages have been routed, and consume them with ack.

For instance if you publish to a topic exchange with "test" as the routing key, and there are 3 persistent queues which subscribe to "test" you would need to consume those three queues. It might be better to add another queue which your consumer processes would also listen too, and tell them to ignore those messages.

An alternative, since you are using RabbitMQ, is to write a custom exchange plugin that will accept some out of band instruction to clear all queues. For instance you might have that exchange read a special message header that tells it to clear all queues to which this message is destined. This does require writing Erlang code, but there are 4 different exchange types implemented so you would only need to copy the most similar one and write the code for the new bahaviours. If you only use custom headers for this, then the body of the message can be a normal message for the consumers.

To sum up:

1) the publisher needs to consume the messages itself 2) the publisher can send a special message in a special queue to tell consumers to ignore the message 3) the publisher can send a special message to a custom exchange that will clear any existing messages from the queues before sending this special message to consumers.

Share:
11,000
jkff
Author by

jkff

I'm Eugene Kirpichov and I work on Google Cloud Dataflow and Apache Beam.

Updated on July 22, 2022

Comments

  • jkff
    jkff almost 2 years

    I've got something like a job queue over RabbitMQ and, upon a request to cancel a job, I'd like to retract the tasks that have not yet started processing (their messages have not been ack'd), which corresponds to retracting these messages from the queues that they've been routed to.

    I haven't found this functionality in AMQP or in the RabbitMQ API; perhaps I haven't searched well enough? Or will I have to use a workaround (it's not hard, but still)?

  • jkff
    jkff over 13 years
    Nope, this solves a different problem. I don't need to reject a message at the consumer side, I want to cancel its delivery from the producer side, so that the message does not reach a consumer like if it never existed at all. In my problem, a consumer can't decide whether a message should be rejected.
  • Sergei Zimakov
    Sergei Zimakov over 13 years
    If messages successfuly published and you need to remove them from known queue, then subscribe ot it and consume them in producer.