Python Celery - How to call celery tasks inside other task

Solution 1

This should work:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])

Solution 2

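You are right: each iteration of your for loop overwrites the task variable, so you only ever hold the last result.

You can use celery.group to dispatch all the calls together. Import it with

from celery import group

and then build one signature per server: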

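from celery import shared_task

@shared_task
def shipment_server(data, notification_type):
    notification_obj = Notification.objects.get(name=notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    # Build one signature per server, then dispatch them all as a single group.
    tasks = [post_notification.s(data, server.server_id.url) for server in server_list]
    results = group(tasks)()
    # Celery discourages calling get() inside a task, and newer versions raise an error by default.
    print(results.get())  # or results.ready() / results.completed_count(), whatever you want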

Solution 3

You can call a task from within another task using delay or apply_async:

from app.tasks import celery_add_task

celery_add_task.apply_async(args=[task_name])

This should work.

Author by

PythonEnthusiast

Updated on July 18, 2022

Comments

  • PythonEnthusiast almost 2 years

    I'm calling a task within another task in Django-Celery.

    Here are my tasks.

    import json
    import requests
    from celery import shared_task

    @shared_task
    def post_notification(data, url):
        url = "http://posttestserver.com/data/?dir=praful"  # when in production, remove this line.
        headers = {'content-type': 'application/json'}
        requests.post(url, data=json.dumps(data), headers=headers)
    
    
    @shared_task
    def shipment_server(data,notification_type):
        notification_obj = Notification.objects.get(name = notification_type)
        server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)
    
        for server in server_list:
            task = post_notification.delay(data,server.server_id.url)
            print task.status # it prints 'Nonetype' has no attribute id
    

    How can I call a task within a task? I read somewhere it can be done using group, but I'm not able to form the correct syntax. How do I do it?

    I tried this

    for server in server_list:
        task = group(post_notification.s(data, server.server_id.url))().get()
        print task.status
    

    Throws a warning saying

    TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
      'Polling results with transaction isolation level '
    

    I don't know what this warning means.

    How do I solve my problem?