How to create a StreamTransformer in Dart?
Solution 1
Okay. Here's another working example:
import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  late final StreamController<T> _controller;
  StreamSubscription<S>? _subscription;
  final bool cancelOnError;

  // Original stream, set in bind()
  late Stream<S> _stream;

  DuplicateTransformer({bool sync = false, this.cancelOnError = false}) {
    _controller = StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () => _subscription?.pause(),
        onResume: () => _subscription?.resume(),
        sync: sync);
  }

  DuplicateTransformer.broadcast(
      {bool sync = false, this.cancelOnError = false}) {
    _controller = StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription?.cancel();
    _subscription = null;
  }

  /// Transformation
  void onData(S data) {
    // The cast only succeeds when S and T are compatible, as in this example.
    _controller.add(data as T);
    _controller.add(data as T); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  /// Bind
  @override
  Stream<T> bind(Stream<S> stream) {
    _stream = stream;
    return _controller.stream;
  }

  /// Required by the StreamTransformer interface since Dart 2
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() =>
      StreamTransformer.castFrom<S, T, RS, RT>(this);
}

void main() {
  // Create StreamController
  final controller = StreamController<int>.broadcast();
  // Transform
  final s =
      controller.stream.transform(DuplicateTransformer<int, int>.broadcast());
  s.listen((data) {
    print('data: $data');
  }).cancel();
  s.listen((data) {
    print('data2: $data');
  }).cancel();
  s.listen((data) {
    print('data3: $data');
  });
  // Simulate data
  controller.add(1);
  controller.add(2);
  controller.add(3);
}
Let me add some notes:
- Using implements seems to be the right way here, judging by the source code of other internal Dart transformers.
- I implemented both versions, one for a regular stream and one for a broadcast stream.
- In the case of a regular stream you can call cancel/pause/resume directly on the new stream controller, because it can only be listened to once.
- With a broadcast stream, I found that onListen is only called if no one is listening to the stream already. onCancel behaves the same way: it is called only when the last subscriber cancels its subscription. That's why it is safe to use the same functions here.
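The broadcast behaviour described in the last note can be checked with a small sketch. The function name traceBroadcastCallbacks is only illustrative and not part of the answer above; it uses nothing beyond StreamController.broadcast from dart:async.

```dart
import 'dart:async';

/// Records the order in which a broadcast controller's callbacks fire.
/// (Illustrative helper, not part of the original answer.)
Future<List<String>> traceBroadcastCallbacks() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast(
    onListen: () => log.add('onListen'),
    onCancel: () => log.add('onCancel'),
  );

  final first = controller.stream.listen((_) {}); // first listener: onListen
  final second = controller.stream.listen((_) {}); // no second onListen
  await first.cancel(); // one listener remains, so no onCancel yet
  await second.cancel(); // last listener gone: onCancel
  final third = controller.stream.listen((_) {}); // onListen fires again
  await third.cancel(); // onCancel again
  return log;
}

Future<void> main() async {
  print(await traceBroadcastCallbacks());
  // [onListen, onCancel, onListen, onCancel]
}
```

This is why the transformer above can subscribe to the source in onListen and unsubscribe in onCancel for both constructors.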
Solution 2
Why don't you use StreamTransformer.fromHandlers()?
import 'dart:async';

void handleData(int data, EventSink<int> sink) {
  sink.add(data * 2);
}

void main() {
  final doubleTransformer =
      StreamTransformer<int, int>.fromHandlers(handleData: handleData);
  final controller = StreamController<int>();
  controller.stream.transform(doubleTransformer).listen((data) {
    print('data: $data');
  });
  controller.add(1);
  controller.add(2);
  controller.add(3);
}
Output:
data: 2
data: 4
data: 6
Solution 3
https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139
You can use StreamTransformer.fromHandlers to easily create transformers that just convert input events to output events.
Example:
import 'dart:async';
import 'dart:convert';

final transformer = StreamTransformer<String, dynamic>.fromHandlers(
  handleData: (String event, EventSink<dynamic> output) {
    if (event.startsWith('data:')) {
      output.add(jsonDecode(event.substring('data:'.length)));
    } else if (event.isNotEmpty) {
      output.addError('Unexpected data from CloudBit stream: "$event"');
    }
  },
);
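To see how such a handler behaves, here is a minimal sketch that feeds a few sample lines through it. The names dataLineDecoder and decodeLines and the sample input are made up for illustration; the handler logic follows the example above, with a shortened error message.

```dart
import 'dart:async';
import 'dart:convert';

/// Builds a transformer that decodes "data:<json>" lines.
/// (Hypothetical helper name, mirroring the handler in the answer.)
StreamTransformer<String, dynamic> dataLineDecoder() =>
    StreamTransformer<String, dynamic>.fromHandlers(
      handleData: (event, output) {
        if (event.startsWith('data:')) {
          output.add(jsonDecode(event.substring('data:'.length)));
        } else if (event.isNotEmpty) {
          output.addError('Unexpected data: "$event"');
        }
        // Empty lines are silently dropped.
      },
    );

/// Runs the sample lines through the transformer and logs what comes out.
Future<List<String>> decodeLines(List<String> lines) async {
  final log = <String>[];
  final decoded = Stream.fromIterable(lines)
      .transform(dataLineDecoder())
      .handleError((Object error) => log.add('error: $error'));
  await for (final value in decoded) {
    log.add('value: $value');
  }
  return log;
}

Future<void> main() async {
  final log = await decodeLines(['data:{"a":1}', '', 'oops', 'data:[2,3]']);
  log.forEach(print);
  // value: {a: 1}
  // error: Unexpected data: "oops"
  // value: [2, 3]
}
```

One input event can therefore produce a data event, an error event, or nothing at all, which is something map alone cannot express.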
Solution 4
Compared with map, transformers are more powerful: they allow you to maintain internal state and emit a value whenever you want. They can achieve things map can't do, such as delaying values, duplicating values, selectively omitting some values, and so on.
Essentially, the implementation requires a bind method that provides a new stream based on the old stream being passed in, and a cast method that helps with type-checking at run time.
Here's an over-simplified example of implementing a "TallyTransformer" that transforms a stream of integer values into a stream of sums. For example, if the input stream so far had 1, 1, 1, -2, 0, ..., the output stream would have been 1, 2, 3, 1, 1, ..., i.e. the sum of all inputs up to this point.
Example usage: stream.transform(TallyTransformer())
import 'dart:async';

class TallyTransformer implements StreamTransformer<int, int> {
  final StreamController<int> _controller = StreamController<int>();
  int _sum = 0; // sum of all values so far

  @override
  Stream<int> bind(Stream<int> stream) {
    // start listening on input stream
    stream.listen((value) {
      _sum += value; // add the new value to sum
      _controller.add(_sum); // emit current sum to our listener
    });
    // return an output stream for our listener
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    return StreamTransformer.castFrom<int, int, RS, RT>(this);
  }
}
This example is over-simplified (but still works) and does not cover cases such as stream pausing, resuming, or canceling. If you run into a "Stream has already been listened to" error, make sure the streams are broadcast streams.
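One way to cover pausing, resuming and canceling without wiring up a controller by hand is to write the tally as an async* generator and wrap it with StreamTransformer.fromBind. This is a sketch of an alternative, not the approach of the answer above; the names _tally and tallyTransformer are made up for illustration.

```dart
import 'dart:async';

/// Running sum of the source, written as an async* generator.
/// (Hypothetical helper; the state lives inside the function, so every
/// bind() call starts from zero.)
Stream<int> _tally(Stream<int> source) async* {
  var sum = 0;
  await for (final value in source) {
    sum += value; // add the new value to the sum
    yield sum; // emit the current sum
  }
}

/// The transformer is just the bind function wrapped in an object.
final tallyTransformer = StreamTransformer<int, int>.fromBind(_tally);

Future<void> main() async {
  final sums = await Stream.fromIterable([1, 1, 1, -2, 0])
      .transform(tallyTransformer)
      .toList();
  print(sums); // [1, 2, 3, 1, 1]
}
```

Because the generator only subscribes to the source once its own stream is listened to, and the await for loop follows the listener's pause and cancel requests, the same transformer can be reused on several streams. Note that an error in the source ends the output stream in this sketch.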
Will Squire
Updated on October 20, 2021
Comments
-
Will Squire over 2 years
I'm trying to build a custom StreamTransformer class. However, a lot of the examples out there seem to be out of date, and the one found in the documentation isn't written as a class, at least not as what some typed languages would consider one (found here: https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer). This doesn't seem like a very Dart-like way of approaching it, and rather more of a JavaScript-like way (which I'm using Dart to avoid).
Many online sources say this is how you create a StreamTransformer; however, there are errors when extending it.
class exampleStreamTransformer extends StreamTransformer {
  //... (This won't work)
}
'Implements' seems to be the way to go, along with implementing the bind function needed:
class exampleStreamTransformer implements StreamTransformer {
  Stream bind(Stream stream) {
    //... (Go on to return new stream, etc)
  }
}
I can't seem to find any examples of this approach, but I have thrown something together myself (it is accepted in my IDE but not at runtime: I get a null object error when it tries to use the pause getter):
class exampleStreamTransformer implements StreamTransformer {
  StreamController<String> _controller;
  StreamSubscription<String> _subscription;

  Stream bind(Stream stream) {
    _controller = new StreamController<String>(
      onListen: () {
        _subscription = stream.listen((data) {
          // Transform the data.
          _controller.add(data);
        },
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: true); // Unsure how I'd pass this in?????
      },
      onPause: _subscription.pause,
      onResume: _subscription.resume,
      onCancel: _subscription.cancel,
      sync: true
    );
    return _controller.stream;
  }
}
I would like to achieve it this way, i.e. the 'typed' way of producing the class. Any help is much appreciated, thank you.
-
lrn over 9 years
I agree that StreamTransformer is an unnecessary class: it contains only one function, so you could just pass that function anywhere you pass the object. If it was added to the library now, it would just be a function type.
-
Will Squire over 9 years
Because I need a class. Content needs to be buffered and such, it is quite complicated. Thank you anyway.
-
Robert over 9 years
Then make a global buffer variable that buffers data and write data to the sink as needed?
-
Will Squire over 9 years
No, that's an inherently bad idea. What if I want to use this more than once? Having a global buffer would mean two streams could potentially buffer to the same place, or else each time I'd have to create a separate variable somewhere outside the scope of the function. It lacks encapsulation and code maintainability. I just want to call .transform(new exampleStreamTransformer()); + even that could be shortened, but I'm rather stuck with the language conventions in this respect. Here is an old (deprecated) example: victorsavkin.com/post/51233496661/…
-
Will Squire over 9 years
Yes, this looks like a much better implementation! Thank you, I'll try it out later.
-
Daniel Leiszen almost 4 years
Thanks for the implementation, this is very helpful :)