Is non-blocking Redis pubsub possible?

38,530

Solution 1

I don't think that would be possible. A channel doesn't have any "current data": you subscribe to a channel and start receiving messages that other clients push to it, hence it is a blocking API. The Redis command documentation for pub/sub makes this clearer.

Solution 2

If you're thinking of non-blocking, asynchronous processing, you're probably using (or should use) an asynchronous framework/server.

UPDATE: It's been 5 years since the original answer; in the meantime, Python got native async IO support. There is now aioredis, an async IO Redis client.
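As a rough sketch of what the async IO approach can look like: recent redis-py releases (4.2 and later) ship the former aioredis code as `redis.asyncio`. The helper name `next_message`, the `demo` coroutine, the channel name and the timeout values are illustrative assumptions, not part of the original answer.

```python
import asyncio


async def next_message(pubsub, timeout=1.0):
    """Wait up to `timeout` seconds for one published message.

    Returns the message dict, or None if nothing arrived in time.
    Other coroutines keep running while this one awaits.
    """
    return await pubsub.get_message(
        ignore_subscribe_messages=True, timeout=timeout
    )


async def demo(channel="foo", attempts=5):
    # Assumes redis-py >= 4.2 and a Redis server on localhost:6379.
    import redis.asyncio as aioredis

    client = aioredis.Redis()
    pubsub = client.pubsub()
    await pubsub.subscribe(channel)
    for _ in range(attempts):
        message = await next_message(pubsub)
        if message is not None:
            print(message["channel"], message["data"])
            break
    await pubsub.unsubscribe(channel)


# To try it against a real server: asyncio.run(demo())
```

The `await` is what keeps this non-blocking: while `next_message` waits, the event loop is free to run other tasks.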

Solution 3

The accepted answer is obsolete: redis-py now recommends using the non-blocking get_message(). It also provides an easy way to use threads.

https://pypi.python.org/pypi/redis

There are three different strategies for reading messages.

Behind the scenes, get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket. If there’s data available to be read, get_message() will read it, format the message and return it or pass it to a message handler. If there’s no data to be read, get_message() will immediately return None. This makes it trivial to integrate into an existing event loop inside your application.

 import time

 # p is a PubSub object with an active subscription, e.g. p = r.pubsub()
 while True:
     message = p.get_message()
     if message:
         print(message)  # do something with the message
     time.sleep(0.001)  # be nice to the system :)
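To make the "integrate into an existing event loop" point concrete, here is a minimal sketch in which the pub/sub check is just one step of a loop that also does other work. The function name `poll_pubsub` and its parameters (`handle`, `do_other_work`, `iterations`) are hypothetical; only `get_message()` comes from redis-py.

```python
import time


def poll_pubsub(pubsub, handle, do_other_work, iterations, sleep_time=0.001):
    """Interleave non-blocking pub/sub checks with other work.

    Returns the number of published messages passed to `handle`.
    """
    handled = 0
    for _ in range(iterations):
        # get_message() returns None right away when nothing is pending.
        message = pubsub.get_message()
        if message and message["type"] == "message":
            handle(message)
            handled += 1
        do_other_work()  # the loop is never stuck waiting on Redis
        time.sleep(sleep_time)
    return handled
```

The `type` check skips the subscribe/unsubscribe confirmations that redis-py also returns from `get_message()`, so `handle` only sees published messages.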

Older versions of redis-py only read messages with pubsub.listen(). listen() is a generator that blocks until a message is available. If your application doesn’t need to do anything else but receive and act on messages received from redis, listen() is an easy way to get up and running.

 for message in p.listen():
     print(message)  # do something with the message
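If blocking is acceptable, a small wrapper around listen() can hide the bookkeeping items it yields. This is only a sketch: the generator name `published_messages` and the `stop_word` sentinel are hypothetical, and the payload is compared as bytes on the assumption that the client was not created with `decode_responses=True`.

```python
def published_messages(pubsub, stop_word=b"QUIT"):
    """Yield only real published messages from the blocking listen() generator."""
    for item in pubsub.listen():
        if item["type"] != "message":
            continue  # skip subscribe/unsubscribe confirmations
        if item["data"] == stop_word:
            break  # hypothetical sentinel that ends the loop
        yield item
```

A caller would then write `for message in published_messages(p): ...` and leave the loop when another client publishes the sentinel.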

The third option runs an event loop in a separate thread. pubsub.run_in_thread() creates a new thread and starts the event loop. The thread object is returned to the caller of run_in_thread(). The caller can use the thread.stop() method to shut down the event loop and thread. Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread, essentially creating a tiny non-blocking event loop for you. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with the value in each iteration of the loop.

Note: Since we’re running in a separate thread, there’s no way to handle messages that aren’t automatically handled with registered message handlers. Therefore, redis-py prevents you from calling run_in_thread() if you’re subscribed to patterns or channels that don’t have message handlers attached.

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()
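One possible shape for `my_handler`, assuming the main thread still wants to look at the messages itself: the handler only puts each message on a thread-safe queue, and the main thread drains that queue whenever convenient. The names `inbox` and `pending_messages` are hypothetical.

```python
import queue

inbox = queue.Queue()


def my_handler(message):
    """Runs in the background thread; hands the message to the main thread."""
    inbox.put(message)


def pending_messages():
    """Return every message received so far without blocking."""
    messages = []
    while True:
        try:
            messages.append(inbox.get_nowait())
        except queue.Empty:
            return messages
```

Calling `pending_messages()` returns an empty list when nothing has arrived, so it can serve as a non-blocking "has anything arrived?" check.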

So to answer your question, just call get_message() whenever you want to know whether a message has arrived.

Solution 4

The new version of redis-py supports non-blocking pub/sub; see https://github.com/andymccurdy/redis-py for more details. Here's an example from the documentation itself:

import time

# p is a PubSub object with an active subscription, e.g. p = r.pubsub()
while True:
    message = p.get_message()
    if message:
        print(message)  # do something with the message
    time.sleep(0.001)  # be nice to the system :)

Solution 5

This is a working example that runs the blocking listener in a separate thread.

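import sys
import cmd
import redis
import threading


def monitor():
    # YOURHOST, YOURPORT and YOURPASSWORD are placeholders for your own
    # connection settings. Keyword arguments are used because the third
    # positional parameter of redis.Redis() is db, not password.
    r = redis.Redis(host=YOURHOST, port=YOURPORT, password=YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print('monitoring channel', channel)
    # listen() blocks, but only this background thread is blocked.
    for m in p.listen():
        print(m['data'])


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print("missing argument! please provide the channel name.")
    else:
        # A daemon thread does not keep the process alive on exit.
        my_thread = threading.Thread(target=monitor, daemon=True)

        my_cmd().cmdloop()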
Author by

limboy

Updated on November 02, 2021

Comments

  • limboy
    limboy over 2 years

    I want to use Redis pub/sub to transmit some messages, but I don't want to be blocked by listen(), as in the code below:

    import redis
    rc = redis.Redis()
    
    ps = rc.pubsub()
    ps.subscribe(['foo', 'bar'])
    
    rc.publish('foo', 'hello world')
    
    for item in ps.listen():
        if item['type'] == 'message':
            print(item['channel'])
            print(item['data'])
    

    The final for loop will block. I just want to check whether a given channel has data. How can I accomplish this? Is there a check-like method?