RxDart: combine multiple streams to emit a value whenever any of the streams emits a value
Because all of the solutions here are workarounds, I've implemented my own stream class. The implementation matches the original CombineLatestStream implementation, except that it does not wait for all streams to emit before emitting:
import 'dart:async';

// Internal rxdart helpers (under src/); these imports may break between
// rxdart versions.
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';

class CombineAnyLatestStream<T, R> extends StreamView<R> {
  CombineAnyLatestStream(
    List<Stream<T>> streams,
    R Function(List<T?>) combiner,
  ) : super(_buildController(streams, combiner).stream);

  static StreamController<R> _buildController<T, R>(
    Iterable<Stream<T>> streams,
    R Function(List<T?> values) combiner,
  ) {
    int completed = 0;
    late List<StreamSubscription<T>> subscriptions;
    // Latest value per source; null for sources that have not emitted yet.
    List<T?>? values;

    final _controller = StreamController<R>(sync: true);

    _controller.onListen = () {
      // Close the output only once every source stream has completed.
      void onDone() {
        if (++completed == streams.length) {
          _controller.close();
        }
      }

      subscriptions = streams.mapIndexed((index, stream) {
        return stream.listen(
          (T event) {
            final R combined;

            if (values == null) return;

            values![index] = event;

            // Unlike CombineLatestStream, combine on every event without
            // waiting for all sources to have emitted.
            try {
              combined = combiner(List<T?>.unmodifiable(values!));
            } catch (e, s) {
              _controller.addError(e, s);
              return;
            }

            _controller.add(combined);
          },
          onError: _controller.addError,
          onDone: onDone,
        );
      }).toList(growable: false);

      if (subscriptions.isEmpty) {
        _controller.close();
      } else {
        values = List<T?>.filled(subscriptions.length, null);
      }
    };

    _controller.onPause = () => subscriptions.pauseAll();
    _controller.onResume = () => subscriptions.resumeAll();
    _controller.onCancel = () {
      values = null;
      return subscriptions.cancelAll();
    };

    return _controller;
  }
}
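For readers who would rather not depend on rxdart's internal `src/` imports, the same idea can be sketched with `dart:async` alone. This is only an illustration under stated assumptions, not the class above: `combineAnyLatest` is a hypothetical helper name, plain `StreamController`s stand in for the question's `BehaviorSubject`s, and the combiner mirrors the question's "any field is non-null" rule.

```dart
import 'dart:async';

/// Hypothetical helper (not part of rxdart): emits `combiner(latest values)`
/// whenever ANY source emits. Slots of sources that have not emitted yet
/// are null.
Stream<R> combineAnyLatest<T, R>(
  List<Stream<T>> streams,
  R Function(List<T?> values) combiner,
) {
  final controller = StreamController<R>();
  final subscriptions = <StreamSubscription<T>>[];

  controller.onListen = () {
    if (streams.isEmpty) {
      controller.close();
      return;
    }
    final values = List<T?>.filled(streams.length, null);
    var completed = 0;
    for (var i = 0; i < streams.length; i++) {
      subscriptions.add(streams[i].listen(
        (event) {
          values[i] = event;
          try {
            controller.add(combiner(List<T?>.unmodifiable(values)));
          } catch (e, s) {
            // Forward combiner failures as stream errors.
            controller.addError(e, s);
          }
        },
        onError: controller.addError,
        onDone: () {
          // Close only after every source has completed.
          if (++completed == streams.length) controller.close();
        },
      ));
    }
  };
  controller.onPause = () {
    for (final s in subscriptions) {
      s.pause();
    }
  };
  controller.onResume = () {
    for (final s in subscriptions) {
      s.resume();
    }
  };
  controller.onCancel =
      () => Future.wait(subscriptions.map((s) => s.cancel()));
  return controller.stream;
}

Future<void> main() async {
  // Stand-ins for the question's username and email subjects.
  final username = StreamController<String?>();
  final email = StreamController<String?>();

  final valid = combineAnyLatest<String?, bool>(
    [username.stream, email.stream],
    (values) => values.any((v) => v != null),
  );
  final results = valid.toList(); // subscribes to both sources

  username.add(null); // only one source has emitted so far
  email.add('a@b.c');
  await username.close();
  await email.close();

  print(await results); // one bool per source event
}
```

Note that the output stream produces one value per source event, so a consumer sees a validity flag as soon as the first field changes rather than only after every field has emitted.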
Code Spirit
Updated on January 02, 2023
In RxDart there is the Rx.combineLatest method, which combines the results of several streams using a callback function. The problem is that it only emits a value once every stream has emitted a value. If one of them has not, it does not emit. From the documentation:

"Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item."
I'm trying to combine multiple streams into one stream for validation, which should emit true or false even when some of the streams have not emitted yet or have emitted an empty value.
class FormBloc {
  final BehaviorSubject<bool> _result = BehaviorSubject();
  final BehaviorSubject<String?> _usernameController = BehaviorSubject();
  final BehaviorSubject<String?> _emailController = BehaviorSubject();

  // Will only emit if each stream emitted a value
  // If only username is emitted valid is not emitted
  Stream<bool> get valid$ => Rx.combineLatest2(
    _usernameController.stream,
    _emailController.stream,
    (username, email) => username != null || email != null
  );
}
How can I join those streams so that valid$ emits a value if any of the streams changes?
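For comparison, one workaround in this spirit (not the custom-class approach from the answer) is to give every subject an initial value, so that `Rx.combineLatest2` already has one item per stream when it is first listened to. The sketch below assumes that a `null` seed is an acceptable "nothing entered yet" marker; `SeededFormBloc`, `setUsername`, `setEmail` and `dispose` are hypothetical names introduced for illustration.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical variant of the question's FormBloc with seeded subjects.
class SeededFormBloc {
  // Seeding with null means each stream has a latest value from the start.
  final _usernameController = BehaviorSubject<String?>.seeded(null);
  final _emailController = BehaviorSubject<String?>.seeded(null);

  void setUsername(String? value) => _usernameController.add(value);
  void setEmail(String? value) => _emailController.add(value);

  // Same combiner as in the question; it can now run before the user has
  // touched every field, because both subjects replay their seed.
  Stream<bool> get valid$ => Rx.combineLatest2<String?, String?, bool>(
        _usernameController.stream,
        _emailController.stream,
        (username, email) => username != null || email != null,
      );

  void dispose() {
    _usernameController.close();
    _emailController.close();
  }
}

void main() {
  final bloc = SeededFormBloc();
  bloc.valid$.listen(print);
  bloc.setUsername('alice'); // only one field changed, valid$ still emits
}
```

The trade-off is that the seed becomes part of each stream's history, so any other listener of the subjects also receives the initial `null`.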