How to wait for coroutines to complete synchronously within method if event loop is already running?

10,185

Solution 1

You could use a set of tasks:

self._send_request_tasks = set()

Schedule the tasks using ensure_future and clean up using add_done_callback:

def send_request(self, request: str) -> None:
    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.add(task)
    task.add_done_callback(self._send_request_tasks.remove)

And wait for the set of tasks to complete:

async def stop(self):
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)

Solution 2

Given that you're not inside an asynchronous function you can use the yield from keyword to effectively implement await yourself. The following code will block until the future returns:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    future = asyncio.ensure_future(self._ws.send(request))
    yield from future.__await__()
Share:
10,185
Tagc
Author by

Tagc

There are two types of programmers: Those that rely on Stack Overflow each and every day. Liars.

Updated on June 09, 2022

Comments

  • Tagc
    Tagc almost 2 years

    I'm trying to create a Python-based CLI that communicates with a web service via websockets. One issue that I'm encountering is that requests made by the CLI to the web service intermittently fail to get processed. Looking at the logs from the web service, I can see that the problem is caused by the fact that frequently these requests are being made at the same time (or even after) the socket has closed:

    2016-09-13 13:28:10,930 [22 ] INFO  DeviceBridge - Device bridge has opened
    2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
    2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
    2016-09-13 13:28:11,937 [21 ] WARN  DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
    2016-09-13 13:28:11,936 [5  ] DEBUG DeviceBridge - Device bridge has closed
    

    In my CLI I define a class CommunicationService that is responsible for handling all direct communication with the web service. Internally, it uses the websockets package to handle communication, which itself is built on top of asyncio.

    CommunicationService contains the following method for sending requests:

    def send_request(self, request: str) -> None:
        logger.debug('Sending request: {}'.format(request))
        asyncio.ensure_future(self._ws.send(request))
    

    ...where ws is a websocket opened earlier in another method:

    self._ws = await websockets.connect(websocket_address)
    

    What I want is to be able to await the future returned by asyncio.ensure_future and, if necessary, sleep for a short while after in order to give the web service time to process the request before the websocket is closed.

    However, since send_request is a synchronous method, it can't simply await these futures. Making it asynchronous would be pointless as there would be nothing to await the coroutine object it returned. I also can't use loop.run_until_complete as the loop is already running by the time it is invoked.

    I found someone describing a problem very similar to the one I have at mail.python.org. The solution that was posted in that thread was to make the function return the coroutine object in the case the loop was already running:

    def aio_map(coro, iterable, loop=None):
        if loop is None:
            loop = asyncio.get_event_loop()
    
        coroutines = map(coro, iterable)
        coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
    
        if loop.is_running():
            return coros
        else:
            return loop.run_until_complete(coros)
    

    This is not possible for me, as I'm working with PyRx (Python implementation of the reactive framework) and send_request is only called as a subscriber of an Rx observable, which means the return value gets discarded and is not available to my code:

    class AnonymousObserver(ObserverBase):
        ...
        def _on_next_core(self, value):
            self._next(value)
    

    On a side note, I'm not sure if this is some sort of problem with asyncio that's commonly come across or whether I'm just not getting it, but I'm finding it pretty frustrating to use. In C# (for instance), all I would need to do is probably something like the following:

    void SendRequest(string request)
    {
        this.ws.Send(request).Wait();
        // Task.Delay(500).Wait();  // Uncomment If necessary
    }
    

    Meanwhile, asyncio's version of "wait" unhelpfully just returns another coroutine that I'm forced to discard.

    Update

    I've found a way around this issue that seems to work. I have an asynchronous callback that gets executed after the command has executed and before the CLI terminates, so I just changed it from this...

    async def after_command():
        await comms.stop()
    

    ...to this:

    async def after_command():
        await asyncio.sleep(0.25)  # Allow time for communication
        await comms.stop()
    

    I'd still be happy to receive any answers to this problem for future reference, though. I might not be able to rely on workarounds like this in other situations, and I still think it would be better practice to have the delay executed inside send_request so that clients of CommunicationService do not have to concern themselves with timing issues.

    In regards to Vincent's question:

    Does your loop run in a different thread, or is send_request called by some callback?

    Everything runs in the same thread - it's called by a callback. What happens is that I define all my commands to use asynchronous callbacks, and when executed some of them will try to send a request to the web service. Since they're asynchronous, they don't do this until they're executed via a call to loop.run_until_complete at the top level of the CLI - which means the loop is running by the time they're mid-way through execution and making this request (via an indirect call to send_request).

    Update 2

    Here's a solution based on Vincent's proposal of adding a "done" callback.

    A new boolean field _busy is added to CommunicationService to represent if comms activity is occurring or not.

    CommunicationService.send_request is modified to set _busy true before sending the request, and then provides a callback to _ws.send to reset _busy once done:

    def send_request(self, request: str) -> None:
        logger.debug('Sending request: {}'.format(request))
    
        def callback(_):
            self._busy = False
    
        self._busy = True
        asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
    

    CommunicationService.stop is now implemented to wait for this flag to be set false before progressing:

    async def stop(self) -> None:
        """
        Terminate communications with TestCube Web Service.
        """
        if self._listen_task is None or self._ws is None:
            return
    
        # Wait for comms activity to stop.
        while self._busy:
            await asyncio.sleep(0.1)
    
        # Allow short delay after final request is processed.
        await asyncio.sleep(0.1)
    
        self._listen_task.cancel()
        await asyncio.wait([self._listen_task, self._ws.close()])
    
        self._listen_task = None
        self._ws = None
        logger.info('Terminated connection to TestCube Web Service')
    

    This seems to work too, and at least this way all communication timing logic is encapsulated within the CommunicationService class as it should be.

    Update 3

    Nicer solution based on Vincent's proposal.

    Instead of self._busy we have self._send_request_tasks = [].

    New send_request implementation:

    def send_request(self, request: str) -> None:
        logger.debug('Sending request: {}'.format(request))
    
        task = asyncio.ensure_future(self._ws.send(request))
        self._send_request_tasks.append(task)
    

    New stop implementation:

    async def stop(self) -> None:
        if self._listen_task is None or self._ws is None:
            return
    
        # Wait for comms activity to stop.
        if self._send_request_tasks:
            await asyncio.wait(self._send_request_tasks)
        ...