Calling celery task hangs for delay and apply_async

I made a big mistake: I changed a setting that I did not understand. I am adding this answer for others' reference.

While installing RabbitMQ, I changed the default ulimit value from 1024 to 100 in /etc/default/rabbitmq-server.

I changed the value back to 1024, and the issue is now fixed.
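
To confirm that a restored limit actually applies to the running broker, one option on Linux is to read the process limits from /proc. The sketch below is illustrative only. It assumes Python 3, a Linux /proc filesystem, and that the broker's pid is known (for example, the pid value printed by rabbitmqctl status). The function names and the sample text are made up for this example, and reading another user's /proc entry may require elevated privileges.

```python
import re


def parse_open_files_limit(limits_text):
    """Return (soft, hard) "Max open files" limits from /proc/<pid>/limits text.

    Each value is an int, or None where the kernel reports "unlimited".
    Returns None if the "Max open files" line is not present.
    """
    match = re.search(r"^Max open files\s+(\S+)\s+(\S+)", limits_text, re.MULTILINE)
    if match is None:
        return None

    def to_int(value):
        return None if value == "unlimited" else int(value)

    return to_int(match.group(1)), to_int(match.group(2))


def read_open_files_limit(pid):
    """Read and parse /proc/<pid>/limits for the given process id (Linux only)."""
    with open("/proc/%d/limits" % pid) as handle:
        return parse_open_files_limit(handle.read())


# Hypothetical sample in the /proc/<pid>/limits layout, not taken from the post.
SAMPLE_LIMITS = """\
Limit                     Soft Limit           Hard Limit           Units
Max open files            100                  100                  files
"""
```

A soft limit as low as the 100 in the sample would match the misconfiguration described above; after restoring the setting and restarting the broker, read_open_files_limit(pid) should report the higher value.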

Author: Ansuman Bebarta

I am a beginner in Python and JavaScript.

Updated on July 10, 2022

Comments

  • Ansuman Bebarta, almost 2 years

    I have created a Celery app with the following directory structure (as given on the Celery site):

    proj
    |-- celery.py
    |-- celery.pyc
    |-- __init__.py
    |-- __init__.pyc
    |-- tasks.py
    `-- tasks.pyc
    

    Following are the contents of celery.py:

    from __future__ import absolute_import
    
    from celery import Celery
    
    app = Celery('proj',
                 broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost',
                 #backend='amqp://',
                 include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        CELERY_TASK_RESULT_EXPIRES=3600,
    )
    
    if __name__ == '__main__':
        app.start()
    

    Following are the contents of tasks.py:

    from __future__ import absolute_import
    
    from proj.celery import app
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    Now I start the Celery worker with the following command:

    celery -A proj worker -l debug
    

    I think the worker is running fine, as it outputs the following:

    [2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
    [2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
    [2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
    [2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
    [2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
    [2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
    [2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
    Starting from version 3.2 Celery will refuse to accept pickle by default.
    
    The pickle serializer is a security concern as it may give attackers
    the ability to execute any command.  It's important to secure
    your broker from unauthorized access when using pickle, so we think
    that enabling pickle should require a deliberate action and not be
    the default choice.
    
    If you depend on pickle then you should set a setting to disable this
    warning and to be sure that everything will continue working
    when you upgrade to Celery 3.2::
    
        CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
    
    You must only enable the serializers that you will actually use.
    
    
      warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
    
     -------------- celery@ansumanb-u12 v3.1.12 (Cipater)
    ---- **** ----- 
    --- * ***  * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         proj:0x1f46690
    - ** ---------- .> transport:   amqp://rabbitmquser:**@localhost:5672/localvhost
    - ** ---------- .> results:     disabled
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ---- 
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . celery.backend_cleanup
      . celery.chain
      . celery.chord
      . celery.chord_unlock
      . celery.chunks
      . celery.group
      . celery.map
      . celery.starmap
      . proj.tasks.add
      . proj.tasks.mul
      . proj.tasks.xsum
    
    [2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
    [2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
    [2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
    [2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
    [2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
    [2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection
    

    After starting the worker, I open another terminal and run the following in the Python interpreter:

    >>> from proj.tasks import add
    >>> add(2,2)
    4
    >>> add.delay(2,3)
    

    Here the delay call hangs (same story for apply_async). When I stop it with Ctrl+C, I get the following:

    ^CTraceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
        return self.apply_async(args, kwargs)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
        **dict(self._get_exec_options(), **options)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
        reply_to=reply_to or self.oid, **options
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
        **kwargs
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
        routing_key, mandatory, immediate, exchange, declare)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
        return fun(*args, **kwargs)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
        channel = self.channel
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
        channel = self._channel = channel()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
        value = self.__value__ = self.__contract__()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
        channel = ChannelPromise(lambda: connection.default_channel)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
        self.connection
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
        self._connection = self._establish_connection()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
        conn = self.transport.establish_connection()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
        conn = self.Connection(**opts)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
        (10, 10),  # start
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
        self.channel_id, allowed_methods)
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
        self.method_reader.read_method()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
        self._next_method()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
        frame_type, channel, payload = read_frame()
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
        frame_type, channel, size = unpack('>BHI', read(7, True))
      File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
        s = recv(n - len(rbuf))
    KeyboardInterrupt
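
The traceback above ends in recv() while the client waits for the broker's first frame, so the TCP connection was opened but the broker never answered the handshake. A minimal way to check for that state without Celery is sketched below. It assumes Python 3 and the AMQP 0-9-1 protocol header; probe_amqp and its return strings are hypothetical names for this example, not part of Celery, kombu, or amqp.

```python
import socket

# AMQP 0-9-1 protocol header: the first bytes a client sends after connecting.
AMQP_091_HEADER = b"AMQP\x00\x00\x09\x01"


def probe_amqp(host="localhost", port=5672, timeout=3.0):
    """Send the AMQP protocol header and report how the broker reacts.

    Returns one of:
      "connect_failed" - the TCP connection could not be opened
      "no_reply"       - connected, but nothing came back before the timeout
      "closed"         - the peer closed or reset the connection
      "ok"             - the reply starts with a method frame (frame type 1)
      "unexpected"     - some other bytes came back
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        return "connect_failed"
    try:
        sock.sendall(AMQP_091_HEADER)
        # An AMQP frame header is 7 bytes: type (1), channel (2), size (4).
        reply = sock.recv(7)
    except socket.timeout:
        return "no_reply"
    except OSError:
        return "closed"
    finally:
        sock.close()
    if not reply:
        return "closed"
    return "ok" if reply[:1] == b"\x01" else "unexpected"
```

Calling probe_amqp() with its defaults targets localhost:5672, the host and port used in the broker URL above. A "no_reply" result would be consistent with the hang shown in the traceback, though it does not by itself say why the broker is silent.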
    

    Any suggestions or comments will be much appreciated. I have gone through other links that talk about the size of the /var directory, but I think I have enough space.

    Result of df -h:

    Filesystem      Size  Used Avail Use% Mounted on
    /dev/sda3       283G   99G  170G  37% /
    udev            1.9G  4.0K  1.9G   1% /dev
    tmpfs           388M  1.1M  387M   1% /run
    none            5.0M     0  5.0M   0% /run/lock
    none            1.9G   28M  1.9G   2% /run/shm
    

    Following is the result of rabbitmqctl status:

    Status of node 'rabbit@ansumanb-u12' ...
    [{pid,12014},
     {running_applications,[{rabbit,"RabbitMQ","3.3.2"},
                            {os_mon,"CPO  CXC 138 46","2.2.7"},
                            {xmerl,"XML parser","1.2.10"},
                            {mnesia,"MNESIA  CXC 138 12","4.5"},
                            {sasl,"SASL  CXC 138 11","2.1.10"},
                            {stdlib,"ERTS  CXC 138 10","1.17.5"},
                            {kernel,"ERTS  CXC 138 10","2.14.5"}]},
     {os,{unix,linux}},
     {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"},
     {memory,[{total,27919080},
              {connection_procs,2704},
              {queue_procs,5408},
              {plugins,0},
              {other_proc,9099992},
              {mnesia,63776},
              {mgmt_db,0},
              {msg_index,34080},
              {other_ets,784160},
              {binary,12144},
              {code,14685283},
              {atom,1367393},
              {other_system,1864140}]},
     {alarms,[]},
     {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
     {vm_memory_high_watermark,0.4},
     {vm_memory_limit,1625165004},
     {disk_free_limit,50000000},
     {disk_free,181684699136},
     {file_descriptors,[{total_limit,2},
                        {total_used,0},
                        {sockets_limit,0},
                        {sockets_used,0}]},
     {processes,[{limit,1048576},{used,127}]},
     {run_queue,0},
     {uptime,20072}]
    ...done.
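
The file_descriptors entry in the output above reports a sockets_limit of 0, which is worth noticing when clients cannot get past the connection handshake. The sketch below pulls those numbers out of status text in the Erlang-term layout shown here (other RabbitMQ versions may print status differently). It assumes Python 3; the function names are hypothetical, and the "exhausted" check is only a rough heuristic, not an official RabbitMQ diagnostic.

```python
import re


def file_descriptor_limits(status_text):
    """Extract the {file_descriptors,[...]} pairs from `rabbitmqctl status` text.

    Returns a dict such as {"total_limit": 2, ...}, or an empty dict if the
    entry is not found.
    """
    block = re.search(r"\{file_descriptors,\s*\[(.*?)\]\}", status_text, re.DOTALL)
    if block is None:
        return {}
    pairs = re.findall(r"\{(\w+),\s*(\d+)\}", block.group(1))
    return {name: int(value) for name, value in pairs}


def sockets_exhausted(limits):
    """Heuristic: True when no socket descriptors remain for new connections."""
    if "sockets_limit" not in limits or "sockets_used" not in limits:
        return False
    return limits["sockets_used"] >= limits["sockets_limit"]


# The file_descriptors entry copied from the status output above.
STATUS_SNIPPET = """
 {file_descriptors,[{total_limit,2},
                    {total_used,0},
                    {sockets_limit,0},
                    {sockets_used,0}]},
"""

if __name__ == "__main__":
    limits = file_descriptor_limits(STATUS_SNIPPET)
    print(limits, sockets_exhausted(limits))
```

The same function can be fed the full output of rabbitmqctl status captured as text; only the file_descriptors entry is inspected.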
    

    I've checked the RabbitMQ logs and didn't find anything there. The Celery version is 3.1.12.

    I created the RabbitMQ virtual host and user with the following commands:

    $ sudo rabbitmqctl add_user rabbitmquser <mypassword>
    $ sudo rabbitmqctl add_vhost localvhost
    $ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"
    

    Thanks

  • Zarathustra, almost 10 years
    Somewhere in the install instructions it says that you should set it to unlimited; I think that's the default on Debian systems anyway since squeeze.