RxDart: combine multiple streams to emit a value whenever any of the streams emits a value

Because all of the solutions here are workarounds, I've implemented my own stream class. The implementation is the same as the original CombineLatestStream implementation, except that it does not wait for all streams to emit before emitting. Streams that have not emitted yet contribute null to the list passed to the combiner:


import 'dart:async';

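// Note: these are internal rxdart files (under src/), so the imports may
// break when rxdart is upgraded.
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';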

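/// Like CombineLatestStream, but emits as soon as any source stream emits.
/// Sources that have not emitted yet appear as null in the list passed to
/// the combiner.
class CombineAnyLatestStream<T, R> extends StreamView<R> {
  CombineAnyLatestStream(
    List<Stream<T>> streams,
    R Function(List<T?> values) combiner,
  ) : super(_buildController(streams, combiner).stream);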

  static StreamController<R> _buildController<T, R>(
    Iterable<Stream<T>> streams,
    R Function(List<T?> values) combiner,
  ) {

    int completed = 0;

    late List<StreamSubscription<T>> subscriptions;
    List<T?>? values;

    final _controller = StreamController<R>(sync: true);

    _controller.onListen = () {

      void onDone() {
        if (++completed == streams.length) {
          _controller.close();
        }
      }

      subscriptions = streams.mapIndexed((index, stream) {

        return stream.listen(
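          (T event) {
            final R combined;

            // Ignore events that arrive after cancellation (or before
            // `values` has been allocated).
            if (values == null) return;

            // Store the latest value; slots of streams that have not
            // emitted yet stay null.
            values![index] = event;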

            try {
              combined = combiner(List<T?>.unmodifiable(values!));
            } catch (e, s) {
              _controller.addError(e, s);
              return;
            }

            _controller.add(combined);
          },
          onError: _controller.addError,
          onDone: onDone
        );
      }).toList(growable: false);

      if (subscriptions.isEmpty) {
        _controller.close();
      } else {
        values = List<T?>.filled(subscriptions.length, null);
      }
    };

    _controller.onPause = () => subscriptions.pauseAll();
    _controller.onResume = () => subscriptions.resumeAll();
    _controller.onCancel = () {
      values = null;
      return subscriptions.cancelAll();
    };

    return _controller;
  }

}
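
For readers who would rather not import rxdart's internal files, the same behaviour can be approximated with dart:async alone. This is a minimal sketch, not the answer's class and not an rxdart API: the function name combineAnyLatest is hypothetical, and the main function only illustrates the question's username/email validation with plain StreamControllers instead of BehaviorSubjects.

```dart
import 'dart:async';

// Hypothetical helper, not part of rxdart: emits combiner(latest values)
// whenever any source emits. Sources that have not emitted yet are null.
Stream<R> combineAnyLatest<T, R>(
  List<Stream<T>> streams,
  R Function(List<T?> values) combiner,
) {
  final controller = StreamController<R>(sync: true);
  final subscriptions = <StreamSubscription<T>>[];
  final values = List<T?>.filled(streams.length, null);
  var completed = 0;

  controller.onListen = () {
    if (streams.isEmpty) {
      controller.close();
      return;
    }
    for (var i = 0; i < streams.length; i++) {
      subscriptions.add(streams[i].listen(
        (T event) {
          values[i] = event; // remember the latest value of source i
          final R combined;
          try {
            combined = combiner(List<T?>.unmodifiable(values));
          } catch (e, s) {
            controller.addError(e, s); // forward combiner failures
            return;
          }
          controller.add(combined);
        },
        onError: controller.addError,
        onDone: () {
          // Close only after every source has completed.
          if (++completed == streams.length) controller.close();
        },
      ));
    }
  };
  controller.onPause = () {
    for (final s in subscriptions) {
      s.pause();
    }
  };
  controller.onResume = () {
    for (final s in subscriptions) {
      s.resume();
    }
  };
  controller.onCancel =
      () => Future.wait(subscriptions.map((s) => s.cancel()));

  return controller.stream;
}

Future<void> main() async {
  final username = StreamController<String?>();
  final email = StreamController<String?>();

  final valid = combineAnyLatest<String?, bool>(
    [username.stream, email.stream],
    (values) => values[0] != null || values[1] != null,
  );
  final sub = valid.listen((isValid) => print('valid: $isValid'));

  // Only username emits; email stays null, yet a combined value is produced.
  username.add('alice'); // prints "valid: true"
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
}
```

Because every slot starts as null, the combiner has to tolerate nulls even when the source streams themselves never emit null.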
Author: Code Spirit

Updated on January 02, 2023

Comments

  • Code Spirit (over 1 year)

    In RxDart there is the Rx.combineLatest method, which combines the latest values of several streams using a callback function. The problem is that it only emits a value once every stream has emitted at least one value. If one of them has not, nothing is emitted.

    The documentation says: "Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item."

    I'm trying to combine multiple streams into one validation stream, which should emit true or false even when some of the streams have not emitted yet or have emitted an empty value.

    class FormBloc {
      final BehaviorSubject<bool> _result = BehaviorSubject();
      final BehaviorSubject<String?> _usernameController = BehaviorSubject();
      final BehaviorSubject<String?> _emailController = BehaviorSubject();
    
      // Will only emit if each stream emitted a value
      // If only username is emitted valid is not emitted
      Stream<bool> get valid$ => Rx.combineLatest2(
        _usernameController.stream, 
        _emailController.stream, 
        (username, email) => username != null || email != null
      );
    
    }
    
    

    How can I join those streams so that valid$ emits a value whenever any of the streams changes?