How do I get a dart Stream with uneven intervals?


I think the easiest approach is an async* generator that waits for the gap since the previous message and then emits the next one. Something like:

Future<void> _wait(int milliseconds) async =>
  await Future<void>.delayed(Duration(milliseconds: milliseconds));

Stream<String> generateMessages() async* {
  yield 'start';
  await _wait(100);
  yield 'init_all';
  await _wait(400);
  yield 'user_pick_x';
  yield 'user_start_x';
  await _wait(3000);
  yield 'interrupt';
}

void main() {
  generateMessages().listen((msg) {
    print('${DateTime.now()}: $msg');
  });
}

which will print:

2021-07-25 10:21:21.429: start
2021-07-25 10:21:21.531: init_all
2021-07-25 10:21:21.934: user_pick_x
2021-07-25 10:21:21.934: user_start_x
2021-07-25 10:21:24.938: interrupt
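
If the messages live in a list together with their send times, as in the question, the same generator can be driven by data instead of hard-coded waits. The following is a minimal sketch under that assumption; `TimedMessage` and `gapsFor` are hypothetical helper names introduced here for illustration, and the list is assumed to be sorted by time already.

```dart
/// A message paired with its due time, measured from the start of the stream.
/// Hypothetical helper type for this sketch.
class TimedMessage {
  const TimedMessage(this.at, this.text);
  final Duration at;
  final String text;
}

/// Converts absolute due times into the gap to wait before each message.
/// Assumes [schedule] is sorted by [TimedMessage.at].
List<Duration> gapsFor(List<TimedMessage> schedule) {
  var previous = Duration.zero;
  final gaps = <Duration>[];
  for (final entry in schedule) {
    gaps.add(entry.at - previous);
    previous = entry.at;
  }
  return gaps;
}

/// Emits each message after waiting for its gap; a zero gap emits immediately,
/// so messages with the same due time keep their list order.
Stream<String> generateMessages(List<TimedMessage> schedule) async* {
  final gaps = gapsFor(schedule);
  for (var i = 0; i < schedule.length; i++) {
    if (gaps[i] > Duration.zero) {
      await Future<void>.delayed(gaps[i]);
    }
    yield schedule[i].text;
  }
}

void main() {
  const schedule = [
    TimedMessage(Duration.zero, 'start'),
    TimedMessage(Duration(milliseconds: 100), 'init_all'),
    TimedMessage(Duration(milliseconds: 500), 'user_pick_x'),
    TimedMessage(Duration(milliseconds: 500), 'user_start_x'),
    TimedMessage(Duration(milliseconds: 3500), 'interrupt'),
  ];
  // Keep the subscription if you need sub.pause(), sub.resume() or sub.cancel().
  final sub = generateMessages(schedule).listen((msg) {
    print('${DateTime.now()}: $msg');
  });
  sub.onDone(() => print('all messages sent'));
}
```

Since this is an ordinary stream, the StreamSubscription returned by listen can be paused, resumed, or cancelled. While paused, the generator suspends at its next yield, so the remaining gaps are counted from the moment it resumes.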

If you want the listener's own processing time not to add to the waits, you can use an explicit StreamController instead. By default it delivers events to listeners asynchronously, so the next wait starts before the listener runs. Make sure to import dart:async. dart:io is only imported for sleep, which simulates a blocking listener to show that its work overlaps with the wait rather than extending it:

import 'dart:async';
import 'dart:io';

Future<void> _wait(int milliseconds) async {
  print('WAIT $milliseconds ms');
  await Future<void>.delayed(Duration(milliseconds: milliseconds));
}

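Stream<String> generateMessages() {
  // sync: false is the default; it is spelled out here to stress that
  // events are delivered to the listener asynchronously.
  final controller = StreamController<String>(sync: false);
  // Start emitting only once somebody listens.
  controller.onListen = () async {
    controller.add('start');
    await _wait(100);
    controller.add('init_all');
    await _wait(400);
    controller.add('user_pick_x');
    controller.add('user_start_x');
    await _wait(3000);
    controller.add('interrupt');
    // Close the controller so listeners receive a done event.
    await controller.close();
  };
  return controller.stream;
}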

void main() {
  generateMessages().listen((msg) {
    sleep(const Duration(milliseconds: 120));
    print('${DateTime.now()}: $msg');
  });
}
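
If accumulated drift is a concern, one option is to schedule each message against a running clock instead of chaining fixed waits, so a late message does not push back the ones after it. The following is a rough sketch of that idea, not part of the code above: `remainingUntil` and `generateMessagesAt` are hypothetical names, and the schedule is assumed to be sorted by time.

```dart
/// Time still to wait until [target], given [elapsed] since the stream started.
/// Returns zero when the target time has already passed.
Duration remainingUntil(Duration target, Duration elapsed) {
  final remaining = target - elapsed;
  return remaining.isNegative ? Duration.zero : remaining;
}

/// Emits each message at its absolute offset from the moment of listening.
/// Each entry maps a due time (key) to a message (value).
Stream<String> generateMessagesAt(
    List<MapEntry<Duration, String>> schedule) async* {
  // The generator body starts running when the stream is listened to.
  final clock = Stopwatch()..start();
  for (final entry in schedule) {
    final remaining = remainingUntil(entry.key, clock.elapsed);
    if (remaining > Duration.zero) {
      await Future<void>.delayed(remaining);
    }
    yield entry.value;
  }
}

void main() {
  final schedule = [
    MapEntry(Duration.zero, 'start'),
    MapEntry(const Duration(milliseconds: 100), 'init_all'),
    MapEntry(const Duration(milliseconds: 500), 'user_pick_x'),
    MapEntry(const Duration(milliseconds: 500), 'user_start_x'),
    MapEntry(const Duration(milliseconds: 3500), 'interrupt'),
  ];
  generateMessagesAt(schedule).listen((msg) {
    print('${DateTime.now()}: $msg');
  });
}
```

Note the trade-off: the stopwatch keeps running while a subscription is paused, so after a resume any overdue messages are emitted back to back. Timers are also not exact, so this limits accumulated drift rather than guaranteeing millisecond precision.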
Author: nvoigt

Updated on December 31, 2022

Comments

  • nvoigt
    nvoigt 10 months

    I'm new to Dart and Flutter and would love to get some advice on an algorithmic problem I'm facing.

    I want to connect my Flutter app to a Bluetooth device (that part is done, I am connected already) and send messages at uneven intervals. I have the messages in a list, and for each of them I know at what time (in milliseconds) I want to send it.

    So suppose the following messages are lined up:

    • start at 0ms
    • init_app at 100ms
    • user_pick_x at 500ms
    • user_start_x at 500ms (will be sent after user_pick_x, order should be guaranteed)
    • interrupt at 3500ms

    I have found the documentation on creating streams, but it only covers a single, fixed interval value: https://dart.dev/articles/libraries/creating-streams

    Ideas:

    • Technically I can pass in a list of Duration objects and work with a custom generator async*, along with the message string.
    • Alternatively I can set the interval to the lowest time delta and check on each one whether a message / messages should be sent. In the case of the example that would be every 100ms.

    It would be nice to be able to pause / cancel the stream as well, which is something that streams can do natively.

  • Admin
    Admin over 2 years
    This works great, thanks! My only doubt is whether the timing would slowly drift behind the intended send times: publishing a message on the stream takes a few ms, which would delay all following futures and messages as well. Is that true, and if so, how could I compensate for it?
  • Herbert Poul
    Herbert Poul over 2 years
    @Robin It depends on what the listener does. If the listener only kicks off an asynchronous call itself, the delay should be negligible. If, on the other hand, the listener does something blocking (like File().writeAsBytesSync(...)) or some other heavy operation, it could add delay. I have added an example using an explicit StreamController, which ensures that the listener is called asynchronously. Note, though, that everything is still essentially single-threaded. So if your listener takes >100ms of synchronous work, init_all may well be emitted later than 100ms after start.
  • Herbert Poul
    Herbert Poul over 2 years
    i.e. if you do some heavy encryption for 200ms in your main isolate, nothing else will get done, and even with an asynchronous StreamController the wait will not finish after 100ms, because an isolate can only run one piece of synchronous work at a time. If you do have to do long blocking work, you might consider spawning a separate isolate.
  • Admin
    Admin over 2 years
    Yep, that looks perfect! As I said, the application talks to a Bluetooth device. Given there will be back-and-forth communication, a separate isolate sounds like a good idea. It will just act as the interface to my device.
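
For the separate-isolate suggestion in the comments, a minimal sketch could look like the following. It assumes a Dart SDK that provides Isolate.run (2.19 or later); `heavyChecksum` is a made-up stand-in for whatever blocking work you need, not a real API.

```dart
import 'dart:isolate';

/// Stand-in for long synchronous work (hypothetical example function).
int heavyChecksum(List<int> bytes, {int rounds = 200}) {
  var sum = 0;
  for (var round = 0; round < rounds; round++) {
    for (final b in bytes) {
      sum = (sum + b) & 0xFFFFFFFF;
    }
  }
  return sum;
}

Future<void> main() async {
  final payload = List<int>.generate(1000000, (i) => i % 256);

  // Run the blocking work in a short-lived worker isolate so timers and
  // stream events in the main isolate keep firing while it runs.
  final checksum = await Isolate.run(() => heavyChecksum(payload));
  print('checksum: $checksum');
}
```

Isolate.run suits one-off jobs. For an ongoing back-and-forth with a device, a long-lived isolate that communicates over ports is probably a better fit, though whether a Bluetooth plugin can be used from a background isolate depends on the plugin.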