Convert RxJava Observables To Live Data With Kotlin Extension Functions

14,689

Solution 1

In short:

  • Flowable.toLiveData() is already in the androidx.lifecycle:lifecycle-reactivestreams-ktx artifact
  • Observable.toLiveData() should be useful
  • Single.toLiveData() may not be useful in practice
  • Maybe.toLiveData() may not be useful in practice
  • Completable.toLiveData() doesn't really make sense

I think this is a pretty good idea. An example benefit of LiveData is the ability of using it directly in your data-binding layouts. Let's say in your view-model you have:

val user: LiveData<User>

data class User(val firstName: String, val lastName: String)

in your layout you can bind the properties of the User directly:

android:text="${viewModel.user.firstName}"

You can't use reactive streams in data-binding like this. If user was Flowable<User>, referencing ${viewModel.user.firstName} wouldn't work.

Furthermore, data-binding will handle the lifecycle for you (observing changes only in active state and updating the view when changes happen) if your activity or fragment calls ViewDataBinding.setLifecycleOwner(LifecycleOwner):

binding.setLifecycleOwner(this)

The one for converting Completable to LiveData<T> doesn't really make sense to me, because it will never notify the observer about anything, so I'd just get rid of it.

There are some considerations when converting from reactive streams to live data (like I had when I wanted to resume a countdown after rotation), but I don't think they are related to the extension functions you presented, those seem to do their job. The issue to keep in mind here is that when the lifecycle owner moves from active to inactive state, PublisherLiveData cancels the subscription to the stream, and when the state changes to active, it will create a new subscription, which means restarting the stream in many cases (I guess that is if the stream is "cold"), while you probably want to resume the stream from where it was after rotation or other configuration changes. If the stream was "hot" on the other hand, emissions get ignored during the inactive state. I think this problem has to be addressed even if you used reactive streams directly and handled life cycle manually. But the thing is that simply converting reactive streams to LiveData isn't enough to solve this problem.

It's good to document those methods as not handling the error state, which has to be handled upstream. Alternatively, that could be one of the improvements for these functions - transforming the stream first to handle errors (as a lambda parameter with a default for example). Another possibility would be to utilize Result (experimental at the moment), or something similar to encapsulate the success or error.


As an afterthought, regarding this part that I wrote above:

There are some considerations when converting from reactive streams to live data, but I don't think they are related to the extension functions you presented.

I still think it holds true in general, however I'm not sure if you actually want to use Single.toLiveData() and Maybe.toLiveData() in practice most of the time. Since Maybe and Single are modeling one-time operations, then it may be preferable not to cancel it when there are no active observers and have to re-start it once there are new active observers. Instead, posting to some MutableLiveData and disposing the Single/Maybe in onCleared might be useful (I'm not sure that can be encapsulated in an extension function). They still may have some use that I simply don't see at the moment.

By the way, your Flowable.toLiveData() is already in the androidx.lifecycle:lifecycle-reactivestreams-ktx artifact.

This leaves the Observable.toLiveData(), which I think should be just as useful as Flowable one.

Solution 2

Your solution is fine if you want to use LiveData and Rx together.

If you just want to auto-dispose your subscription you could implement it on Disposable like this:

private class LifecycleDisposable(obj: Disposable) :
        DefaultLifecycleObserver, Disposable by obj {
    override fun onStop(owner: LifecycleOwner) {
        if (!isDisposed) {
            dispose()
        }
    }
}

fun Disposable.attachToLifecycle(owner: LifecycleOwner) {
    owner.lifecycle.addObserver(LifecycleDisposable(this))
}

and call it like

Observable.just(1, 2, 3).subscribe().attachToLifecycle(this)

where this references any LifecycleOwner.

Share:
14,689

Related videos on Youtube

Archie G. Quiñones
Author by

Archie G. Quiñones

Updated on June 24, 2022

