Bull Queue Concurrency Questions

Solution 1

The TL;DR is: under normal conditions, jobs are processed only once. If things go wrong (say the Node.js process crashes), jobs may be double processed.

Quoting from Bull's official README.md:

Important Notes

The queue aims for an "at least once" working strategy. This means that in some situations, a job could be processed more than once. This mostly happens when a worker fails to keep a lock for a given job during the total duration of the processing.

When a worker is processing a job it will keep the job "locked" so other workers can't process it.

It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled - and being restarted as a result. Locking is implemented internally by creating a lock for lockDuration on interval lockRenewTime (which is usually half lockDuration). If lockDuration elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:

  1. The Node process running your job processor unexpectedly terminates.
  2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).

As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).
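
For concreteness, here is a minimal sketch of where those settings live and how to listen for the stalled event. The setting names come from Bull's REFERENCE.md; the queue name, Redis URL and values are just placeholders:

    const Queue = require('bull');

    const queue = new Queue('taskqueue', 'redis://127.0.0.1:6379', {
      settings: {
        lockDuration: 60000,   // give slow jobs more time before they count as stalled
        lockRenewTime: 30000,  // usually half of lockDuration
        maxStalledCount: 1,    // how many times a stalled job is restarted before it fails
      },
    });

    // Surface stalled jobs in your error monitoring, since they are likely double-processed.
    queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled and may be processed again`);
    });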

Solution 2

I spent a bunch of time digging into it as a result of facing a problem with too many processor threads.

The short story is that Bull's concurrency is set at the queue-object level, not the queue level.

If you dig into the code, the concurrency setting is applied at the point at which you call .process on your queue object. This means that, even within the same Node application, if you create multiple queue objects and call .process multiple times, they will add to the number of concurrent jobs that can be processed.

One contributor posted the following:

Yes, it was a little surprising for me too when I used Bull for the first time. Queue options are never persisted in Redis. You can have as many Queue instances per application as you want, each with different settings. The concurrency setting is set when you're registering a processor; it is in fact specific to each process() function call, not to the Queue. If you use named processors, you can call process() multiple times. Each call will register N event loop handlers (with Node's process.nextTick()), where N is the concurrency (default is 1).

So the answer to your question is: yes, your jobs WILL be processed by multiple Node instances if you register process handlers in multiple Node instances.
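
To make the additive behaviour concrete, here is a sketch (queue and job names are hypothetical): each process() call below contributes its own concurrency, and every Node instance that runs this same code contributes again:

    const Queue = require('bull');

    const queue = new Queue('taskqueue', 'redis://127.0.0.1:6379');

    // Up to 5 concurrent jobs of type 'jobTypeA' for this queue object...
    queue.process('jobTypeA', 5, async (job) => {
      // ...do work...
    });

    // ...plus up to 3 concurrent jobs of type 'jobTypeB'. The totals add up per
    // process() call, and again for every Node instance running this code.
    queue.process('jobTypeB', 3, async (job) => {
      // ...do work...
    });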

Solution 3

Bull is designed for processing jobs concurrently with "at least once" semantics, although if the processors are working correctly, i.e. not stalling or crashing, it in fact delivers "exactly once". However, you can set the maximum stalled retries to 0 (maxStalledCount, https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queue) and then the semantics will be "at most once".

Having said that, I will try to answer the two questions asked by the poster:

What happens if one Node instance specifies a different concurrency value?

I will assume you mean "queue instance". If so, the concurrency is specified in the processor. If the concurrency is X, what happens is that at most X jobs will be processed concurrently by that given processor.

Can I be certain that jobs will not be processed by more than one Node instance?

Yes, as long as your job does not stall or crash, or your maxStalledCount setting is 0.
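
A minimal sketch of that configuration, using the same settings object as above (queue name and handler are hypothetical): with maxStalledCount set to 0, a job that loses its lock is failed rather than retried, which is what gives you "at most once":

    const Queue = require('bull');

    const queue = new Queue('taskqueue', 'redis://127.0.0.1:6379', {
      settings: {
        maxStalledCount: 0, // never restart stalled jobs
      },
    });

    queue.on('failed', (job, err) => {
      // A job that stalled (e.g. its processor crashed) should end up here
      // instead of being picked up again by another worker.
      console.error(`Job ${job.id} failed: ${err.message}`);
    });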

Solution 4

Looking into it more, I think Bull doesn't handle being distributed across multiple Node instances at all, so the behavior is at best undefined.

Solution 5

Ah, welcome! This is a meta answer and probably not what you were hoping for, but here is a general process for solving this:

You can specify a concurrency argument. Bull will then call your handler in parallel respecting this maximum value.

I personally don't really understand this or the guarantees that Bull provides, since it's not super clear.

IMO the biggest thing is:

Can I be certain that jobs will not be processed by more than one Node instance?

If exclusive message processing is an invariant whose violation would result in incorrectness for your application, then even with great documentation I would highly recommend performing due diligence on the library :p

Comments

  • Brian over 1 year

    I need help understanding how Bull Queue (bull.js) processes concurrent jobs.

    Suppose I have 10 Node.js instances that each instantiate a Bull Queue connected to the same Redis instance:

    const bullQueue = require('bull');
    const queue = new bullQueue('taskqueue', {...})
    const concurrency = 5;
    queue.process('jobTypeA', concurrency, job => {...do something...});
    

    Does this mean that globally across all 10 node instances there will be a maximum of 5 (concurrency) concurrently running jobs of type jobTypeA? Or am I misunderstanding and the concurrency setting is per-Node instance?

    What happens if one Node instance specifies a different concurrency value?

    Can I be certain that jobs will not be processed by more than one Node instance?

    • Mazen Elkashef over 2 years
      Talking about BullMQ here (it looks like a polished refactor of Bull): the concurrency factor is per worker, so if each of the 10 instances has 1 worker with a concurrency factor of 5, you get a global concurrency factor of 50. If one instance has a different config, it will probably just receive fewer jobs/messages, say because it's a smaller machine than the others. As for your last question, Stas Korzovsky's answer seems to cover it well. (There is a rough sketch of this per-worker setup after these comments.)
  • zenbeni over 5 years
    Well, Bull jobs are well distributed, as long as they consume the same topic on a single Redis instance. Each Bull consumer takes jobs from the Redis queue, and your code specifies that at most 5 can be processed concurrently per Node instance, so that should make 50 (which seems like a lot).
  • Shawn almost 4 years
    After realizing that the concurrency "piles up" every time a queue registers .process(), this does make horizontal scaling harder, and elastic scaling even harder. When the main Node.js process is spread over N nodes/machines - perhaps as replicas of a Kubernetes deployment - each replica runs the same code and thus registers .process(), N times in total. You have to explicitly correct for this by not calling .process() in some replicas. I think people expect centralized concurrency management here - after all, Bull already uses Redis - but unfortunately Bull does not provide this yet.
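
As a rough illustration of the per-worker concurrency described in the BullMQ comment above (queue name and connection details are placeholders): each of the 10 instances running a worker like this contributes 5 to the global concurrency, for 50 in total.

    const { Worker } = require('bullmq');

    // One worker per Node instance; with 10 instances this gives 10 * 5 = 50 global concurrency.
    const worker = new Worker(
      'taskqueue',
      async (job) => {
        // ...do work...
      },
      {
        connection: { host: '127.0.0.1', port: 6379 },
        concurrency: 5, // per-worker concurrency factor
      }
    );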