Celery task and custom decorator
Solution 1
I'm not quite sure why passing arguments wouldn't work.
If you use this example:
@task()
def add(x, y):
    return x + y
Let's add some logging to MyCoolTask:
from celery import task
from celery.registry import tasks
import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):
    def __call__(self, *args, **kwargs):
        """In a Celery task this method calls run(); here you can
        set up the environment before the task runs."""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit point of the task, whatever its final state
        logger.info("Ending run")
Then create an extended class (extending MyCoolTask, but now with arguments):
class AddTask(MyCoolTask):
    def run(self, x, y):
        # Compare against None so that 0 is accepted as a valid argument.
        if x is not None and y is not None:
            result = add(x, y)
            logger.info('result = %d', result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)
And make sure you pass the kwargs as JSON data:
{"x":8,"y":9}
I get the result:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
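To make the keyword-argument step concrete, here is a minimal sketch in plain Python, with no Celery or broker involved, of how a JSON payload like the one above maps onto the parameters of run(x, y). The names payload and kwargs are only illustrative, and how the payload actually reaches the task depends on how it is scheduled.

```python
import json


def add(x, y):
    return x + y


# The same JSON kwargs as in the example above.
payload = '{"x": 8, "y": 9}'

# Decoding yields a dict whose keys must match the parameter names.
kwargs = json.loads(payload)

# Unpacking the dict is equivalent to calling add(x=8, y=9).
result = add(**kwargs)
print(result)  # 17
```

If a key in the JSON does not match a parameter name, the call raises a TypeError, which is why the payload keys have to be exactly x and y here.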
Solution 2
Instead of using a decorator, why not create a base class that extends celery.Task?
This way all your tasks can extend your customized task class, where you can implement your own behavior by overriding the __call__ and after_return methods.
You can also define common methods and objects for all your tasks.
import celery

class MyCoolTask(celery.Task):
    def __call__(self, *args, **kwargs):
        """In a Celery task this method calls run(); here you can
        set up the environment before the task runs."""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit point of the task, whatever its final state
        pass
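The order in which these hooks fire can be sketched without Celery at all. In the example below, BaseTask is a hypothetical stand-in for celery.Task and execute() is a simplified imitation of what a worker does around a task call; neither is Celery's real implementation, and LoggedTask and the events list are names invented for the illustration.

```python
class BaseTask:
    """Hypothetical stand-in for celery.Task, used only to show the hook order."""

    def __call__(self, *args, **kwargs):
        # Entry point: prepare whatever the task needs, then delegate to run().
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Exit hook: invoked once the task has finished, whatever the outcome.
        pass


def execute(task, task_id, *args, **kwargs):
    """Simplified imitation of a worker calling a task and then its exit hook."""
    try:
        retval, status, einfo = task(*args, **kwargs), "SUCCESS", None
    except Exception as exc:
        retval, status, einfo = exc, "FAILURE", exc
    task.after_return(status, retval, task_id, args, kwargs, einfo)
    return retval


class LoggedTask(BaseTask):
    """Shared behavior lives here; concrete tasks only implement run()."""

    def __init__(self):
        self.events = []

    def __call__(self, *args, **kwargs):
        self.events.append("start")
        return super().__call__(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        self.events.append("end:" + status)


class AddTask(LoggedTask):
    def run(self, x, y):
        return x + y


task = AddTask()
result = execute(task, "demo-id", 8, 9)
print(result, task.events)  # 17 ['start', 'end:SUCCESS']
```

The point of the design is that AddTask contains nothing but its own logic, while the start and end behavior is written once in the shared base class.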
zxygentoo
Updated on June 03, 2022
Comments
-
zxygentoo about 2 years
I'm working on a project using Django and Celery (django-celery). Our team decided to wrap all data access code within (app-name)/manager.py (NOT wrapped into Managers the Django way), and to let the code in (app-name)/task.py deal only with assembling and performing tasks with Celery (so we don't have a Django ORM dependency in this layer).
In my manager.py, I have something like this:
def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()
In my task.py, I'd like to wrap them into tasks (and then maybe use these tasks to build more complicated tasks), so I wrote this decorator:
import manager  # the module within the same app containing data access functions

class mfunc_to_task(object):
    def __init__(self, mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)
            mfunc = getattr(manager, f.__name__)
            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result
        return wrapper_f
Then (still in task.py):
#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass
Things work fine without @task. But after applying the @task decorator (at the top, as the Celery documentation instructs), things just start to fall apart. Apparently, every time mfunc_to_task.__call__ gets called, the same task.get_tag function gets passed as f. So I end up with the same wrapper_f every time, and now the only thing I can do is get a single tag.
I'm new to decorators. Can anyone help me understand what went wrong here, or point out other ways to achieve this? I really hate writing the same task-wrapping code for every one of my data access functions.
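One possible explanation, offered as an assumption and not confirmed anywhere in this thread: if the task decorator derives each task's name from the function's __name__, then every function returned by mfunc_to_task is called wrapper_f, and they would all map to a single registered task. The sketch below models that with a hypothetical toy registry (REGISTRY and toy_task are invented for the illustration and are not Celery code) and shows how functools.wraps keeps the names distinct.

```python
import functools

# Hypothetical registry keyed by function name; a toy model, not Celery's code.
REGISTRY = {}


def toy_task(func):
    """Register func under its __name__; an already registered name wins."""
    return REGISTRY.setdefault(func.__name__, func)


def plain_wrap(f):
    # Every wrapper produced here is named "wrapper_f".
    def wrapper_f(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper_f


def named_wrap(f):
    # functools.wraps copies f.__name__ (and other metadata) onto the wrapper.
    @functools.wraps(f)
    def wrapper_f(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper_f


def build(wrap):
    """Define two distinct functions and register both through `wrap`."""
    REGISTRY.clear()

    @toy_task
    @wrap
    def get_tag():
        return "tag"

    @toy_task
    @wrap
    def get_tagged_photos_count():
        return 3

    return get_tag(), get_tagged_photos_count()


collided = build(plain_wrap)
distinct = build(named_wrap)
print(collided)  # ('tag', 'tag')
print(distinct)  # ('tag', 3)
```

With the plain wrapper both names resolve to the first registered function, which matches the symptom of only ever getting a single tag. Applying functools.wraps(f) to wrapper_f inside mfunc_to_task.__call__ would be the corresponding change to try.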
-
zxygentoo almost 13 years
Thx, man. Extending celery.task.Task is certainly a way to go, but, maybe due to some deep metaclass black magic in Celery, I found I couldn't pass arguments to the __init__ of MyTask and use them in __call__ and run. So I put all the logic in MyTask and came up with a naming scheme to pass the arguments through self.__class__.__name__. Then I extend MyTask for each of my data access functions, using the naming scheme to pass arguments, and instantiate it once to get the task I need. This way I do have all the logic in one place, but it still looks kind of messy. Is there a more elegant solution?
-
Mauro Rocco almost 13 years
Honestly, I think this is elegant enough. A personal suggestion: when you can't find an elegant solution, the whole approach is probably wrong. But it's possible that I'm wrong ;-). Up to you.
-
michel.iamit about 11 years
@MauroRocco: thanks for your answer and the clear explanation in the europycon 2011 YouTube presentation; this led me (after puzzling a bit more) to the answer I gave here. (see: slideshare.net/fireantology/…)
-
Matan Itzhak about 6 years
Doing this with Celery 4.0 gives the following error: TypeError: __init__() takes exactly 3 arguments (1 given). The same goes when trying to make a separate function with the task decorator @app.task(bind=True, base=AddTask), as shown here. The only option I know of is to create an instance of the class before registering it. Is there any other way, similar to what you did here?
-
michel.iamit about 6 years
Yep, this was a post from 2013, 5 years ago, and lots has changed since then. I can see if I can produce a better example with Celery 4.0. Can you open a new question? I kind of got away from class-based tasks; in the end they did not work out for me. I am only using @shared_task nowadays.
-
Tjorriemorrie over 2 years
How do you run this? delay gives an error.