Creating an RxJS Observable from a (server sent) EventSource


Solution 1

You can create the Observable for the EventSource stream manually with the following code:

export class AppComponent implements OnInit {
  someStrings: string[] = [];
  // Kept so the stream can be closed later; Subscription is exported by RxJS.
  subscription: Subscription;

  constructor(private zone: NgZone) {}

  ngOnInit() {
    const observable = Observable.create(observer => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(x.data);
      eventSource.onerror = x => observer.error(x);

      // Teardown: runs on unsubscribe and closes the connection.
      return () => {
        eventSource.close();
      };
    });

    this.subscription = observable.subscribe({
      next: guid => {
        // Re-enter the Angular zone so the view picks up the change.
        this.zone.run(() => this.someStrings.push(guid));
      },
      error: err => console.error('something wrong occurred: ' + err)
    });
  }
}

// Later, for example in ngOnDestroy:
// this.subscription.unsubscribe(); // runs the teardown above, which closes the EventSource

Don't forget to import the NgZone class:

import {Component, OnInit, NgZone} from '@angular/core';

See also Angular2 View Not Changing After Data Is Updated

Solution 2

To add to Yurzui's answer:

In my case, working with Angular 6, I saw some weird behavior when assigning a function to onmessage. I added an event listener instead, and it worked like a charm:

const observable = Observable.create(observer => {
  const eventSource = new EventSource('/interval-sse-observable');
  eventSource.addEventListener("message", (event: MessageEvent) => observer.next(event.data));
  // The "error" event is a plain Event, not a MessageEvent.
  eventSource.addEventListener("error", (event: Event) => observer.error(event));

  return () => {
    eventSource.close();
  };
});
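
As a rough sketch of how the subscribe function above behaves, the version below factors it out so it can be exercised without a browser or RxJS. The names `EventSourceLike`, `MessageObserver`, `sseSubscribe`, and `FakeEventSource` are hypothetical helpers introduced only for this illustration; they are not part of RxJS or Angular. It also shows that the returned teardown function is what closes the connection.

```typescript
// Minimal structural types so the sketch runs without a browser or RxJS.
// EventSourceLike and MessageObserver are hypothetical names, not library types.
interface EventSourceLike {
  addEventListener(type: string, listener: (event: any) => void): void;
  close(): void;
}

interface MessageObserver {
  next(value: string): void;
  error(err: unknown): void;
}

// Builds the subscribe function that would be handed to Observable.create.
function sseSubscribe(createSource: () => EventSourceLike) {
  return (observer: MessageObserver): (() => void) => {
    const source = createSource();
    source.addEventListener("message", (event: { data: string }) => observer.next(event.data));
    source.addEventListener("error", (event: unknown) => observer.error(event));
    // Teardown: with RxJS this runs when the subscription is unsubscribed.
    return () => source.close();
  };
}

// Stand-in for a real EventSource, used only to exercise the sketch.
class FakeEventSource implements EventSourceLike {
  closed = false;
  private listeners: Record<string, Array<(event: any) => void>> = {};

  addEventListener(type: string, listener: (event: any) => void): void {
    if (!this.listeners[type]) {
      this.listeners[type] = [];
    }
    this.listeners[type].push(listener);
  }

  // Simulates the server pushing an event to every registered listener.
  emit(type: string, event: unknown): void {
    (this.listeners[type] || []).forEach(listener => listener(event));
  }

  close(): void {
    this.closed = true;
  }
}

const fake = new FakeEventSource();
const received: string[] = [];
const teardown = sseSubscribe(() => fake)({
  next: value => { received.push(value); },
  error: err => console.error("something wrong occurred:", err),
});

fake.emit("message", { data: "c374a15b" });
teardown(); // the step that subscription.unsubscribe() would trigger
```

In an application, the factory would presumably be `() => new EventSource('/interval-sse-observable')`, and the function returned by `sseSubscribe` would be passed to `Observable.create` in place of the inline callback.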
Author: balteo

Updated on July 05, 2022

Comments

  • balteo
    balteo almost 2 years

    I would like to create an RxJS Observable from an EventSource (server-sent events).

    I tried the following:

    import {Component, OnInit} from 'angular2/core';
    import {Subject, Observable}  from 'rxjs/Rx';
    
    @Component({
        selector: 'my-app',
        template: `<h1>My second Angular 2 App</h1>
        <ul>
            <li *ngFor="#s of someStrings">
               a string: {{ s }}
            </li>
        </ul>
        `
    })
    export class AppComponent implements OnInit {
    
        someStrings:string[] = [];
    
        ngOnInit() {
            let eventSource = new EventSource('/interval-sse-observable');
            let observable = Observable.create(eventSource);
            observable.subscribe({
                next: aString => this.someStrings.push(aString.data),
                error: err => console.error('something wrong occurred: ' + err)
            });
        }
    }
    

    But I get the following exception:

    EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
    ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
    ORIGINAL STACKTRACE:
    TypeError: this._subscribe is not a function
        at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
        at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
        at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
        at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
        at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
        at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
        at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
        at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
        at Array.forEach (native)
        at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)
    

    For completeness' sake, here is the contents of my index.html:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>sse demo</title>
        <!-- 1. Load libraries -->
        <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
        <script src="https://code.angularjs.org/tools/system.js"></script>
        <script src="https://code.angularjs.org/tools/typescript.js"></script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
        <script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>
    
        <!-- 2. Configure SystemJS -->
        <script>
            System.config({
                transpiler: 'typescript',
                typescriptOptions: { emitDecoratorMetadata: true }
            });
            System.import('./scripts/app.ts')
                    .then(null, console.error.bind(console));
        </script>
    </head>
    <body>
    
    <my-app>Loading...</my-app>
    
    </body>
    </html>
    

    Can someone please help?

    edit 1: Following Yurzui's advice, I modified my code as follows:

    ngOnInit() {
        let observable = Observable.create(observer => {
            const eventSource = new EventSource('/interval-sse-observable');
            eventSource.onmessage = x => observer.next(console.log(x));
            eventSource.onerror = x => observer.error(console.log('EventSource failed'));
    
            return () => {
                eventSource.close();
            };
        });
        observable.subscribe({
            next: aString => this.someStrings.push(aString.data),
            error: err => console.error('something wrong occurred: ' + err)
        });
    }
    

    It does log the first message in the console as follows:

    MessageEvent {isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…}
        bubbles: false
        cancelBubble: false
        cancelable: false
        currentTarget: EventSource
        data: "c374a15b-b37d-498e-8ab0-49643b79c1bb"
        defaultPrevented: false
        eventPhase: 0
        isTrusted: true
        isTrusted: true
        lastEventId: ""
        origin: "http://localhost:8080"
        path: Array[0]
        ports: null
        returnValue: true
        source: null
        srcElement: EventSource
        target: EventSource
        timeStamp: 6257.125
        type: "message"
        __proto__: MessageEvent
    Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefined
        System.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29
        SafeSubscriber.__tryOrUnsub @ Rx.js:10979
        SafeSubscriber.next @ Rx.js:10934
        Subscriber._next @ Rx.js:10894
        Subscriber.next @ Rx.js:10871
        System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21
    

    Now if instead of logging the x variable in the console I just pass it to the next method as follows:

    eventSource.onmessage = x => observer.next(x);
    

    The server-sent events are received by the client (I can see them in the Chrome dev tools), but nothing is displayed in the template, which suggests the array of strings is not being populated...

    By the way, I had to remove the JSON.parse(x.data) call, as it was causing an error.

  • balteo
    balteo about 8 years
    Thanks @yurzui, I have edited my post to take into account your reply.
  • yurzui
    yurzui about 8 years
    Strange, I've tried this solution and it works. What error do you get?
  • balteo
    balteo about 8 years
    No error and no output. Do you have a working plunker/fiddle?
  • yurzui
    yurzui about 8 years
    I've added a Plunker example.
  • balteo
    balteo about 8 years
    I'll look into that. Thanks a lot Yurzui. FYI, the creation of the observable will soon be available in RxJS 5; see: github.com/ReactiveX/rxjs/issues/1644
  • balteo
    balteo about 8 years
    Thanks a lot Yurzui.
  • Gautam
    Gautam over 6 years
    How do you close the EventSource connection using the observable instance in this case?