RxJS: Correct way to manually emit an Observable


Of course you can combine Observables and Subjects into one stream.

I think the question here is what makes more sense in you usecase. From your description when implementing something like "add item" functionality I'd prefer Subject over Observable.create.

This is because every time you subscribe to your signal you're reassigning this.additem. The callback to Observable.create is called for every observer. Note that more correct usage of Observable.create would look like this:

const signal = Observable.create((observer) => {
   this.additem = (item) => observer.next(item);
   return () => this.additem = null;

The returned callback () => this.additem = null is called when you unsubscribe from this Observable and that's the place where you should handle all cleanup.

However, if you make two subscriptions to signal then you'll override this.additem twice and then if you chose to unsubscribe one of the observers you would this.additem = null and it would probably lead to an unexpected behavior.

So in this case it makes more sense to use Subject. For example like this:

const subject = new Subject();
this.additem = (item) => subject.next(item);

If you want to see more real life example of Observable.create have a look at for example this: Subscribe to a stream with RxJS and twitter-stream-api module

Edit: Also have a look at these articles from the lead developer of RxJS 5:


Related videos on Youtube

Author by


Updated on June 04, 2022


  • Potatoes
    Potatoes over 1 year

    Working with RxJS in Angular 4.x, I'm seeing two very different patterns for generating Observables from streams of user initiated actions. One stream is the direct result of a user clicking an 'add item' button that generates a new object. The other is a series of events issued by some third party code I'm using.

    I want to be able to combine these two streams using something like 'combineLatest' to generate a single Observable.

    With my button, I've followed the following pattern:

    const signal = Observable.create(
                (observer) => {
                    this.additem= (item) => observer.next(item);
    this.item$ = signal.map((item) => [item])
                                .scan((accumulator, value) => {
                                    return accumulator.concat(value);

    However, I'm seeing a lot of information saying I should be using Subjects instead - which I am trying to use with my event callback like so:

    sort$ = new Subject();

    Then I'm attempting to combine these like this:

    combine$ = Observable.combineLatest(sort$, item$,
                      (sort, items) => {
                          return "something that does stuff with these";}

    My questions are - what is the preferred pattern for 'manually' generating streams? Can/should observables and subjects be meshed together into a single observable like I'm trying to do here?

  • Potatoes
    Potatoes over 6 years
    Thanks! This is the kind of answer I was looking for. Pitfalls like this weren't apparent to me looking at the documentation, and I knew there had to be reasons to choose one approach over the other since both seemed to work.
  • martin
    martin over 6 years
    @Potatoes I added a few links to articles I consider very helpful