Comments

  • Archie G. Quiñones
    Archie G. Quiñones about 2 years

    I've been using alot of RxJava Observables converted to LiveData in my code using LiveDataReactiveStreams.fromPublisher() library. So I though of adding an extension function to the RxJava Observable to easily convert them to LiveData.

    These are my extension functions:

    fun <T> Flowable<T>.toLiveData() :  LiveData<T> {
        return LiveDataReactiveStreams.fromPublisher(this)
    }
    
    fun <T> Observable<T>.toLiveData(backPressureStrategy: BackpressureStrategy) :  LiveData<T> {
        return LiveDataReactiveStreams.fromPublisher(this.toFlowable(backPressureStrategy))
    }
    
    fun <T> Single<T>.toLiveData() :  LiveData<T> {
        return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
    }
    
    fun <T> Maybe<T>.toLiveData() :  LiveData<T> {
        return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
    }
    
    fun <T> Completable.toLiveData() :  LiveData<T> {
        return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
    }
    

    My questions are:

    1. Is this a good idea?
    2. Is there a better way of doing this?
    3. Could these extension functions be better?

    P.S.

    I'm new to Kotlin and so I am asking these question. Anything helpful would be appreciated. Thank you very much.

    • Jhon Fredy Trujillo Ortega
      Jhon Fredy Trujillo Ortega over 6 years
      in my experience, is not a good idea. Instead, is better to use Rx Methos that live data. Live data is easie but Rx is a lot better, have more flexibility and its better to use something that is cross plataform like Rx.
    • Archie G. Quiñones
      Archie G. Quiñones over 6 years
      i don't understand. I am converting RxJava to LiveData so I won't need to subscribe or unsubscribe on life cycle events which make using RxJava in Android much more easier.
    • Jhon Fredy Trujillo Ortega
      Jhon Fredy Trujillo Ortega over 6 years
      check this question stackoverflow.com/questions/46312937/…. That is why is better to get used to use Rx instead of LiveData
    • Archie G. Quiñones
      Archie G. Quiñones over 6 years
      I actually am used using to Rx and the only time i convert the Rx Observable to LiveData is when I need them to get unregistered/reregistered on LifeCycle events. By doing this I do not need to add the mechanism to unscubscribe and resubscribe on my observable when a life cycle occurs.
  • Archie G. Quiñones
    Archie G. Quiñones over 6 years
    How would you handle LifeCycle Events then?
  • Alireza Ahmadi
    Alireza Ahmadi over 6 years
    I think @ArchieQuiñones in right. Also, how do you handle data binding?
  • Archie G. Quiñones
    Archie G. Quiñones over 6 years
    Hi @Reza then is it safe to assume that this is a good idea? Everybody coz everybody here seems to disagree.
  • Alireza Ahmadi
    Alireza Ahmadi over 6 years
    @ArchieQuiñones I really dislike the idea of using two reactive technologies (Rx and LiveData) alongside especially when Rx is so powerful but since Rx observables aren't lifecycle aware and android data binding doesn't work with Rx, I personally use Rx up to ViewModel layer and then convert it to LiveData but I hope we find a better solution to use Rx in all places and remove LiveData.
  • Alireza Ahmadi
    Alireza Ahmadi over 6 years
    @ArchieQuiñones Your extension functions are good but since you're using LiveDataReactiveStreams you can't do error handling! So if some exception happens it goes right into main thread and app will crash. In case you want to use this code with networking you're in a serious trouble. I suggest you use this exception function but with a custom implementation for converting Rx to LiveData.
  • Archie G. Quiñones
    Archie G. Quiñones over 6 years
    Hi @Reza. I totally agree with you. It kinda sucks to need to use two reactive technologies just to solve the lifecycle problems in android. I also do the same way you do it. Keeping Rx at ViewModel layer and only converting to LiveData when the value needs to be observed on the View. I also agree on the error handling part and again I do the same approach as you do. I just really need someone to confirm I'm doing it right. This really helps in as far as I'm going the right way. Thank you very much. :)
  • Archie G. Quiñones
    Archie G. Quiñones about 6 years
    doesn't LiveData auto-dispose Observables by default?
  • Archie G. Quiñones
    Archie G. Quiñones about 6 years
    ohh.. wait I see what you did there... then does that mean i dont have to convert my RxJava to LiveData?
  • harry248
    harry248 about 6 years
    Yes, if auto-disposal is all you care about. All it does is releasing you of manually keeping a reference to the Disposable and disposing it in onStop().
  • Archie G. Quiñones
    Archie G. Quiñones about 6 years
    wow! thanks! I'll look into it this weekend (when i have the time). After that, I'll probably mark this as the answer :) Thank you soo much :)
  • harry248
    harry248 about 6 years
    No problem. Have fun!
  • Archie G. Quiñones
    Archie G. Quiñones about 6 years
    Hi @harry248. Now that I could auto dispose my RxObservables I'm wondering whether I could also have the LiveData Behavior with it to totally remove LiveData on my code. Is that possible? For example, I do a web service with Rx.. if the user suddenly switch app or rotate the screen, I can't dispose the observable (coz the web service call will be cancelled) but I also cant just let it update my UI when it comes back and the app is still at background. Currently I store the webservice result in a LiveData to solve this issue but I wonder if this could be done solely with Rx.. thanks in advance
  • harry248
    harry248 about 6 years
    If you need to keep state, you could use a ViewModel and let it return your Observables. The Replay operator could be helpful too in your case.
  • Archie G. Quiñones
    Archie G. Quiñones about 6 years
    I still don't understand.. can you expound on that? maybe an example? I'm sorry if you feel like this should be on a separate question, I could make a new question if you feel that way. thanks in advance.
  • interlude
    interlude almost 4 years
    @AlirezaA.Ahmadi I know this is an old thread but if you use onErrorReturnItem and merge the error into your observable thing you can use error handling.