Reactive Observable Subscription Disposal


Solution 1

Disclaimer: I'm still learning Rx myself, so I'm no expert, but I believe the disposable returned by Subscribe does nothing more than unsubscribe that subscription. Also, if the source completes, as it does in your case, the unsubscription happens automatically. So I think the Dispose call there is redundant and can be safely removed.

See the answer to this question for more info.

Solution 2

The disposable returned by the Subscribe extension methods exists solely to let you manually unsubscribe from the observable before it ends naturally.

If the observable completes - with either OnCompleted or OnError - then the subscription is already disposed for you.

Try this code:

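using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var xs = Observable.Create<int>(o =>
{
    // Forward a single value, and the completion that follows it, to the observer.
    var d = Observable.Return(1).Subscribe(o);
    // This action runs when the subscription to xs is disposed, manually or automatically.
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

// Note that .Dispose() is never called on the returned subscription.
var subscription = xs.Subscribe(x => Console.WriteLine(x));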

If you run the above you'll see that "Disposed!" is written to the console when the observable completes, without you needing to call .Dispose() on the subscription.

One important thing to note: the garbage collector never calls .Dispose() on observable subscriptions, so you must dispose of your subscriptions if they have not (or may not have) naturally ended before your subscription goes out of scope.

Take this, for example:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

The ds observable only attaches to the event when it has a subscription, and only detaches when the observable completes or the subscription is disposed of. Because the source is an event, the observable never completes: it is always waiting for more events. Hence, in the above example, disposing is the only way to detach from the event.
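The attach/detach behaviour can be observed without a network call. The following is only a minimal sketch: it assumes a Subject<int> is an acceptable stand-in for a source that never completes on its own (it is not the WebClient event from the example above), and it uses the subject's HasObservers property to check whether anything is still attached.

```csharp
using System;
using System.Reactive.Subjects;

// Assumption: a Subject<int> stands in for an event-like source that never completes.
var source = new Subject<int>();

var subscription = source.Subscribe(x => Console.WriteLine(x));

// The observer stays attached for as long as the subscription is alive.
bool attachedBefore = source.HasObservers;

// Nothing else will detach it, so dispose explicitly.
subscription.Dispose();
bool attachedAfter = source.HasObservers;

Console.WriteLine($"{attachedBefore} -> {attachedAfter}"); // prints "True -> False"
```

Letting the subscription variable go out of scope without calling Dispose() would leave the observer attached to the source.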

When you have a FromEventPattern observable that you know will only ever return one value, it is wise to add the .Take(1) extension method before subscribing. The observable then completes after the first value, the event handler detaches automatically, and you don't need to manually dispose of the subscription.

Like so:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);
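To see the automatic detach that .Take(1) provides, here is a small sketch under the same kind of assumption as before: a Subject<string> named events is a hypothetical stand-in for the event source, and HasObservers is used to check whether the handler is still attached after the first value arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: a Subject<string> stands in for the event-based observable.
var events = new Subject<string>();
var received = new List<string>();

// The returned IDisposable is deliberately ignored; Take(1) ends the subscription.
events.Take(1).Subscribe(x => received.Add(x));

bool attachedBeforeFirst = events.HasObservers;
events.OnNext("first");
bool attachedAfterFirst = events.HasObservers;

// Nobody is listening any more, so this value is not recorded.
events.OnNext("second");

Console.WriteLine($"{attachedBeforeFirst} -> {attachedAfterFirst}, received {received.Count}");
// prints "True -> False, received 1"
```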

I hope this helps.

Solution 3

Contrary to some of the comments, it's not at all uncommon to dispose of a subscription from inside OnNext.

While it's true that disposal on OnCompleted and OnError is done for you by a wrapped subscription that the Subscribe extension method creates, you may want to unsubscribe based on a value you're observing (in your case, the first one). You don't always have an observable that is known to produce only one value.

The problem is that you get the IDisposable only after you have subscribed. An observable may call your OnNext before Subscribe has returned the IDisposable you need to unsubscribe (depending on things like the IScheduler it uses).

System.Reactive.Disposables.SingleAssignmentDisposable comes in handy in this case. It wraps an IDisposable that you may assign late, and it disposes that IDisposable immediately on assignment if the SingleAssignmentDisposable has already been disposed by then. It also has an IsDisposed property, which is initially false and becomes true once Dispose() is called.

So:

IObservable<string> source = ...;

var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // getting notified though I've told it to stop
        return;
    DoThingsWithItem(x);
    if (x == "the last item I'm interested in")
        subscription.Dispose();
});
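A runnable variant of the pattern above, offered as a sketch rather than a definitive implementation. It assumes a hypothetical source built with Observable.Create that pushes all of its values synchronously inside Subscribe, which is exactly the situation where the IDisposable is not yet available when OnNext runs. The processed list is likewise only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Assumption: a source that emits everything before Subscribe returns.
var source = Observable.Create<string>(o =>
{
    o.OnNext("a");
    o.OnNext("b");
    o.OnNext("c");
    return Disposable.Empty;
});

var processed = new List<string>();

var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // already asked to stop; ignore late values
        return;
    processed.Add(x);
    if (x == "b")                // "b" is the last item of interest here
        subscription.Dispose();
});

Console.WriteLine(string.Join(",", processed)); // prints "a,b"
```

The IsDisposed guard is what keeps "c" from being processed: at the time "b" arrives, the real subscription has not been assigned yet, so Dispose() can only set the flag, and the underlying subscription is disposed once the assignment happens.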

Solution 4

The Take operator does exactly what you are looking for: in this case, Take(1), which completes the sequence, and therefore ends the subscription, after the first item.

Author by

Noob

Updated on July 25, 2022

Comments

  • Noob almost 2 years

    If I have access to an IObservable that I know is only ever going to return one item, will this work and is it the best usage pattern?

    IDisposable disposable = null;
    disposable = myObservable.Subscribe(x =>
      {
         DoThingWithItem(x);
         if (disposable != null)
         {
           disposable.Dispose();
         }
      });