What is the proper way to handle subscriptions in RxJava/RxAndroid for an Activity Lifecycle?

Solution 1

  1. Yes, unsubscribing will stop it, but you should also set the subscription to null in onError (otherwise, after an error, you may not be able to load items again).

    Also, don't forget that a fragment can be stopped without being destroyed (while it is in the back stack, for example), and you might not want to observe anything in that state. If you move the unsubscribe call from onDestroy to onStop, remember to initialise compositeSubscription in onCreateView each time the view is created, because once a CompositeSubscription has been unsubscribed you can no longer add subscriptions to it.

  2. Yes, correct. But I think the call to compositeSubscription.remove can be omitted, because you already check for null.
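
The point about re-creating compositeSubscription can be illustrated with a small plain-Java model. This is only a sketch: SketchSubscription, SketchComposite and SketchFragment are hypothetical stand-ins (not RxJava or Android classes) that mimic how a CompositeSubscription in RxJava 1.x immediately unsubscribes anything added to it after it has itself been unsubscribed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for rx.Subscription, reduced to what this sketch needs. */
class SketchSubscription {
    private boolean unsubscribed;

    void unsubscribe() { unsubscribed = true; }

    boolean isUnsubscribed() { return unsubscribed; }
}

/** Hypothetical stand-in mimicking CompositeSubscription: once unsubscribed, it stays that way. */
class SketchComposite {
    private final List<SketchSubscription> children = new ArrayList<>();
    private boolean unsubscribed;

    void add(SketchSubscription s) {
        if (unsubscribed) {
            // A dead composite cannot track new work, so it cancels it right away.
            s.unsubscribe();
            return;
        }
        children.add(s);
    }

    void unsubscribe() {
        unsubscribed = true;
        for (SketchSubscription s : children) {
            s.unsubscribe();
        }
        children.clear();
    }
}

/** Plain-Java model of the fragment; the method names only mirror the Android callbacks. */
class SketchFragment {
    SketchComposite compositeSubscription;

    void onCreateView() {
        // A fresh composite for every view, because onStop() kills the previous one.
        compositeSubscription = new SketchComposite();
    }

    void onStop() {
        compositeSubscription.unsubscribe();
    }
}

class LifecycleSketchDemo {
    public static void main(String[] args) {
        SketchFragment fragment = new SketchFragment();
        fragment.onCreateView();
        fragment.onStop();

        // Added to the already-unsubscribed composite: cancelled immediately.
        SketchSubscription late = new SketchSubscription();
        fragment.compositeSubscription.add(late);
        System.out.println("late cancelled: " + late.isUnsubscribed());

        // After the view is created again, new subscriptions are tracked normally.
        fragment.onCreateView();
        SketchSubscription fresh = new SketchSubscription();
        fragment.compositeSubscription.add(fresh);
        System.out.println("fresh cancelled: " + fresh.isUnsubscribed());
    }
}
```

With the real classes, the same idea would be to assign a new CompositeSubscription in onCreateView and call unsubscribe() on it in onStop.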

Solution 2

You don't need any third-party library to manage the Activity lifecycle. Try the following code:

public class LifecycleBinder {

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Activity target, LifecycleEvent event) {
        final Application app = target.getApplication();
        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final Application.ActivityLifecycleCallbacks callbacks = new Application.ActivityLifecycleCallbacks() {
            @Override
            public void onActivityCreated(Activity activity, Bundle savedInstanceState) {

            }

            @Override
            public void onActivityStarted(Activity activity) {

            }

            @Override
            public void onActivityResumed(Activity activity) {

            }

            @Override
            public void onActivityPaused(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onActivityStopped(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onActivitySaveInstanceState(Activity activity, Bundle outState) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onActivityDestroyed(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }
        };

        app.registerActivityLifecycleCallbacks(callbacks);
        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                app.unregisterActivityLifecycleCallbacks(callbacks);
            }
        });
    }

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Fragment target, LifecycleEvent event) {
        final FragmentManager manager = target.getFragmentManager();
        if (manager == null) {
            throw new NullPointerException("fragment manager is null!");
        }

        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final FragmentManager.FragmentLifecycleCallbacks callbacks = manager.new FragmentLifecycleCallbacks() {

            @Override
            public void onFragmentPreAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentActivityCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentViewCreated(FragmentManager fm, Fragment f, View v, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentStarted(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentResumed(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentPaused(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onFragmentStopped(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onFragmentSaveInstanceState(FragmentManager fm, Fragment f, Bundle outState) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onFragmentViewDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_VIEW_DESTORYED);
            }

            @Override
            public void onFragmentDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }

            @Override
            public void onFragmentDetached(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DETACHED);
            }
        };
        manager.registerFragmentLifecycleCallbacks(callbacks, true);

        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                manager.unregisterFragmentLifecycleCallbacks(callbacks);
            }
        });
    }

    private static <R, T> Observable.Transformer<R, R> subscribeUtilEvent(final Observable<T> source, final T event, final Action0 doOnComplete) {
        return new Observable.Transformer<R, R>() {
            @Override
            public Observable<R> call(Observable<R> rObservable) {
                // doOnUnsubscribe also runs the cleanup when the stream errors or is unsubscribed early
                return rObservable.takeUntil(takeUntilEvent(source, event)).doOnUnsubscribe(doOnComplete);
            }
        };
    }

    private static <T> Observable<T> takeUntilEvent(final Observable<T> src, final T event) {
        return src.takeFirst(new Func1<T, Boolean>() {
            @Override
            public Boolean call(T lifecycleEvent) {
                return lifecycleEvent.equals(event);
            }
        });
    }
}

Lifecycle events:

public enum LifecycleEvent {
    ON_PAUSED,
    ON_STOPPED,
    ON_SAVE_INSTANCE_STATE,
    ON_DESTROYED,
    ON_VIEW_DESTORYED,
    ON_DETACHED,
}

Usage:

myObservable
   .compose(LifecycleBinder.subscribeUtilEvent(this, LifecycleEvent.ON_DESTROYED))
   .subscribe();

Author: Sree

Updated on June 04, 2022

Comments

  • Sree (almost 2 years ago)

    I am just getting started on RxJava/RxAndroid. I want to avoid context leaks so I created a BaseFragment like so:

    public abstract class BaseFragment extends Fragment {
    
        protected CompositeSubscription compositeSubscription = new CompositeSubscription();
    
        @Override
        public void onDestroy() {
            super.onDestroy();
    
            compositeSubscription.unsubscribe();
        } 
    } 
    

    And inside my fragment that extends BaseFragment, I am doing this:

    protected void fetchNewerObjects(){
            if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects
    
                Runtime.getRuntime().gc();//clean out memory if possible
    
                fetchNewObjectsSubscription = Observable
                    .just(new Object1())
                    .map(new Func1<Object1, Object2>() {
                        @Override
                        public Object2 call(Object1 obj1) {
                            //do bg stuff
                            return obj2;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Object2>() {
                        @Override
                        public void onCompleted() {
                            compositeSubscription.remove(fetchNewObjectsSubscription);
                            fetchNewObjectsSubscription = null;
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Object2 obj2) {
                            //do stuff
                        }
                    });
    
            //add subscription to composite subscription so it can be unsubscribed onDestroy()
            compositeSubscription.add(fetchNewObjectsSubscription);
        }
    }
    
    protected boolean areNewerObjectsFetching(){
        if(fetchNewObjectsSubscription == null || fetchNewObjectsSubscription.isUnsubscribed()){ //if it's either null or already finished
            return false;
        }
        return true;
    }
    

    So I guess my question is two-fold:

    1. Will this stop context leaks because I am unsubscribing onDestroy()?

    2. And am I properly keeping track of whether the observable is "running" by setting the subscription to null after completion and checking for null?

  • Sree (almost 9 years ago)
    So if I unsubscribe in onStop and it never updates my views, would I need to re-run fetchNewerObjects() in onResume?
  • Sree (almost 9 years ago)
    Also, why onCreate? Did you mean onCreateView?
  • marwinXXII (almost 9 years ago)
    I prefer to do it in onCreate because it's invoked before onCreateView, so compositeSubscription is ready before it is used.
  • Sree (almost 9 years ago)
    But according to the docs, it will never hit onCreate(). It goes from onDestroyView() to onCreateView(): developer.android.com/intl/zh-cn/guide/components/…
  • marwinXXII (almost 9 years ago)
    Sorry, you're right: in onCreateView. I updated the answer. About onStop: unsubscribing "stops" processing of the observable, so yes, you may want to load the data again if it hasn't been loaded.