How to inspect and cancel Celery tasks by task name

27,449

Solution 1

# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)

# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
    celery.control.revoke(uuid, terminate=True)

Solution 2

There is one issue that earlier answers have not addressed and may throw off people if they are not aware of it.

Among those solutions already posted, I'd use Danielle's with one minor modification: I'd import the task into my file and use its .name attribute to get the task name to pass to .tasks_by_type().

app.control.revoke(
    [uuid for uuid, _ in
     celery.events.state.State().tasks_by_type(task.name)])

However, this solution will ignore those tasks that have been scheduled for future execution. Like some people who commented on other answers, when I checked what .tasks_by_type() return I had an empty list. And indeed my queues were empty. But I knew that there were tasks scheduled to be executed in the future and these were my primary target. I could see them by executing celery -A [app] inspect scheduled but they were unaffected by the code above.

I managed to revoke the scheduled tasks by doing this:

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in
     chain.from_iterable(app.control.inspect().scheduled()
                         .itervalues())])

app.control.inspect().scheduled() returns a dictionary whose keys are worker names and values are lists of scheduling information (hence, the need for chain.from_iterable which is imported from itertools). The task information is in the "request" field of the scheduling information and "id" contains the task id. Note that even after revocation, the scheduled task will still show among the scheduled tasks. Scheduled tasks that are revoked won't get removed from the list of scheduled tasks until their timers expire or until Celery performs some cleanup operation. (Restarting workers triggers such cleanup.)

Solution 3

You can do this in one request:

app.control.revoke([
    uuid
    for uuid, _ in
    celery.events.state.State().tasks_by_type(task_name)
])

Solution 4

As usual with Celery, none of the answers here worked for me at all, so I did my usual thing and hacked together a solution that just inspects redis directly. Here we go...

# First, get a list of tasks from redis:
import redis, json

r = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)

# Now import the task you want so you can get its name
from my_django.tasks import my_task

# Now, import your celery app and iterate over all tasks 
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
     task_headers = json.loads(task)['headers']
     task_name = task_headers["task"]
     if task_name == my_task.name:
         task_id = task_headers['id']
         print("Terminating: %s" % task_id)
         app.control.revoke(task_id, terminate=True)

Note that revoking in this way might not revoke prefetched tasks, so you might not see results immediately.

Also, this answer doesn't support prioritized tasks. If you want to modify it to do that, you'll want some of the tips in my other answer that hacks redis.

Solution 5

It looks like flower provides monitoring:

https://github.com/mher/flower

Real-time monitoring using Celery Events

Task progress and history Ability to show task details (arguments, start time, runtime, and more) Graphs and statistics Remote Control

View worker status and statistics Shutdown and restart worker instances Control worker pool size and autoscale settings View and modify the queues a worker instance consumes from View currently running tasks View scheduled tasks (ETA/countdown) View reserved and revoked tasks Apply time and rate limits Configuration viewer Revoke or terminate tasks HTTP API

OpenID authentication

Share:
27,449
Mzzzzzz
Author by

Mzzzzzz

Updated on January 10, 2021

Comments

  • Mzzzzzz
    Mzzzzzz over 3 years

    I'm using Celery (3.0.15) with Redis as a broker.

    Is there a straightforward way to query the number of tasks with a given name that exist in a Celery queue?

    And, as a followup, is there a way to cancel all tasks with a given name that exist in a Celery queue?

    I've been through the Monitoring and Management Guide and don't see a solution there.

  • Mzzzzzz
    Mzzzzzz about 11 years
    This sounded promising, but I wasn't able to get any results. I set CELERY_SEND_TASK_SENT_EVENT to True for my workers, but calls to celery.events.State().tasks_by_type(...) returns an empty list.
  • gioi
    gioi about 11 years
    Uhm, try to use directly celery.events.state.state, as celerymon does. See the source.
  • Mzzzzzz
    Mzzzzzz about 11 years
    From the celery shell, celery.events.state.state still gives me an empty list. Is there a missing step to initialize consuming from the event queue?
  • gioi
    gioi about 11 years
    Ok, you're right: you need to feed State objects. Copy & paste github.com/celery/celerymon/blob/master/celerymon/…, then call EventConsumer.start()
  • user541905
    user541905 almost 8 years
    I don't really get how this relates to the answer. When does the start stop? When memory is full?
  • mannysz
    mannysz almost 8 years
    awesome one-liner solution
  • 2ps
    2ps almost 6 years
    n.b., you have to use State().tasks_by_type for celery 4+.
  • Nilesh
    Nilesh almost 3 years
    is there any way to know using command that which tasks are schedule under cron ? like beat scheduler ?
  • Reiner Gerecke
    Reiner Gerecke almost 3 years
    Great answer. Trying to get anything out of the celery cli is usually unsuccessful in my cases, but this answer immediately got me started!