Celery chaining tasks sequentially
Solution 1
A chain always passes the previous task's result to the next task as its first argument. From the chains documentation:
The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.
Your upload_ftp_image task doesn't accept this extra argument, so it is called with 6 arguments instead of 5 and fails with a TypeError.
You have a fine use case for chaining; the second task is guaranteed to be called after the first task is completed (otherwise the result could not be passed on anyway).
Simply add an argument for the result of the previous task:
def upload_ftp_image(download_result, ftp_server, username, password, file, directory):
You could also put that result value to use: for example, have the download task return the path of the downloaded file, so the upload task knows what to upload.
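To see where "takes exactly 5 arguments (6 given)" comes from, here is a toy, pure-Python model of how a chain hands results along. It is only an illustration, not Celery's implementation: `run_chain`, `upload_ftp_image_original` and the stub tasks are hypothetical, the stubs skip the FTP transfer entirely, and this `upload_ftp_image` takes the downloaded path as its first argument (the "use the result" variant) instead of a separate `file` parameter.

```python
from typing import Any, Callable, Sequence, Tuple

# One chain step: a task function plus the arguments given to .s(...).
Step = Tuple[Callable[..., Any], Tuple[Any, ...]]


def run_chain(steps: Sequence[Step]) -> Any:
    """Toy model of a chain (hypothetical helper, not Celery's code).

    Steps run strictly one after another, and every step after the first
    gets the previous step's return value prepended to its own arguments.
    """
    result: Any = None
    for index, (func, args) in enumerate(steps):
        result = func(*args) if index == 0 else func(result, *args)
    return result


# Hypothetical stub: skips the FTP transfer and returns the local path.
def download_ftp_image(ftp_server, username, password, filename, directory):
    return directory + "model.dae"


# The question's signature: five parameters, no slot for the previous result.
def upload_ftp_image_original(ftp_server, username, password, file, directory):
    return "uploaded " + file


# Variant that uses the result: the first parameter is the downloaded path.
def upload_ftp_image(download_result, ftp_server, username, password, directory):
    return "uploaded " + download_result


creds = ("ftp.example.com", "user", "secret")  # hypothetical values
download_step = (download_ftp_image, creds + ("/test_app_2/model.dae", "tmp/test_app_2/"))

try:
    # 5 arguments from .s(...) plus the prepended result = 6 arguments.
    run_chain([download_step,
               (upload_ftp_image_original,
                creds + ("tmp/test_app_2/model.dae", "tmp/test_app_2/"))])
except TypeError as exc:
    print("original signature fails:", exc)

print(run_chain([download_step, (upload_ftp_image, creds + ("tmp/test_app_2/",))]))
```

The second step never starts before the first has returned, because it needs that return value as input; that is the same reason a real chain is sequential.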
Solution 2
Another option, if you don't want the return value of the previous task to be passed as an argument, is to use immutable signatures ('immutability'):
http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability
Instead of defining your subtasks as:
download_ftp_image.s(...) and upload_ftp_image.s(...)
define them as:
download_ftp_image.si(...) and upload_ftp_image.si(...)
You can now use the tasks with their usual number of arguments in a chain.
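A toy, pure-Python model of a chain with an `immutable` flag shows the difference between `.s(...)` and `.si(...)`. `Sig`, `run_chain` and the stub tasks are hypothetical illustrations, not Celery's classes; in real code you would simply write `chain(download_ftp_image.si(...), upload_ftp_image.si(...))`. The steps still run one after another; the immutable ones just no longer receive the previous result.

```python
from typing import Any, Callable, NamedTuple, Sequence, Tuple


class Sig(NamedTuple):
    """Hypothetical stand-in for a signature (not Celery's class)."""
    func: Callable[..., Any]
    args: Tuple[Any, ...]
    immutable: bool = False  # True models .si(...), False models .s(...)


def run_chain(sigs: Sequence[Sig]) -> Any:
    """Toy model of a chain: steps run strictly in order; a mutable step gets
    the previous result prepended to its args, an immutable one ignores it."""
    result: Any = None
    for index, sig in enumerate(sigs):
        if index == 0 or sig.immutable:
            result = sig.func(*sig.args)
        else:
            result = sig.func(result, *sig.args)
    return result


# Hypothetical stubs keeping the question's original five-parameter signatures.
def download_ftp_image(ftp_server, username, password, filename, directory):
    return "SUCCESS: "


def upload_ftp_image(ftp_server, username, password, file, directory):
    return "uploaded " + file


creds = ("ftp.example.com", "user", "secret")  # hypothetical values
result = run_chain([
    Sig(download_ftp_image, creds + ("/test_app_2/model.dae", "tmp/test_app_2/"), immutable=True),
    Sig(upload_ftp_image, creds + ("tmp/test_app_2/model.dae", "tmp/test_app_2/"), immutable=True),
])
print(result)
```

With `immutable=False` on the second step, the model raises the same TypeError as in the question, since the upload stub would receive six arguments.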
Comments
psychok7, about 2 years ago:
I need to download a file through FTP, change it and upload it back. I am using Celery to do this, but I am running into problems when trying to use chaining, where I am getting:
TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)
Also, can I use chains and be assured that the steps will be sequential? If not, what is the alternative?
res = chain(
    download_ftp_image.s(server, username, password, "/test_app_2/model.dae", "tmp/test_app_2/"),
    upload_ftp_image.s(server, username, password, "tmp/test_app_2/model.dae", "tmp/test_app_2/"),
).apply_async()
print(res.get())
Tasks:
import os
from ftplib import FTP, error_perm


@task()
def download_ftp_image(ftp_server, username, password, filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
        # Note: the remote path is hard-coded; the filename argument is unused.
        with open(directory + 'model.dae', 'wb') as local_file:
            ftp.retrbinary("RETR /default_app/model.dae", local_file.write)
        ftp.quit()
    except error_perm:
        raise download_ftp_image.retry(countdown=15)
    return "SUCCESS: "


@task()
def upload_ftp_image(ftp_server, username, password, file, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file = file.replace(directory, "")
        directory = directory.replace("tmp", "")
        try:
            with open(file, "rb") as local_file:
                ftp.storbinary("STOR " + directory + new_file, local_file)
        except error_perm:
            # Remote directory probably missing: create it and upload again.
            ftp.mkd(directory)
            with open(file, "rb") as local_file:
                ftp.storbinary("STOR " + directory + new_file, local_file)
        ftp.quit()
    except error_perm:
        raise upload_ftp_image.retry(countdown=15)
    return "SUCCESS: "
And is this good or bad practice for my specific case?
result = download_ftp_image.apply_async(
    (server, username, password, "/test_app_2/model.dae", "tmp/test_app_2/"),
    queue='rep_data')
result.get()
result = upload_ftp_image.apply_async(
    (server, username, password, "tmp/test_app_2/model.dae", "tmp/test_app_2/"),
    queue='rep_data')
# result.get()
Martijn Pieters, over 11 years ago: @psychok7: I added an extra argument to the upload_ftp_image() signature already, did you not see that? It'll be passed whatever download_ftp_image returned (provided it is serializable, of course).
Martijn Pieters, over 11 years ago: @psychok7: use the routing_key argument, see docs.celeryproject.org/en/latest/userguide/routing.html