rxdart BehaviorSubject emits previous events and not only last value

352

Code Spirit

You can run into this behaviour whenever you overlook that stream events are delivered asynchronously. To illustrate it, add a couple of debug lines to your code:

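import 'package:rxdart/rxdart.dart';

class TestBloc {
  // Seeded with an empty list, so a new listener always receives a value.
  final _controller = BehaviorSubject.seeded([]);

  Stream get stream$ => _controller.stream;

  TestBloc(BehaviorSubject subject) {
    // This callback runs asynchronously, some time after subject.add().
    subject.listen((e) {
      print("listen $e");
      print('A: ${DateTime.now()}');
      _controller.add([e]);
      print('B: ${DateTime.now()}');
    });
  }
}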

Test case:

test("test bloc", () async {
  final subject = BehaviorSubject();

  final bloc = TestBloc(subject);

  subject.add(1);
  subject.add(2);

  // Uncomment the next line to make the test pass
  // await expectLater(subject.stream, emits(2));

  // Alternatively, uncomment the Future.delayed line below. It keeps
  // expectLater() from running before the listener registered in the
  // TestBloc constructor has been called.
  // await Future.delayed(Duration.zero);
  print('C: ${DateTime.now()}');
  await expectLater(bloc.stream$, emits([2]));
  print('D: ${DateTime.now()}');
});

If you run this code you will see:

C: 2022-08-24 23:52:23.910217
listen 1
A: 2022-08-24 23:52:23.941463
B: 2022-08-24 23:52:23.942454
listen 2
A: 2022-08-24 23:52:23.945855
B: 2022-08-24 23:52:23.946078

The log shows that C is printed before the first "listen" line, so

await expectLater(bloc.stream$, emits([2]));

is executed before

_controller.add([e]);

However, when you uncomment

await expectLater(subject.stream, emits(2));

it gives the listener in the TestBloc constructor time to run first, so it plays the same role as

await Future.delayed(Duration.zero);
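
The ordering described above can be reproduced without rxdart. The following is a minimal sketch that uses only dart:async, with a plain StreamController.broadcast() standing in for the BehaviorSubject; the assumption is that the subject delivers events asynchronously in the same way when it is not created with sync: true. The function name deliveryOrder and the log strings are made up for this illustration.

```dart
import 'dart:async';

/// Records the order in which things happen around a non-sync
/// broadcast controller (a stand-in for the subject in the question).
Future<List<String>> deliveryOrder() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();

  // The listener is registered first, just like in the TestBloc constructor.
  controller.stream.listen((e) => log.add('listen $e'));

  // add() only schedules delivery; the listener has not run yet.
  controller.add(1);
  controller.add(2);
  log.add('after add');

  // Yielding to the event loop lets the pending events reach the listener.
  await Future<void>.delayed(Duration.zero);
  log.add('after delay');

  await controller.close();
  return log;
}

Future<void> main() async {
  print(await deliveryOrder());
}
```

In this sketch "after add" is logged before either "listen" entry, and both "listen" entries are logged before "after delay". That is the same pattern as line C appearing before "listen 1" in the output above, and it is why awaiting anything that yields to the event loop before the final expectLater() changes the result.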
Author: Code Spirit

Updated on January 05, 2023

Comments

  • Code Spirit
    Code Spirit over 1 year

    I have a strange issue with rxdart BehaviorSubject. Normally BehaviorSubject should only emit the last value to the stream:

    A special StreamController that captures the latest item that has been added to the controller, and emits that as the first item to any new listener.

    But in my case it is emitting all the previous values on the stream too, and I don't know why.

    I have the following test bloc:

    class TestBloc {
    
      final _controller = BehaviorSubject.seeded([]);
    
      Stream get stream$ => _controller.stream;
    
      TestBloc(BehaviorSubject subject) {
        subject.listen((e) {
          print("listen $e");
          _controller.add([e]);
        });
      }
    
    }
    

    With test case:

        test("test bloc", () async {
          final subject = BehaviorSubject();
    
          final bloc = TestBloc(subject);
    
          subject.add(1);
          subject.add(2);
    
          // Uncomment next line to make test pass
          // await expectLater(subject.stream, emits(2)); 
    
          // Test will fail here
          await expectLater(bloc.stream$, emits([2]));
        });
    
    

    The test fails because all the events from the subject were emitted, not only the expected last one:

    Expected: should emit an event that [2]
      Actual: <Instance of 'BehaviorSubject<List<dynamic>>'>
       Which: emitted * []
                      * [1]
                      * [2]
                which emitted an event that at location [0] is [] which shorter than expected
    

    Is this an issue with rxdart, or am I doing something wrong? To my understanding, the .add() call in the listener should have the same effect as calling add consecutively, as in the test case, but it seems not to.

  • Code Spirit
    Code Spirit almost 2 years
    But calling add should only result in the last event being emitted. Take a look at the unit test for BehaviorSubject: github.com/ReactiveX/rxdart/blob/master/test/subject/… Also, why does uncommenting the commented line cause the test to pass?