tracking progress of a celery.group task?


Solution 1

While tinkering around in the shell (with IPython's tab auto-completion) I found that group_task (which is a celery.result.ResultSet object) has a method called completed_count, which gave exactly what I needed.

I also found the documentation at http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count
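
For illustration, a minimal polling loop might look like the following (a sketch; the add task and the group construction here are hypothetical, only completed_count, ready and get come from the ResultSet API):

import time
from celery import group
from tasks import add  # any Celery task, e.g. the add task from Solution 2 below

# Launch the group and keep its GroupResult (a ResultSet).
result_set = group(add.s(i, i) for i in range(20)).apply_async()
total = len(result_set.results)  # number of tasks in the group

while not result_set.ready():
    print('{}/{} tasks completed'.format(result_set.completed_count(), total))
    time.sleep(1)

print(result_set.get())  # all results once the group has finished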

Solution 2

Here's a full working example based on @dalore's answer.

First, tasks.py:

import time
from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@localhost//', backend='redis://localhost')

@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()

Start a Redis server using Docker: docker run --name my-redis -p 6379:6379 -d redis.

Start RabbitMQ using Docker: docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine.

Start a single-process Celery worker in a separate shell: celery -A tasks worker --loglevel=info -c 1.

Then run the test script below.

from tasks import group_add
from tqdm import tqdm

total = 10

l1 = list(range(total))
l2 = list(range(total))
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.

# delayed_results.children[0] is the GroupResult holding the child task results.
results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)

You should see something like the following with the progress bar increasing by 10% every second.

50%|#####     | 5/10 [00:05<00:05,  1.01s/it]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
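
As a variation, Solution 1's completed_count can be combined with this setup to poll progress without blocking on each child result. A minimal sketch, assuming the same tasks.py and containers as above:

import time
from tasks import group_add

total = 10
delayed_results = group_add.delay(list(range(total)), list(range(total)))
delayed_results.get()  # Wait for parent task to be ready.

group_result = delayed_results.children[0]  # the GroupResult for the child tasks
while not group_result.ready():
    print('{}/{} tasks completed'.format(group_result.completed_count(), total))
    time.sleep(0.5)

print(group_result.get())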

Finally, clean up your Redis and RabbitMQ containers.

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis

Solution 3

Reading the documentation for AsyncResult, I found that there is a collect method that collects results as they come in.

http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect

from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

Example output:

>>> from celery.result import ResultBase
>>> from proj.tasks import A

>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Note: The Task.trail option must be enabled so that the list of children is stored in result.children. This is the default, but it is enabled explicitly here for illustration.

Edit:

Upon further testing, I found that although collect says it will collect results, it still waits. To get the progress you need to get the results of the children, like so:

group_result = mygrouptask.delay().get()
for result in tqdm(group_result.children, total=count):  # count = number of tasks in the group
    yield result.get()

tqdm displays a progress bar in the console.

mygrouptask returns a celery group, like so:

return group(mytask.s(arg) for arg in args)()

Comments

  • Anuvrat Parashar, almost 2 years ago
    @celery.task
    def my_task(my_object):
        do_something_to_my_object(my_object)
    
    
    #in the code somewhere 
    tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
    group_task = tasks.apply_async()
    

    Question: Does celery have something to detect the progress of a group task? Can I get the count of how many tasks were there and how many have been processed?

  • zerohedge, over 6 years ago
    Hi, it has probably been a long time since you encountered this, but I'm wondering how you use this to track the progress of the group tasks without blocking..? As I understand it, I need to assign result = task_group.apply_async(), but the mere assignment itself blocks. On the other hand, if we don't assign, we don't have the ResultSet methods such as completed_count etc...
  • Anentropic, about 6 years ago
    This will wait for all the child tasks to complete and then return their results; it does not show progress while the group is still running.
  • Anentropic, about 6 years ago
    @zerohedge result = task_group.apply_async() should not block waiting for results; it will only block until all the tasks are enqueued, which may take a while if you have lots of tasks.
  • dalore, about 6 years ago
    Updated to actually produce progress; working in production.
  • lbcommer, almost 3 years ago
    I think you have to put trail=True in @app.task and not in the definition of the functions.
  • Carl, almost 3 years ago
    @lbcommer Thanks, fixed. It's set by default so it isn't necessary, but as in dalore's answer, I like it to be explicit since this behaviour depends on it.