Reconnecting a websocket in Angular and rxjs?

26,558

Solution 1

Actually there now is a WebsocketSubject in rxjs!

 import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
 subject.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
 subject.next(JSON.stringify({ op: 'hello' }));

It does handle reconnection when you resubscribe to a broken connection. So for example write this to reconnect:

subject.retry().subscribe(...)

See the docs for more info. Unfortunately the searchbox doesn't show the method, but you find it here:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

that #-navigation is not working in my browser, so search for "webSocket" on that page.

Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

Solution 2

For rxjs 6 implementation

import { webSocket } from 'rxjs/webSocket'

let subject = webSocket('ws://localhost:8081');
subject.pipe(
   retry() //support auto reconnect
).subscribe(...)

Solution 3

This might not be the good answer but it's way too much for a comment.

The problem might comes from your service :

listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);
  }

  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
      observer.next(evt);
    };
  })
  .map(res => res.data)
  .share();
}

Do you think that you go multiple times in your component into the ngOnInit method?
You should try to put a console.log into ngOnInit to be sure.

Because if you do so, in your service you'll override the this.websocket with a new one.

You should try something like that instead :

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private websocketSubject$ = new BehaviorSubject<any>();
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {
    if (this.websocket) {
      return this.websocket$;
    }

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);

    this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
  }
}

The BehaviorSubject will send the last value if it receives an event before you subscribe to it. Plus, as it's a subject, no need to use the share operator.

Share:
26,558
John
Author by

John

I'm the Solution Director Innovation for HCL Software, looking after an incredibly talented team of engineers based in Manila, Philippines. I hold multiple Salesforce certifications. Before that I worked in various roles: Salesforce Program Architect Director, IBM Code Mage for Collaboration &amp; Cloud in IBM ASEAN; NotesSensei for IBM Lotus Notes &amp; Domino, independent consultant and CTO for startups long before they became en vogue and venture backed. I started with COBOL (while studying economic, since there were no spreadsheets then), move to dBase and Foxpro. Fell in love with @1-2-3 and Lotus Agenda. Lotus Notes was the next logical step which I started with V2. I do JavaScript, Java &amp; VB.NET and XML and XSLT. My first mobile application ran on a Palm Pilot mobilising a SAP inventory system (that was already last century). I love the maker movement and got a cupboard full of electronics to tinker with. I live in Singapore. My core interests are knowledge management and eLearning. I'm a certified counselor for personcentric organisational development. And yes: plenty of Salesforce certs too. If you feel something is too complicated, chat with me, I'll explain it in simple terms.

Updated on July 07, 2021

Comments

  • John
    John almost 3 years

    I have a ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I start the application I receive incoming data flawlessly.

    However after a while (updates are coming in quite infrequently) the connection seem to get lost and I don't get anymore incoming data. My code:

    The service

    import { Injectable, OnInit } from '@angular/core';
    import { Observable } from 'rxjs';
    
    @Injectable()
    export class MemberService implements OnInit {
    
      private websocket: any;
      private destination: string = "wss://notessensei.mybluemix.net/ws/time";
    
      constructor() { }
    
      ngOnInit() { }
    
      listenToTheSocket(): Observable<any> {
    
        this.websocket = new WebSocket(this.destination);
    
        this.websocket.onopen = () => {
          console.log("WebService Connected to " + this.destination);
        }
    
        return Observable.create(observer => {
          this.websocket.onmessage = (evt) => {
            observer.next(evt);
          };
        })
          .map(res => res.data)
          .share();
      }
    }
    

    The subscriber

      export class AppComponent implements OnInit {
    
      constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}
    
      ngOnInit() {
        this.memberService.listenToTheSocket().subscribe((result: any) => {
          try {
            console.log(result);
            // const member: any = JSON.parse(result);
            // this.store.dispatch(new MemberActions.AddMember(member));
          } catch (ex) {
            console.log(JSON.stringify(ex));
          }
        })
      }
    }
    

    What do I need to do to reconnect the web socket when it times out, so the observable continues to emit incoming values?

    I had a look at some Q&A here, here and here and it didn't seem to address this question (in a way I could comprehend).

    Note: the websocket at wss://notessensei.mybluemix.net/ws/time is live and emits a time stamp once a minute (in case one wants to test that).

    Advice is greatly appreciated!