Dart Stream firstWhere does not immediately resolve

223

Two things are going wrong here.

  1. First of all, in your example:

    The use of sleep here is not appropriate. Consider the documentation for this function:

    Use this with care, as no asynchronous operations can be processed in an isolate while it is blocked in a sleep call.

    Your countStream function, despite being asynchronous, blocks the entire isolate when it sleeps.

    Try this instead:

    await Future<void>.delayed(const Duration(seconds: 10));
    
  2. Now, on to the real reason why firstWhere is not immediately resolving:

    Let's try a simple experiment:

    Stream<int> testStream() async* {
      for (var i = 0; i < 100; ++i) {
        print(i);
        yield i;
      }
    }
    
    void main() {
      late final StreamSubscription streamSubscription;
      streamSubscription = testStream().listen((value) {
        if (value == 1) streamSubscription.cancel();
      }); 
    }
    

    Output:

    0
    1
    2
    

    So what's going on here? The subscription is cancelled at i == 1 - why does the loop continue until 2?

    The answer is simple. The async generator function will not stop (and the stream will therefore not close) until another yield statement is reached. This is due to the way the event loop works:

    Once a Dart function starts executing, it continues executing until it exits. In other words, Dart functions can’t be interrupted by other Dart code.

    The function has no opportunity to stop until it yields again, because cancelling the stream subscription cannot stop it immediately.

    firstWhere uses the internal Dart _cancelAndValue function to complete with a value. It does not complete until the stream is closed, and the stream does not close until the next yield is reached - which, in your cases, may be delayed or never even happen.

    The only way to fix this behaviour while using an asynchronous generator function would be to add another yield or a return statement before the next delay.

Share:
223
altair
Author by

altair

Updated on December 06, 2022

Comments

  • altair
    altair over 1 year

    I have, for example, this simple code:

    //create stream with numbers from 1 to 100, delayed by 10sec duration
    Stream<int> countStream() async* {
      for (int i = 1; i <= 100; i++) {
        yield i;      
        sleep(Duration(seconds: 10));
      }
    }
    
    void main() async {
      var x = await countStream().firstWhere((element) => element == 1); //here Im waiting for number 1
      print(x);
    }
    

    The problem is that firstWhere does not exit right after the yield of 1, but after the yeild of 2, and print is holded for 10 seconds.

    Why? In my real life App, I have websocket stream that is transformed to message stream, and waiting for specific message. But because the websocket stream does not yield another message, firstWhere hangs.

    Here is my original code:

      Stream<Message> lines() async* {
        var partial = '';
    
        await for (String chunk in ws!) { //ws is WebSocket
          var lines = chunk.split('\n');
          lines[0] = partial + lines[0];
          partial = lines.removeLast();
          for (final line in lines) {
            var msg = Message.parse(line); //Message.parse returns CodeMessage object
            if (msg != null) yield msg;
          }
        }
      }
    
    //at some place in code this hangs because last arrived message is CodeMessage
    var msg = await lines().firstWhere((obj) => obj is CodeMessage);
    print(msg);
    
    

    Is there another way to do this or where I am wrong?

  • jamesdlin
    jamesdlin over 2 years
    The sleep call used in the code was an example and isn't a pertinent detail of the question. It could be anything with an observable side effect (e.g. a print() call).
  • hacker1024
    hacker1024 over 2 years
    The question was about the example code; the original code was provided as context. I therefore answered the question being asked. That being said, I was overlooking another cause of the same problem that affects both code samples. My question has been edited to address this.