How to drop unprocessed events because of slow processing device while listening on a StreamSubscription?

124

My current solution is to rate limit the Stream with RxDart. _primaryMeasurements.value is a Stream<List<int>>. Someone can say something like this:

import 'package:rxdart/rxdart.dart';

// Widget field:
BluetoothCharacteristic _primaryMeasurements;

// later:
await _primaryMeasurements.setNotifyValue(true);
_primaryMeasurements.value.throttleTime(const Duration(milliseconds: 500), leading: false, trailing: true).listen((data) async {
  if (data != null && data.length > 1) {
    await _recordMeasurement(data);
  }
});

This would ensure that at most two data points will be processed per second. The key portion is the .throttleTime(Duration(milliseconds: 500), leading: false, trailing: true).

This throttling sets a fixed rate, so if a device would be quite powerful it would still only receive data by the throttle limited pace. But at least that would give some breath and help with battery consumption, and if the limit is set properly a slower device can still cope with the data as well.


Note 1: I added leading: false, trailing: true to the arguments of throttleTime because after the throttling duration I always want to end up with the latest packet in my hand no matter what.

Note 2: I tried to make this more linted, for example the const Duration.

Note 3: In my most complicated throttling spot I ended up developing my own throttling function (which I'd advise against and would avoid spinning out my own at all costs, but I left with no other choice): I needed to discard certain invalid packets and I would not wanted those packets to count against the throttling timing or throttling logic. But for simpler sensor message pumps I still use the native RxDart throttleTime.

Share:
124
Csaba Toth
Author by

Csaba Toth

Updated on December 25, 2022

Comments

  • Csaba Toth
    Csaba Toth over 1 year

    I have a Flutter app which receives a flow of data stream from a Bluetooth Low Energy Device using the flutter_blue package. I tested my app with an old tablet and one of the source devices ingests data into the stream 3-4 times a second. Apparently the sluggish tablet cannot keep up with the demand. As a result when the stream of information stops my application still displays changing data as it goes through each event in the stream until it reaches the end.

    What I'd like is to drop any event which the device is not able to keep up with. For my application it's not a problem if some frames are dropped. Speaking in networking metaphors: UDP connection is more sufficient for me, I don't need TCP.

    // Widget field:
    BluetoothCharacteristic _primaryMeasurements;
    
    // later:
    await _primaryMeasurements.setNotifyValue(true);
    _primaryMeasurements.value.listen((data) async {
      if (data != null && data.length > 1) {
        await _recordMeasurement(data);
      }
    });
    
    // Even later, this is obviously over simplified essence
    @override
    Widget build(BuildContext context) {
      return Text('${data[3]}'),
    }
    

    I don't see clearly how to drop pieces from the stream in a way that only lose the "piled up" ones, and only in case of the device is "lame".