Dart Stream firstWhere does not immediately resolve
Two things are going wrong here.
-
First of all, in your example, the use of sleep is not appropriate. Consider the documentation for this function:
"Use this with care, as no asynchronous operations can be processed in an isolate while it is blocked in a sleep call."
Your countStream function, despite being asynchronous, blocks the entire isolate when it sleeps. Try this instead:
await Future<void>.delayed(const Duration(seconds: 10));
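As a rough sketch of what that change looks like in the context of the generator: the version below keeps the shape of countStream from the question but swaps sleep for Future.delayed. The count and delay parameters, the short default values, and the collectWithTicker helper are assumptions added only to keep the demo quick. A periodic timer runs alongside the stream to show that other asynchronous work can still be processed.

```dart
import 'dart:async';

// Same shape as countStream from the question, but with a non-blocking delay.
// The count of 3 and the 20 ms delay are assumptions to keep the demo short.
Stream<int> countStream({
  int count = 3,
  Duration delay = const Duration(milliseconds: 20),
}) async* {
  for (var i = 1; i <= count; i++) {
    yield i;
    // Suspends only this generator; the isolate stays free for other work.
    await Future<void>.delayed(delay);
  }
}

// Hypothetical helper: collects every value while a periodic timer runs.
Future<List<int>> collectWithTicker() async {
  var ticks = 0;
  final ticker =
      Timer.periodic(const Duration(milliseconds: 5), (_) => ticks++);
  final values = await countStream().toList();
  ticker.cancel();
  print('timer fired $ticks times while the stream was running');
  return values;
}

Future<void> main() async {
  print(await collectWithTicker()); // [1, 2, 3]
}
```

Note that this only removes the blocking call. It does not by itself make firstWhere complete sooner.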
-
Now, on to the real reason why firstWhere is not immediately resolving. Let's try a simple experiment:
import 'dart:async';

Stream<int> testStream() async* {
  for (var i = 0; i < 100; ++i) {
    print(i);
    yield i;
  }
}

void main() {
  late final StreamSubscription streamSubscription;
  // Cancel the subscription as soon as the value 1 arrives.
  streamSubscription = testStream().listen((value) {
    if (value == 1) streamSubscription.cancel();
  });
}
Output:
0
1
2
So what's going on here? The subscription is cancelled at i == 1 - why does the loop continue until 2?
The answer is simple. The async generator function will not stop (and the stream will therefore not close) until another yield statement is reached. This is due to the way the event loop works:
Once a Dart function starts executing, it continues executing until it exits. In other words, Dart functions can’t be interrupted by other Dart code.
The function has no opportunity to stop until it yields again, because cancelling the stream subscription cannot stop it immediately.
firstWhere uses the internal Dart _cancelAndValue function to complete with a value. It does not complete until the stream is closed, and the stream does not close until the next yield is reached, which, in your cases, may be delayed or never even happen.
The only way to fix this behaviour while using an asynchronous generator function would be to add another yield or a return statement before the next delay.
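One way to observe this directly, as a minimal sketch: the future returned by StreamSubscription.cancel() should not complete until the generator has actually exited, which a finally block inside the generator makes visible. The names slowStream and observeCancel and the 50 ms pause are hypothetical and exist only for this illustration.

```dart
import 'dart:async';

// Hypothetical generator: yields 0, 1, 2, ... with a pause after each yield.
Stream<int> slowStream(List<String> log) async* {
  try {
    for (var i = 0;; i++) {
      yield i;
      await Future<void>.delayed(const Duration(milliseconds: 50));
    }
  } finally {
    // Runs only once the generator really exits.
    log.add('generator stopped');
  }
}

// Records the order in which the cancel is requested, the generator stops,
// and the future returned by cancel() completes.
Future<List<String>> observeCancel() async {
  final log = <String>[];
  final finished = Completer<void>();
  late final StreamSubscription<int> subscription;
  subscription = slowStream(log).listen((value) async {
    if (value == 1) {
      log.add('cancel requested');
      // Waits for the generator's cleanup, not just for the request.
      await subscription.cancel();
      log.add('cancel completed');
      finished.complete();
    }
  });
  await finished.future;
  return log;
}

Future<void> main() async {
  print(await observeCancel());
}
```

I would expect the log to show the cancel being requested first, then the generator stopping, and only then the cancel completing. firstWhere waits on that same cancel future, so its result is held back until the generator reaches a point where it can exit.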
altair
Updated on December 06, 2022
Comments
-
altair over 1 year
I have, for example, this simple code:
import 'dart:io';

// Create a stream with numbers from 1 to 100, delayed by 10 seconds each
Stream<int> countStream() async* {
  for (int i = 1; i <= 100; i++) {
    yield i;
    sleep(Duration(seconds: 10));
  }
}

void main() async {
  // Here I'm waiting for number 1
  var x = await countStream().firstWhere((element) => element == 1);
  print(x);
}
The problem is that firstWhere does not complete right after the yield of 1, but only after the yield of 2, so print is held up for 10 seconds.
Why? In my real-life app, I have a WebSocket stream that is transformed into a message stream, and I wait for a specific message. But because the WebSocket stream does not yield another message, firstWhere hangs.
Here is my original code:
Stream<Message> lines() async* {
  var partial = '';
  await for (String chunk in ws!) { // ws is a WebSocket
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0];
    partial = lines.removeLast();
    for (final line in lines) {
      var msg = Message.parse(line); // Message.parse returns CodeMessage object
      if (msg != null) yield msg;
    }
  }
}

// At some place in the code this hangs, because the last message to arrive is a CodeMessage
var msg = await lines().firstWhere((obj) => obj is CodeMessage);
print(msg);
Is there another way to do this, or where am I going wrong?
-
pskink over 2 years
i would replace your async* generator with ws!.expand(...) or ws!.transform(...) - see dart.dev/tutorials/language/streams#modify-stream-methods for more info
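A rough sketch of what this suggestion could look like, not a drop-in replacement: the Message and CodeMessage classes below are stand-ins with an assumed parsing rule, and a StreamController plays the role of the WebSocket. LineSplitter from dart:convert handles the partial-line buffering that the generator did by hand. Because no async* generator is involved, cancelling the subscription does not have to wait for a further event.

```dart
import 'dart:async';
import 'dart:convert';

// Hypothetical stand-ins for the Message types from the question.
class Message {
  final String text;
  Message(this.text);

  // Assumed rule for this sketch: empty lines are ignored and lines that
  // start with "CODE " become CodeMessage objects.
  static Message? parse(String line) {
    if (line.isEmpty) return null;
    return line.startsWith('CODE ') ? CodeMessage(line) : Message(line);
  }
}

class CodeMessage extends Message {
  CodeMessage(String text) : super(text);
}

// Built from stream transforms instead of an async* generator.
// A real WebSocket is a Stream<dynamic>, so it would presumably need
// .cast<String>() before being passed in here.
Stream<Message> lines(Stream<String> chunks) => chunks
    .transform(const LineSplitter()) // buffers partial lines across chunks
    .map(Message.parse)
    .where((msg) => msg != null)
    .cast<Message>();

Future<String> firstCodeMessageText() async {
  final ws = StreamController<String>(); // stand-in for the WebSocket
  final pending = lines(ws.stream).firstWhere((msg) => msg is CodeMessage);
  ws.add('hello\nCODE 4'); // the second line is still incomplete here
  ws.add('2\n'); // completes the line "CODE 42"
  // Nothing else is sent after the CodeMessage line.
  final msg = await pending.timeout(const Duration(seconds: 1));
  return msg.text;
}

Future<void> main() async {
  print(await firstCodeMessageText()); // CODE 42
}
```

The timeout is only there so that the demo fails loudly instead of hanging if the result does not arrive.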
-
-
jamesdlin over 2 years
The sleep call used in the code was an example and isn't a pertinent detail of the question. It could be anything with an observable side effect (e.g. a print() call).
-
hacker1024 over 2 years
The question was about the example code; the original code was provided as context. I therefore answered the question being asked. That being said, I was overlooking another cause of the same problem that affects both code samples. My question has been edited to address this.