Celery chaining tasks sequentially


Solution 1

A chain is always passed the previous result as a first argument. From the chains documentation:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

Your upload_ftp_image task doesn't accept this extra argument, and thus it fails.
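The "6 given" count comes from the chain prepending the parent's result. The toy model below reproduces that behaviour in plain Python, so it runs without a broker. It is not Celery's implementation, and `run_chain`, `download` and `upload` are hypothetical names. Python 3 words the error slightly differently from the Python 2 message in the question.

```python
# Toy model of chain semantics (plain Python, not Celery): every step after the
# first is called with the previous step's return value as its first argument.

def run_chain(*steps):
    """Run (func, args) pairs in order, feeding each result into the next step."""
    func, args = steps[0]
    result = func(*args)
    for func, args in steps[1:]:
        result = func(result, *args)  # parent result is prepended
    return result


def download(ftp_server, username, password, filename, directory):
    return "SUCCESS: "


def upload(ftp_server, username, password, file, directory):  # only 5 parameters
    return "SUCCESS: "


error = None
try:
    run_chain(
        (download, ("ftp.example.com", "user", "pw", "/test_app_2/model.dae", "tmp/test_app_2/")),
        (upload, ("ftp.example.com", "user", "pw", "tmp/test_app_2/model.dae", "tmp/test_app_2/")),
    )
except TypeError as exc:
    error = str(exc)

print(error)  # upload() takes 5 positional arguments but 6 were given
```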

You have a fine use case for chaining; the second task is guaranteed to be called after the first task is completed (otherwise the result could not be passed on anyway).
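As a loose analogy for why the order is guaranteed, the sketch below uses `concurrent.futures` rather than Celery. The second step is only queued from the first step's completion callback, because it needs the first step's result. It illustrates the dependency only and is not how Celery schedules chained tasks internally. The task functions and the local path are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

events = []
final = {}
finished = threading.Event()
pool = ThreadPoolExecutor(max_workers=2)


def download():
    events.append("download started")
    events.append("download finished")
    return "tmp/test_app_2/model.dae"  # hypothetical local path


def upload(path):
    events.append("upload started with " + path)
    return "SUCCESS: "


def on_upload_done(future):
    final["result"] = future.result()
    finished.set()


def on_download_done(future):
    # The next step is only submitted here, once the parent's result exists.
    pool.submit(upload, future.result()).add_done_callback(on_upload_done)


pool.submit(download).add_done_callback(on_download_done)
finished.wait(timeout=5)  # wait for the whole "chain" before shutting down
pool.shutdown()
print(events)
```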

Simply add an argument for the result from the previous task:

def upload_ftp_image(download_result, ftp_server, username, password, file, directory):

You could make some use of that result value. For example, make the download method return the path of the downloaded file, so the upload method knows what to upload.
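That suggestion can be sketched with the same toy chain model used to explain the error. The download step returns the local path, and the upload step takes it as `download_result` instead of a separate file argument. `download_stub` and `upload_stub` are hypothetical stand-ins that touch only a temporary directory, not an FTP server.

```python
import os
import tempfile


def run_chain(*steps):
    """Toy chain: each later step gets the previous result as its first argument."""
    func, args = steps[0]
    result = func(*args)
    for func, args in steps[1:]:
        result = func(result, *args)
    return result


def download_stub(filename, directory):
    """Hypothetical stand-in for download_ftp_image: writes a file, returns its path."""
    os.makedirs(directory, exist_ok=True)
    local_path = os.path.join(directory, os.path.basename(filename))
    with open(local_path, "wb") as fh:
        fh.write(b"model data")
    return local_path


def upload_stub(download_result, remote_directory):
    """Hypothetical stand-in for upload_ftp_image: uploads the path it was handed."""
    with open(download_result, "rb") as fh:
        payload = fh.read()
    remote_path = remote_directory.rstrip("/") + "/" + os.path.basename(download_result)
    return remote_path, len(payload)


with tempfile.TemporaryDirectory() as workdir:
    result = run_chain(
        (download_stub, ("/test_app_2/model.dae", workdir)),
        (upload_stub, ("/test_app_2/",)),  # no local file argument needed
    )

print(result)  # ('/test_app_2/model.dae', 10)
```

With real tasks, the upload signature in the chain would then carry only the arguments that follow the result (server, credentials, remote directory).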

Solution 2

Another option, if you don't want the return value of the previous task to be passed as an argument, is to use 'immutability'.

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Instead of defining your subtasks as:

download_ftp_image.s(...) and upload_ftp_image.s(...)

define them as:

download_ftp_image.si(...) and upload_ftp_image.si(...)

You can now use the tasks in a chain with their usual number of arguments.
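The difference between `.s()` and `.si()` can be mimicked with a toy signature object that records whether it ignores its parent's result. `Sig`, `s`, `si` and `run_chain` are hypothetical names that mirror the Celery methods in spirit only. The real signatures carry far more (options, callbacks, serialization).

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Sig:
    """Toy signature: a function, its arguments, and whether it ignores the parent result."""
    func: Callable[..., Any]
    args: tuple
    immutable: bool = False


def s(func, *args):
    return Sig(func, args)  # like .s(): will receive the parent result


def si(func, *args):
    return Sig(func, args, immutable=True)  # like .si(): parent result is dropped


def run_chain(*sigs):
    result = None
    for index, sig in enumerate(sigs):
        if index == 0 or sig.immutable:
            result = sig.func(*sig.args)
        else:
            result = sig.func(result, *sig.args)
    return result


calls = []


def download(ftp_server, username, password, filename, directory):
    calls.append(("download", filename))
    return "SUCCESS: "


def upload(ftp_server, username, password, file, directory):
    calls.append(("upload", file))
    return "SUCCESS: "


result = run_chain(
    si(download, "ftp.example.com", "user", "pw", "/test_app_2/model.dae", "tmp/test_app_2/"),
    si(upload, "ftp.example.com", "user", "pw", "tmp/test_app_2/model.dae", "tmp/test_app_2/"),
)
print(calls)
```

Both functions keep their original five parameters, and the steps still run one after the other.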

Author: psychok7, Software Engineer

Updated on June 07, 2022

Comments

  • psychok7, about 2 years

    I need to download a file through FTP, change it, and upload it back. I am using Celery to do this, but I am running into problems when trying to use chaining, where I get:

    TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

    Also, can I use chains and be assured that the steps will run sequentially? If not, what is the alternative?

    res = chain(
        download_ftp_image.s(server, username, password, "/test_app_2/model.dae", "tmp/test_app_2/"),
        upload_ftp_image.s(server, username, password, "tmp/test_app_2/model.dae", "tmp/test_app_2/"),
    ).apply_async()
    print(res.get())
    

    Tasks:

    import os
    from ftplib import FTP, error_perm

    # `task` is the Celery task decorator; its import is not shown in the original

    @task()
    def download_ftp_image(ftp_server, username, password, filename, directory):
        try:
            ftp = FTP(ftp_server)
            ftp.login(username, password)
            # create the local target directory on first use
            if not os.path.exists(directory):
                os.makedirs(directory)
            # the with block closes the local file once the transfer is done
            with open(directory + 'model.dae', 'wb') as local_file:
                ftp.retrbinary("RETR /default_app/model.dae", local_file.write)
            ftp.quit()
        except error_perm:
            raise download_ftp_image.retry(countdown=15)

        return "SUCCESS: "

    @task()
    def upload_ftp_image(ftp_server, username, password, file, directory):
        try:
            ftp = FTP(ftp_server)
            ftp.login(username, password)
            new_file = file.replace(directory, "")
            directory = directory.replace("tmp", "")
            try:
                with open(file, "rb") as local_file:
                    ftp.storbinary("STOR " + directory + new_file, local_file)
            except error_perm:
                # remote directory is probably missing: create it and try again
                ftp.mkd(directory)
                with open(file, "rb") as local_file:
                    ftp.storbinary("STOR " + directory + new_file, local_file)
            ftp.quit()
        except error_perm:
            raise upload_ftp_image.retry(countdown=15)

        return "SUCCESS: "
    

    And is this good or bad practice for my specific case?

    result = download_ftp_image.apply_async((server, username, password, "/test_app_2/model.dae", "tmp/test_app_2/"), queue='rep_data')
    result.get()
    result = upload_ftp_image.apply_async((server, username, password, "tmp/test_app_2/model.dae", "tmp/test_app_2/"), queue='rep_data')
    # result.get()
    
  • Martijn Pieters, over 11 years
    @psychok7: I already added an extra argument to the upload_ftp_image() signature; did you not see that? It will be passed whatever download_ftp_image returned (provided it is serializable, of course).
  • Martijn Pieters, over 11 years
    @psychok7: use the routing_key argument; see docs.celeryproject.org/en/latest/userguide/routing.html
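The comments above note that the value returned by download_ftp_image must be serializable before it can be passed along the chain. Since Celery 4.0 the default task serializer is JSON, so a quick, conservative check is whether `json.dumps` accepts the value. Celery's own JSON encoder may accept a few extra types (for example, dates), so treat this as a rough proxy. `is_json_serializable` is a hypothetical helper, not part of Celery.

```python
import io
import json


def is_json_serializable(value):
    """Return True if json.dumps can encode value (rough proxy for a JSON task serializer)."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


print(is_json_serializable("tmp/test_app_2/model.dae"))      # True: a path string is fine
print(is_json_serializable({"path": "tmp/x", "size": 10}))   # True: plain dicts are fine
print(is_json_serializable(io.BytesIO(b"model data")))       # False: file objects are not
```

Returning a path string, as suggested in Solution 1, passes this check; returning an open file handle would not.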