Python - how to run multiple coroutines concurrently using asyncio?

Solution 1

TL;DR Use asyncio.ensure_future() to run several coroutines concurrently.


Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

No, you don't need any other framework for this. The whole point of an asynchronous application, as opposed to a synchronous one, is that it doesn't block while waiting for a result. It doesn't matter whether that is implemented with coroutines or with callbacks.

I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

In a synchronous application you would write something like msg = websocket.recv(), which would block the whole application until a message arrives (as you described). In an asynchronous application it works completely differently.

When you write msg = yield from websocket.recv(), you are saying: suspend execution of connection_handler() until websocket.recv() produces something. Using yield from inside a coroutine returns control to the event loop, so other code can run while we wait for the result of websocket.recv(). Please refer to the documentation to better understand how coroutines work.
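
To make the suspension visible without a websocket, here is a minimal sketch. It assumes Python 3.7+ and uses async/await, the modern spelling of @asyncio.coroutine and yield from. The names worker and log are made up for illustration, and asyncio.sleep() stands in for websocket.recv().

```python
import asyncio


async def worker(name, delay, log):
    log.append(f"{name} start")
    # Suspends only this coroutine; the event loop is free to run the other one.
    await asyncio.sleep(delay)
    log.append(f"{name} done")


async def main():
    log = []
    slow = asyncio.create_task(worker("slow", 0.2, log))
    fast = asyncio.create_task(worker("fast", 0.05, log))
    await slow
    await fast
    return log


print(asyncio.run(main()))
# ['slow start', 'fast start', 'fast done', 'slow done']
```

Both workers start before either finishes, which shows that waiting inside one coroutine does not hold up the other.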

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that?

You can use asyncio.ensure_future() (called asyncio.async() before Python 3.4.4) to schedule as many coroutines as you want before making the blocking call that starts the event loop.

import asyncio

import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []


@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
        yield from connection.send(msg)
        print('> {}'.format(msg))


@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client


start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.ensure_future(send_periodically())  # before the blocking call, schedule the coroutine that sends periodic messages
asyncio.get_event_loop().run_forever()
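
For newer Python versions (3.7+), the same scheduling idea can be sketched with async/await and asyncio.create_task(). This is only an illustration under stated assumptions: FakeConnection is a hypothetical stand-in for a websocket connection so the snippet runs without the websockets library, and the short asyncio.sleep() in main() stands in for the time the server would spend serving clients.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket connection; it only records sends."""

    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


connections = set()


async def send_periodically(interval):
    while True:
        await asyncio.sleep(interval)
        # Iterate over a copy: the set may change while we are suspended in send().
        for connection in list(connections):
            await connection.send("Periodic event happened.")


async def main():
    client = FakeConnection()
    connections.add(client)
    # Schedule the background coroutine; it runs whenever main() is suspended.
    task = asyncio.create_task(send_periodically(0.05))
    await asyncio.sleep(0.18)  # stands in for serving clients
    task.cancel()  # stop the background task on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return client.sent


print(asyncio.run(main()))
```

Keeping a reference to the task also makes it possible to cancel it cleanly when the server shuts down.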

Here is an example client implementation. It asks you to enter a name, receives it back from the echo server, waits for two more messages from the server (our periodic messages), and closes the connection.

import asyncio

import websockets


@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))

    yield from connection.close()


asyncio.get_event_loop().run_until_complete(hello())

Important points:

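  1. In Python 3.4.4 asyncio.async() was renamed to asyncio.ensure_future(). Since async became a reserved keyword in Python 3.7, the old spelling is now a syntax error.
  2. There are special methods for scheduling delayed calls (loop.call_later() and loop.call_at()), but they take plain callbacks and don't work with coroutines directly.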
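
If you do want a delayed-call method to start a coroutine, one possible workaround is to pass a plain callback that wraps the coroutine in a task. The sketch below assumes Python 3.7+; notify and log are hypothetical names used only for illustration.

```python
import asyncio


async def notify(log):
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    log.append("notified")


async def main():
    loop = asyncio.get_running_loop()
    log = []
    tasks = []  # keep references so the scheduled task is not garbage collected
    # call_later() expects a plain callable, so the callback creates the task itself.
    loop.call_later(0.05, lambda: tasks.append(loop.create_task(notify(log))))
    await asyncio.sleep(0.2)  # keep the loop running long enough for the callback to fire
    return log


print(asyncio.run(main()))
# ['notified']
```

In many cases it is simpler to put the delay inside the coroutine with asyncio.sleep(), as send_periodically() does above.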

Solution 2

I'm surprised gather isn't mentioned.

From the Python documentation:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24
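
The documentation example only prints. As a small variation (my own sketch, not taken from the docs), the coroutines can return their values instead: gather() then returns a list of results in the order the awaitables were passed in, regardless of which one finishes first.

```python
import asyncio


async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(0.01)  # give the other coroutines a chance to run
        f *= i
    return f


async def main():
    # factorial(4) finishes last, but its result still comes first in the list.
    return await asyncio.gather(factorial(4), factorial(2), factorial(3))


print(asyncio.run(main()))
# [24, 2, 6]
```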

Solution 3

I had the same issue and could hardly find a solution until I saw the perfect sample here: http://websockets.readthedocs.io/en/stable/intro.html#both

done, pending = await asyncio.wait(
    [listener_task, producer_task],
    return_when=asyncio.FIRST_COMPLETED)  # Important

This way I can handle multiple coroutine tasks, such as a heartbeat and a Redis subscription.
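
For a self-contained picture of that pattern, here is a rough sketch assuming Python 3.7+. The listener and producer coroutines are hypothetical placeholders that just sleep, where real code would wait on a websocket or a queue. Note that recent Python versions require Task objects (not bare coroutines) to be passed to asyncio.wait().

```python
import asyncio


async def listener():
    await asyncio.sleep(0.05)  # pretend a message arrived from the client
    return "message received"


async def producer():
    await asyncio.sleep(1)  # pretend there is nothing to send yet
    return "message to send"


async def main():
    listener_task = asyncio.create_task(listener())
    producer_task = asyncio.create_task(producer())
    done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()  # drop the task that did not finish first
    return [task.result() for task in done]


print(asyncio.run(main()))
# ['message received']
```

In a real handler this would sit inside a loop, so that after handling whichever task finished, both tasks are started again.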

Author by

weatherfrog

Updated on September 11, 2020

Comments

  • weatherfrog over 3 years

    I'm using the websockets library to create a websocket server in Python 3.4. Here's a simple echo server:

    import asyncio
    import websockets
    
    @asyncio.coroutine
    def connection_handler(websocket, path):
        while True:
            msg = yield from websocket.recv()
            if msg is None:  # connection lost
                break
            yield from websocket.send(msg)
    
    start_server = websockets.serve(connection_handler, 'localhost', 8000)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    

    Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that? I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

    Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

  • weatherfrog over 8 years
    Great answer, thank you! I understand what coroutines are, but I'm still trying to get my head around the asyncio framework. Your answer helped a lot.
  • verystrongjoe about 6 years
    @weatherfrog Did you solve the problem? I have a similar issue over here
  • verystrongjoe about 6 years
    I tried the link you taught. But I can not solve my problem. Please give some advice my issue over here
  • Mark Ellul about 4 years
    thanks for this... Even though my problem wasn't the same, this solution helped lots
  • wahyudinata almost 4 years
    this is the simplest solution that works among all the proposed in this thread
  • Atem18 over 3 years
    Can confirm on Python 3.8, simple and easy, it just works.
  • midtownguru over 2 years
    This answer does not have anything to do with the websockets part. It's just an asyncio example
  • Benyamin Jafari - aGn about 2 years
    Nice answer. +1. It's also worth mentioning that asyncio.run() works on python 3.7 and later.