Celery parallel distributed task with multiprocessing

46,902

Solution 1

Your goals are:

  1. Distribute your work to many machines (distributed computing/distributed parallel processing)
  2. Distribute the work on a given machine across all CPUs (multiprocessing/threading)

Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by default to run as many tasks as there are CPU cores available on a system:

Concurrency is the number of prefork worker process used to process your tasks concurrently, when all of these are busy doing work new tasks will have to wait for one of the tasks to finish before it can be processed.

The default concurrency number is the number of CPU’s on that machine (including cores), you can specify a custom number using -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it, experimentation has shown that adding more than twice the number of CPU’s is rarely effective, and likely to degrade performance instead.

This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, celery will run enough tasks concurrently to use each available CPU.
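As a rough illustration of the numbers discussed in the quoted documentation, the sketch below uses only the standard library to show the logical CPU count (which the quote gives as the default concurrency) and the "twice the number of CPUs" figure it mentions for I/O-bound work. The function names are hypothetical and are not part of Celery.

```python
import os


def default_worker_concurrency():
    """Logical CPU count, which the quoted docs give as the default concurrency."""
    # os.cpu_count() can return None on unusual platforms, so fall back to 1.
    return os.cpu_count() or 1


def io_bound_upper_bound(cpus):
    """Twice the CPU count; the quoted docs say going beyond this rarely helps."""
    return 2 * cpus


cpus = default_worker_concurrency()
print(f"logical CPUs (default concurrency): {cpus}")
print(f"rough ceiling worth trying for I/O-bound tasks: {io_bound_upper_bound(cpus)}")
```

For the CPU-heavy task in the question, the default is probably the sensible starting point; the higher figure only applies if tasks spend most of their time waiting on I/O.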

With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids. You have a couple of options here - one is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids). This will guarantee that work is evenly distributed amongst all your tasks since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using the celery group.

tasks.py:

@app.task
def process_id(item):
    id = item  # long complicated equation here
    database.objects(newid=id).save()

And to execute the tasks:

from celery import group
from tasks import process_id

# .s() builds a signature for each call instead of running the task locally
jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Another option is to break the list into smaller pieces and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notes that this concern is often unfounded:

Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.

So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this:

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

And to start the tasks:

from celery import group
from tasks import process_ids

size = 30  # ids per task. Experiment with what number works best here.
ids = list_of_millions_of_ids
jobs = group(process_ids.s(ids[i:i + size]) for i in range(0, len(ids), size))
jobs.apply_async()

You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.
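To make the chunk-size trade-off concrete, here is a minimal sketch in plain Python, independent of Celery, of splitting a list into fixed-size pieces. The helper name chunked is hypothetical; each list it yields could be passed as the items argument of process_ids, and the number of lists is the number of messages that would be sent.

```python
from itertools import islice
from math import ceil


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


ids = list(range(10))  # small stand-in for list_of_millions_of_ids
chunks = list(chunked(ids, 4))
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# One message per chunk: a larger size means fewer messages but coarser balancing.
print(len(chunks) == ceil(len(ids) / 4))  # True
```

Note that the last chunk can be shorter than the others, which is one reason very large chunk sizes can leave some workers idle near the end of the job.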

Solution 2

In the world of distribution there is only one thing you should remember above all :

Premature optimization is the root of all evil. By D. Knuth

I know it sounds obvious, but before distributing, double-check that you are using the best algorithm (if it exists...). Having said that, optimizing distribution is a balancing act between 3 things:

  1. Writing/Reading data from a persistent medium,
  2. Moving data from medium A to medium B,
  3. Processing data,

Computers are built so that the closer you get to your processing unit (3), the faster and more efficient (1) and (2) will be. The order in a classic cluster is: network hard drive, local hard drive, RAM, inside processing unit territory... Nowadays processors are sophisticated enough to be considered an ensemble of independent hardware processing units, commonly called cores. These cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you are using 50% of the computer's power; if the core has 2 threads, you will then use 100%. Two threads per core is called hyper-threading, and your OS will see 2 CPUs per hyper-threaded core.

Managing threads in a processor is commonly called multi-threading. Managing CPUs from the OS is commonly called multi-processing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.

So where is your bottleneck ?

  • In (1): Try to persist and stream from the upper level (the one closer to your processing unit, for example if network hard drive is slow first save in local hard drive)
  • In (2): This is the most common one, try to avoid communication packets not needed for the distribution or compress "on the fly" packets (for example if the HD is slow, save only a "batch computed" message and keep the intermediary results in RAM).
  • In (3): You are done! You are using all the processing power at your disposal.

What about Celery ?

Celery is a messaging framework for distributed programming. It uses a broker module for communication (2) and a backend module for persistence (1), so by changing the configuration you can avoid most bottlenecks (where possible) on your network, and only on your network. First profile your code to get the best performance on a single computer. Then use Celery in your cluster with the default configuration and set CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

During execution, open your favorite monitoring tools; I use the default one for RabbitMQ, flower for Celery, and top for CPUs. Your results will be saved in your backend. An example of a network bottleneck is the task queue growing so much that it delays execution. In that case you can change modules or the Celery configuration; if not, your bottleneck is somewhere else.
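The advice to profile on a single computer before distributing could look roughly like the sketch below, which uses the standard library's cProfile. The function long_complicated_equation is a hypothetical stand-in for the real CPU-heavy work, and the sample size is arbitrary.

```python
import cProfile
import io
import pstats


def long_complicated_equation(item):
    # Hypothetical stand-in for the real CPU-heavy computation.
    return sum(i * i for i in range(item % 1000))


def profile_sample(items):
    """Run the work on a small sample and return (results, profiler report)."""
    profiler = cProfile.Profile()
    profiler.enable()
    results = [long_complicated_equation(item) for item in items]
    profiler.disable()

    # Collect the five most expensive entries by cumulative time.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumulative").print_stats(5)
    return results, buffer.getvalue()


results, report = profile_sample(range(2000))
print(report)
```

If most of the time in the report sits in the computation itself rather than in I/O calls, the work is CPU-bound and adding workers across machines is likely to help.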

Solution 3

Why not use group celery task for this?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Basically, you should divide ids into chunks (or ranges) and give them to a bunch of tasks in group.

For something more sophisticated, like aggregating the results of particular Celery tasks, I have successfully used a chord task for a similar purpose:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Increase settings.CELERYD_CONCURRENCY to a number that is reasonable and you can afford, then those celery workers will keep executing your tasks in a group or a chord until done.

Note: due to a bug in kombu, there was trouble in the past with reusing workers for a high number of tasks. I don't know if it's fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.

Example based on simplified and modified code I run:

from celery import chord

@app.task
def do_matches():
    match_data = ...
    # Header: one single_batch_processor task per batch; callback: summarize
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize gets results of all single_batch_processor tasks. Every task runs on any Celery worker, kombu coordinates that.

Now I get it: single_batch_processor and summarize ALSO have to be celery tasks, not regular functions - otherwise of course it will not be parallelized (I'm not even sure chord constructor will accept it if it's not a celery task).
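To illustrate the shape of the chord pattern without a broker, here is a local analogy using a thread pool from the standard library. This is not Celery code and says nothing about how Celery schedules work; it only shows that every batch is processed independently and that summarize is called once with the list of all results. The function bodies and the helper local_chord are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def single_batch_processor(batch):
    # Hypothetical per-batch work: here, just sum the batch.
    return sum(batch)


def summarize(results):
    # Called once with the results of every batch, in the original batch order.
    return {"batches": len(results), "total": sum(results)}


def local_chord(header_func, batches, callback):
    """Run header_func over every batch concurrently, then call callback once."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(header_func, batches))
    return callback(results)


match_data = [[1, 2, 3], [4, 5], [6]]
print(local_chord(single_batch_processor, match_data, summarize))
# {'batches': 3, 'total': 21}
```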

Solution 4

Adding more celery workers will certainly speed up executing the task. You might have another bottleneck though: the database. Make sure it can handle the simultaneous inserts/updates.

Regarding your question: You are adding celery workers by assigning another process on your EC2 instances as celeryd. Depending on how many workers you need you might want to add even more instances.

Author: Prometheus

Updated on July 03, 2021

Comments

  • Prometheus
    Prometheus almost 3 years

    I have a CPU intensive Celery task. I would like to use all the processing power (cores) across lots of EC2 instances to get this job done faster (a celery parallel distributed task with multiprocessing - I think).

    The terms, threading, multiprocessing, distributed computing, distributed parallel processing are all terms I'm trying to understand better.

    Example task:

      @app.task
      for item in list_of_millions_of_ids:
          id = item # do some long complicated equation here very CPU heavy!!!!!!! 
          database.objects(newid=id).save()
    

    Using the code above (with an example if possible), how would one go about distributing this task using Celery, allowing this one task to be split up so that it utilises all the CPU power across all available machines in the cloud?

  • Prometheus
    Prometheus almost 10 years
    > Adding more celery workers will certainly speed up executing the task. --- Does it? So you're saying celery will distribute that one task among all my instances without me having to split it up?
  • Prometheus
    Prometheus almost 10 years
    From my understanding this would split the task up but is not using celery parallel distributed task with multiprocessing. i.e. just using all free CPU power across all cloud machines.
  • LetMeSOThat4U
    LetMeSOThat4U almost 10 years
    I'm not sure why this would happen - Celery works like you have a bunch of workers, regardless of where they are located, they could even be located on another machine. Of course you need to have more than one worker. chord (with CELERYD_CONCURRENCY set to dozens of workers == logical cpus / hardware threads) is how I process large numbers of log file batches in a parallel manner over multiple cores.
  • Torsten Engelbrecht
    Torsten Engelbrecht almost 10 years
    Wait a sec. I just read your code again, and since it's just one task this will not help. You could fire one task per id (or chunks of ids). Or you follow John Doe's advice in the other answer. Then you can profit from the number of celery workers. And yes, in this case you don't need to do much. Just make sure the workers consume the same queues.
  • Prometheus
    Prometheus almost 10 years
    So the part where I do a "long complicated CPU heavy task (3d rendering maybe)" will be automatically distributed and processed in parallel, i.e. 1 task will be using as much processing power as is available across all instances --- and all this out-of-the-box? really? wow. PS good answer, thanks for explaining this to me better.
  • dano
    dano almost 10 years
    @Spike Not quite. The tasks, as currently written, can only ever use one core. To make an individual task use more than one core, we'd need to introduce threading or multiprocessing. Instead of doing that, we have each celery worker spawn as many tasks as there are cores available on the machine (this happens by default in celery). That means that across your whole cluster, every core can be used to process your list_of_million_ids, by having each task utilize a single core. So rather than having a single task use many cores, we're having many tasks each use one core. Does that make sense?
  • Tristan
    Tristan about 9 years
    "To make an individual task use more than one core, we'd to introduce threading or multiprocessing". Assuming we can't split that heavy task into multiple ones, how would you use threading or multiprocessing to get celery to split the task between multiple instances? thanks
  • dano
    dano about 9 years
    @Tristan It's dependent on what the task is actually doing. However, in most cases I would say that if you can't split the task itself into sub-tasks, you'll probably have a difficult time using multiprocessing to split the work up from inside the task itself, since both approaches ultimately require doing the same thing: splitting a task into smaller tasks which can be run in parallel. You're really only changing the point at which you're doing the splitting.
  • A. B
    A. B almost 9 years
    How is the first goal "Distribute your work to many machines (distributed computing/distributed parallel processing)" addressed here? I might have missed it. (My understanding is that there will be workers and/or queues running on different hosts.)
  • dano
    dano almost 9 years
    @A.B Celery can distribute work to many machines, by leveraging the clustering features of RabbitMQ (which it runs on top of). The OP's question implied that he/she already had a Celery cluster configured, so my answer mostly concentrated on what techniques could be used to evenly distribute work to that cluster.
  • A. B
    A. B almost 9 years
    @dano thanks for the clarification. I can take a look at "clustering in rabbitMQ" now :)
  • PirateApp
    PirateApp almost 6 years
    Upvoted. Forgive me if this question is a bit silly: the issue here on celery seems to suggest they don't support multiprocessing, github.com/celery/celery/issues/4551. Is each worker an instance of a thread or a process? Could you kindly clarify? Thanks.
  • dano
    dano almost 6 years
    @PirateApp That issue is saying that you can't use multiprocessing inside a Celery task. Celery itself is using billiard (a multiprocessing fork) to run your tasks in separate processes. You're just not allowed to then use multiprocessing inside of them.
  • PirateApp
    PirateApp almost 6 years
    @dano thanks for the clarification! super appreciated
  • Prisacari Dmitrii
    Prisacari Dmitrii over 4 years
    This is a REALLY BAD example of code. The task do_matches will be blocked while awaiting the chord. This might lead to a partial or full deadlock, as many/all workers might wait for subtasks, none of which will be done (as workers wait for subtasks instead of working hard).
  • LetMeSOThat4U
    LetMeSOThat4U over 4 years
    @PrisacariDmitrii So what would be the right solution then?
  • schrödingcöder
    schrödingcöder over 3 years
    Sorry if I'm wrong, but doesn't chunks run sequentially? So to achieve faster, parallel execution wouldn't we need to transform jobs in the last code example into a group? Btw thanks for a super clear and useful answer.