tracking progress of a task?


Solution 1

Tinkering around in the shell (with IPython's tab auto-completion), I found that group_task (a celery.result.ResultSet object) has a method called completed_count, which gave me exactly what I needed.
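A minimal sketch of how completed_count might be polled to report progress. The helper name report_progress, its parameters, and the FakeGroupResult stand-in are hypothetical and exist only so the example runs without a broker; the sketch assumes the result object offers completed_count(), ready() and results, as celery.result.ResultSet does.

```python
import time


def report_progress(group_result, poll_interval=1.0, report=print):
    """Poll a ResultSet-like object and report how many children have finished.

    Assumes `group_result` offers `completed_count()`, `ready()` and `results`,
    as celery.result.ResultSet does.
    """
    total = len(group_result.results)
    while not group_result.ready():
        report(f"{group_result.completed_count()}/{total} tasks completed")
        time.sleep(poll_interval)
    done = group_result.completed_count()
    report(f"{done}/{total} tasks completed")
    return done, total


class FakeGroupResult:
    """Hypothetical stand-in for a real group result; finishes one task per poll."""

    def __init__(self, total):
        self.results = [None] * total
        self._done = 0

    def completed_count(self):
        return self._done

    def ready(self):
        # Pretend one more task finished since the last check.
        if self._done < len(self.results):
            self._done += 1
            return False
        return True


messages = []
final = report_progress(FakeGroupResult(3), poll_interval=0, report=messages.append)
print(messages[-1])
```

With a real group, the object returned by apply_async() would be passed in place of the fake. The loop stops on ready() rather than on the count because completed_count counts only successful tasks, so a failed child would otherwise keep the loop running.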

Also found the documentation at

Solution 2

Here's a full working example based on @dalore's answer.


import time
from celery import Celery, group

# Assumes RabbitMQ's default guest user on localhost.
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend='redis://localhost')

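@app.task(trail=True)
def add(x, y):
    time.sleep(1)  # Simulate one second of work per task.
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    # Launch one add task per pair of inputs and return the group's result.
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()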

Start redis server using Docker: docker run --name my-redis -p 6379:6379 -d redis.

Start RabbitMQ using Docker: docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine.

Start a single process celery worker in a separate shell: celery -A tasks worker --loglevel=info -c 1.

Then run the test script below.

from tasks import group_add
from tqdm import tqdm

total = 10

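# Lists rather than range objects, so the arguments serialize as JSON.
l1 = list(range(total))
l2 = list(range(total))
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.

results = []
# children[0] is the group launched by group_add; each get() blocks on one child.
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)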

You should see something like the following with the progress bar increasing by 10% every second.

50%|#####     | 5/10 [00:05<00:05,  1.01s/it]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Finally, clean up your redis and rabbitmq containers.

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis

Solution 3

According to the documentation for AsyncResult, there is a collect method that collects results as they come in.

from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

Example output:

>>> from celery.result import ResultBase
>>> from proj.tasks import A

>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Note: The Task.trail option must be enabled so that the list of children is stored in result.children. This is the default but enabled explicitly for illustration.


Upon further testing, I found that although collect claims to collect results as they come in, it still waits for all of them. To get the progress, you need to get the result of each child, like so:

group_result = mygrouptask.delay().get()
for result in tqdm(group_result.children, total=count):
    yield result.get()

tqdm displays a progress bar in the console.

mygrouptask returns a Celery group, like so:

return group(mytask.s(arg) for arg in args)()
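
The loop over the children shown above uses yield, so it has to live inside a generator function. A minimal sketch of such a wrapper follows; the names iter_child_results, on_progress and FakeChild are hypothetical, and FakeChild only imitates the get() method of a real child result so that the example runs without a worker.

```python
def iter_child_results(children, on_progress=None):
    """Yield each child's value in submission order, reporting progress as it goes."""
    children = list(children)
    total = len(children)
    for done, child in enumerate(children, start=1):
        value = child.get()  # Blocks until this particular child has finished.
        if on_progress is not None:
            on_progress(done, total)
        yield value


class FakeChild:
    """Hypothetical stand-in for a child result; returns its value immediately."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


progress = []
children = [FakeChild(i ** 2) for i in range(5)]
values = list(
    iter_child_results(children, on_progress=lambda d, t: progress.append((d, t)))
)
print(values)
print(progress[-1])
```

Because each get() blocks on one child at a time, progress advances in submission order: a slow early task holds the counter back even if later tasks have already finished.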

Author: Anuvrat Parashar

Programmer biased towards Linux, Python, FOSS etc.

Updated on July 19, 2022


  • Anuvrat Parashar
    Anuvrat Parashar almost 2 years
    @app.task
    def my_task(my_object):
        ...  # task body

    # in the code somewhere
    tasks = group([my_task.s(obj) for obj in MyModel.objects.all()])
    group_task = tasks.apply_async()

    Question: Does celery have something to detect the progress of a group task? Can I get the count of how many tasks were there and how many have been processed?

  • zerohedge
    zerohedge over 6 years
    Hi, it's probably been a long time since you encountered this, but I'm wondering how you use this to track the progress of the group tasks without blocking? As I understand it, I need to assign result = task_group.apply_async(), but the mere assignment itself blocks. On the other hand, if we don't assign, we don't have the ResultSet methods such as completed_count.
  • Anentropic
    Anentropic about 6 years
    This will wait for all the child tasks to complete and then return their results; it does not show progress while the group is still running.
  • Anentropic
    Anentropic about 6 years
    @zerohedge result = task_group.apply_async() should not block to await results. It will block until all the tasks are enqueued, which may take a while if you have lots of tasks.
  • dalore
    dalore about 6 years
    Updated to actually produce a progress, working in production
  • lbcommer
    lbcommer almost 3 years
    I think you have to put 'trail=True' in @app.task and not in the definition of the functions
  • Carl
    Carl almost 3 years
    @lbcommer Thanks. Fixed. It's set by default so isn't necessary but as in dalore's answer, I like it to be explicit since this behaviour depends on it.