How to create a StreamTransformer in Dart?

Solution 1

Okay. Here's another working example:

import 'dart:async';

// Written for null-safe Dart (2.12 or later).
class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  late final StreamController<T> _controller;

  StreamSubscription<S>? _subscription;

  final bool cancelOnError;

  // Original stream, set by bind().
  Stream<S>? _stream;

  DuplicateTransformer({bool sync = false, this.cancelOnError = false}) {
    _controller = StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        // Closures, so _subscription is looked up when the callback runs.
        onPause: () => _subscription?.pause(),
        onResume: () => _subscription?.resume(),
        sync: sync);
  }

  DuplicateTransformer.broadcast(
      {bool sync = false, this.cancelOnError = false}) {
    _controller = StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream!.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription?.cancel();
    _subscription = null;
  }

  // Transformation. The cast only succeeds when S is assignable to T.
  void onData(S data) {
    _controller.add(data as T);
    _controller.add(data as T); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  // Bind
  @override
  Stream<T> bind(Stream<S> stream) {
    _stream = stream;
    return _controller.stream;
  }

  // Required by the StreamTransformer interface since Dart 2.
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() =>
      StreamTransformer.castFrom<S, T, RS, RT>(this);
}

void main() {
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) {
    print('data: $data');
  }).cancel();

  s.listen((data) {
    print('data2: $data');
  }).cancel();

  s.listen((data) {
    print('data3: $data');
  });

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

Let me add some notes:

  • Using implements seems to be the right approach, judging by the source code of Dart's own internal transformers.
  • I implemented both versions: one for a regular (single-subscription) stream and one for a broadcast stream.
  • For a regular stream, the new stream controller's cancel/pause/resume callbacks can forward directly to the source subscription, because the stream can only be listened to once.
  • For a broadcast stream, I found that onListen is only called when nobody is listening to the stream yet. onCancel behaves the same way: it is only called when the last subscriber cancels its subscription. That's why it is safe to use the same functions here.
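
The last note can be checked with a small sketch. It is not part of the answer's transformer; the function name traceBroadcastCallbacks is made up for illustration. It records which broadcast-controller callbacks fire as listeners come and go:

```dart
import 'dart:async';

/// Hypothetical helper: logs the broadcast controller callbacks in order.
Future<List<String>> traceBroadcastCallbacks() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast(
    onListen: () => log.add('onListen'),
    onCancel: () => log.add('onCancel'),
  );

  // First listener: the controller goes from zero to one listener.
  final first = controller.stream.listen((_) {});
  // Second listener: the stream already has a listener.
  final second = controller.stream.listen((_) {});

  await first.cancel(); // one listener remains
  await second.cancel(); // the last listener is gone

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await traceBroadcastCallbacks());
}
```

If the notes above hold, the log should contain exactly one onListen entry followed by one onCancel entry, even though two listeners subscribed and cancelled.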

Solution 2

Why don't you use StreamTransformer.fromHandlers():

import 'dart:async';

void handleData(data, EventSink sink) {
  sink.add(data*2);
}

void main() {
  StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);

  StreamController controller = new StreamController();
  controller.stream.transform(doubleTransformer).listen((data) {
    print('data: $data');
  });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

Output:

data: 2
data: 4
data: 6

Solution 3

https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139

You can use StreamTransformer.fromHandlers to easily create transformers that just convert input events to output events.

Example:

// jsonDecode comes from 'dart:convert' (it replaced JSON.decode in Dart 2).
StreamTransformer<String, dynamic>.fromHandlers(
    handleData: (String event, EventSink<dynamic> output) {
  if (event.startsWith('data:')) {
    output.add(jsonDecode(event.substring('data:'.length)));
  } else if (event.isNotEmpty) {
    output.addError('Unexpected data from CloudBit stream: "$event"');
  }
});
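
To see that handler run end to end, here is a minimal self-contained sketch. The names dataLineDecoder and describeEvents, the sample input lines and the shortened error text are assumptions made for illustration; only StreamTransformer.fromHandlers and jsonDecode come from the Dart SDK.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical transformer: decodes 'data:<json>' lines, skips empty
/// lines, and reports anything else as a stream error.
final StreamTransformer<String, dynamic> dataLineDecoder =
    StreamTransformer<String, dynamic>.fromHandlers(
  handleData: (String event, EventSink<dynamic> output) {
    if (event.startsWith('data:')) {
      output.add(jsonDecode(event.substring('data:'.length)));
    } else if (event.isNotEmpty) {
      output.addError('Unexpected data: "$event"');
    }
  },
);

/// Hypothetical helper: collects values and errors as readable strings.
Future<List<String>> describeEvents(Stream<String> lines) async {
  final log = <String>[];
  final done = Completer<void>();
  lines.transform(dataLineDecoder).listen(
    (dynamic value) => log.add('value: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  final lines = Stream.fromIterable(['data:{"on":true}', '', 'oops', 'data:42']);
  (await describeEvents(lines)).forEach(print);
}
```

Errors added through the sink arrive at the listener's onError callback and do not end the stream, so the 'data:42' line after the bad line is still decoded.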

Solution 4

Transformers are more powerful than map: they allow you to maintain internal state and emit a value whenever you want. They can do things map can't, such as delaying values, duplicating values, selectively omitting some values, and so on.

Essentially, the implementation requires a bind method that provides a new stream based on an old stream being passed in, and a cast method that helps with type-checking during run-time.

Here's an over-simplified example of implementing a "TallyTransformer" that transforms a stream of integer values into a stream of sums. For example, if the input stream so far had 1, 1, 1, -2, 0, ..., the output stream would've been 1, 2, 3, 1, 1, ..., i.e. summing all inputs up to this point.

Example usage: stream.transform(TallyTransformer())

class TallyTransformer implements StreamTransformer<int, int> {
  final StreamController<int> _controller = StreamController<int>();
  int _sum = 0; // sum of all values so far

  @override
  Stream<int> bind(Stream<int> stream) {
    // start listening on input stream
    stream.listen((value) {
      _sum += value; // add the new value to sum
      _controller.add(_sum); // emit current sum to our listener
    });
    // return an output stream for our listener
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    return StreamTransformer.castFrom<int, int, RS, RT>(this);
  }
}

This example is over-simplified (but still works): it does not handle pausing, resuming or canceling the stream, and it never closes the output stream. If you run into a "Stream has already been listened to" error, make sure the streams involved are broadcast streams.
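
One possible way to get pause, resume and cancel handling without writing it by hand is to extend StreamTransformerBase from dart:async (which supplies cast) and write bind as an async* generator. This is a sketch of an alternative, not the approach used above, and the class name GeneratorTallyTransformer is made up for illustration:

```dart
import 'dart:async';

/// Hypothetical variant of the tally idea: the running sum lives inside
/// bind(), so every bound stream gets its own state.
class GeneratorTallyTransformer extends StreamTransformerBase<int, int> {
  const GeneratorTallyTransformer();

  @override
  Stream<int> bind(Stream<int> stream) async* {
    var sum = 0; // sum of all values seen so far on this stream
    await for (final value in stream) {
      sum += value;
      yield sum; // emit the current sum
    }
  }
}

Future<void> main() async {
  final input = Stream.fromIterable([1, 1, 1, -2, 0]);
  final sums = await input.transform(const GeneratorTallyTransformer()).toList();
  print(sums); // [1, 2, 3, 1, 1]
}
```

With a generator, the input stream is only listened to once the output stream gets a listener, and pausing or canceling the output subscription is passed on to the input. An error on the input stream ends the output stream in this sketch, which may or may not be what you want.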

Will Squire
Author by

Will Squire

Updated on October 20, 2021

Comments

  • Will Squire
    Will Squire over 2 years

    I'm trying to build a custom StreamTransformer class, but many of the examples out there seem to be out of date, and the one in the documentation isn't written as a class, at least not in the sense a typed language would expect (found here: https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer). That feels less like the Dart way of approaching it and more like the JavaScript way (which I'm using Dart to avoid).

    Many online sources say this is how you create a StreamTransformer, but there are errors when extending it:

    class exampleStreamTransformer extends StreamTransformer
    {
      //... (This won't work)
    }
    

    'Implements' seems to be the way to go, along with implementing the bind function needed:

    class exampleStreamTransformer implements StreamTransformer
    {
      Stream bind(Stream stream)
      {
        //... (Go on to return new stream, etc)
      }
    }
    

    I can't find any examples that do it this way, so I have thrown something together myself. My IDE accepts it, but it fails at runtime with a null object error when it tries to use the pause getter:

    class exampleStreamTransformer implements StreamTransformer
    {
      StreamController<String> _controller;
      StreamSubscription<String> _subscription;
    
      Stream bind(Stream stream)
      {
        _controller = new StreamController<String>(
            onListen: ()
            {
              _subscription = stream.listen((data)
              {
                // Transform the data.
                _controller.add(data);
              },
              onError: _controller.addError,
              onDone: _controller.close,
              cancelOnError: true); // Unsure how I'd pass this in?????
            },
            onPause: _subscription.pause,
            onResume: _subscription.resume,
            onCancel: _subscription.cancel,
            sync: true
        );
    
        return _controller.stream;
      }
    }
    

    I would like to achieve it this way, i.e. the 'typed' way of producing the class. Any help is much appreciated, thank you.

    • lrn
      lrn over 9 years
      I agree that StreamTransformer is an unnecessary class - it contains only one function, so you could just pass that function anywhere you pass the object. If it was added to the library now, it would just be a function type.
  • Will Squire
    Will Squire over 9 years
    Because I need a class. Content needs to be buffered and such, it is quite complicated. Thank you anyway.
  • Robert
    Robert over 9 years
    Then make a global buffer variable that buffers data and write data to the sink as needed?
  • Will Squire
    Will Squire over 9 years
    No, that's an inherently bad idea. What if I want to use this more than once? Having a global buffer would mean two streams could potentially buffer to the same place, else each time I'd have to create a separate variable somewhere outside the scope of the function. Lacks encapsulation and code maintainability. Just want to call .transform(new exampleStreamTransformer()); + even that could be shortened, but I'm rather stuck to the language conventions in this respect. Here is an old (deprecated) example: victorsavkin.com/post/51233496661/…
  • Will Squire
    Will Squire over 9 years
    Yes, this looks like a much better implementation! Thank you, I'll try out later
  • Daniel Leiszen
    Daniel Leiszen almost 4 years
    Thanks for the implementation, this is very helpful :)