RxJava: How to handle errors with the zip operator?

Solution 1

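You can apply onErrorResumeNext to substitute a fallback Observable, or onErrorReturn to substitute a default value, on each source before passing it to zip. That way a failing source no longer terminates the zip with onError. For example: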

Observable.zip(
   responseOneObservable
       .onErrorReturn(new Func1<Throwable, ResponseOne>() {
        @Override
        public ResponseOne call(final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Func1<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo call(final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

See onError handling for more info.


UPDATE: With RxJava 2.0 you must use Function instead of Func1:

import io.reactivex.functions.Function;
...
Observable.zip(
   responseOneObservable
       .onErrorReturn(new Function<Throwable, ResponseOne>() {
        @Override
        public ResponseOne apply(@NonNull final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Function<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo apply(@NonNull final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

Or using lambdas:

Observable.zip(
   responseOneObservable
       .onErrorReturn(throwable -> new ResponseOne()),
   responseTwoObservable
       .onErrorReturn(throwable -> new ResponseTwo()),
   ...

Or using Kotlin:

Observable.zip(
   responseOneObservable
       .onErrorReturn { ResponseOne() },
   responseTwoObservable
       .onErrorReturn { ResponseTwo() },
   ...
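
One limitation of the snippets above is that the zip function cannot tell a default ResponseOne() from a real one. A possible way around this, sketched here as an assumption rather than as part of the original answer, is to pass java.util.Optional values into the zip function and skip the empty ones. The sketch below shows only the zip function body in plain Java so that it runs without RxJava; the value field, the combine method and the String payload are hypothetical stand-ins, since the real ResponseOne, ResponseTwo and TestData classes are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the question's types; the real classes are not shown.
final class ZipFallbackSketch {

    static final class ResponseOne {
        final String value;
        ResponseOne(String value) { this.value = value; }
    }

    static final class ResponseTwo {
        final String value;
        ResponseTwo(String value) { this.value = value; }
    }

    // Body of the zip function: adds data only from the responses that are present.
    static List<String> combine(Optional<ResponseOne> one, Optional<ResponseTwo> two) {
        List<String> testDataList = new ArrayList<>();
        one.ifPresent(r -> testDataList.add(r.value));
        two.ifPresent(r -> testDataList.add(r.value));
        return testDataList;
    }

    public static void main(String[] args) {
        // Simulates the first call succeeding and the second one failing.
        System.out.println(combine(Optional.of(new ResponseOne("one")), Optional.empty())); // prints [one]
    }
}
```

With RxJava 2, which does not allow null items, each source could presumably be adapted with .map(Optional::of).onErrorReturn(throwable -> Optional.empty()) and the two results zipped with ZipFallbackSketch::combine. Note that java.util.Optional is only available on Android from API level 24.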

Solution 2

You can return a default response from either of the two observables with the onErrorResumeNext operator, which switches to a fallback Observable when the source fails.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    .onErrorResumeNext(throwable -> Observable.just(new ResponseOne())) // emit some default value instead
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .onErrorResumeNext(throwable -> Observable.just(new ResponseTwo())) // emit some default value instead
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Also see Error handling in RxJava - Dan Lew

Author: Priyank Patel

Updated on July 12, 2022

Comments

  • Priyank Patel almost 2 years

    I am using RxJava and RxAndroid with Retrofit2.

    Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
    
    Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
    

    I am using the zip operator on the two Observables above, as shown below.

    Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable,
        new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
            @Override
            public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
                ArrayList<TestData> testDataList = new ArrayList<>();
                // Add test data from responseOne and responseTwo
                return testDataList;
            }
        });

    // subscribe() returns a Subscription, so it is called separately from the Observable declaration.
    testDataObservable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<ArrayList<TestData>>() {

            @Override
            public void onNext(ArrayList<TestData> testDataList) {

            }

            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable t) {
                Log.d(TAG, "onError Throwable: " + t.toString());
            }
        });
    

    If an error occurs during the Retrofit HTTP call in either responseOneObservable or responseTwoObservable, the onError method of the subscriber of testDataObservable is called directly.

    I want the zip operator's call method to still run as long as at least one of the two observables returns a successful response.

    How can I handle an error response when using the zip operator?