Why are Subjects not recommended in .NET Reactive Extensions?

Solution 1

OK, let's set aside my dogmatic ways and the whole "subjects are good/bad" debate, and look at the problem space.

I bet you have one of two styles of system you need to integrate with.

  1. The system raises an event or a callback when a message arrives
  2. You need to poll the system to see if there are any messages to process

For option 1, easy, we just wrap it with the appropriate FromEvent method and we are done. To the Pub!
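As a rough illustration of option 1, here is a minimal sketch that wraps an event with Observable.FromEvent. PushSource, MessageArrived and Raise are hypothetical names invented for this example; your real system will expose its own event or callback, and events that follow the standard EventHandler pattern would use FromEventPattern instead.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical push-style source (not a real library type):
// it raises MessageArrived whenever a message comes in.
public class PushSource
{
    public event Action<string> MessageArrived;

    // Stand-in for whatever makes the real system raise its event.
    public void Raise(string message)
    {
        MessageArrived?.Invoke(message);
    }
}

public static class PushSourceExtensions
{
    // Wraps the event as an observable sequence. The handler is attached
    // on Subscribe and detached when the subscription is disposed.
    public static IObservable<string> Messages(this PushSource source)
    {
        return Observable.FromEvent<string>(
            h => source.MessageArrived += h,
            h => source.MessageArrived -= h);
    }
}
```

Consumers would then call something like source.Messages().Subscribe(Console.WriteLine) and dispose the subscription when they are done.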

For option 2, we now need to consider how we poll this and how to do so efficiently. Also, when we get the value, how do we publish it?

I would imagine that you would want a dedicated thread for polling. You wouldn't want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Alternatively you don't want the hassle of context switching (I guess). So assume we have our own thread; we will probably have some sort of While/Sleep loop that we sit in to poll. When the check finds some messages we publish them. Well, all of this sounds perfect for Observable.Create. Now, we probably can't use a While loop, as that won't allow us to ever return a Disposable to allow cancellation. Luckily you have read the whole book, so you are savvy with recursive scheduling!

I imagine something like this could work. #NotTested

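using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public class MessageListener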
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
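        return Observable.Create<IMessage>(o=>
        {
            // Recursive scheduling: each pass polls once, then re-queues itself.
            // Disposing the returned IDisposable cancels the next pass.
            return _scheduler.Schedule(recurse=>
            {
                try
                {
                    var messages = GetMessages();
                    foreach (var msg in messages)
                    {
                        o.OnNext(msg);
                    }
                    recurse();
                }
                catch (Exception ex)
                {
                    o.OnError(ex);
                }
            });
        });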
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that can't push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
         yield break; // Placeholder so the sketch compiles: no messages found.
    }
}

The reason I really don't like Subjects is that they are usually a sign that the developer doesn't have a clear design for the problem. Hack in a subject, poke it here, there and everywhere, and then let the poor support dev guess at WTF was going on. When you use the Create/Generate etc. methods you are localizing the effects on the sequence. You can see it all in one method and you know no-one else is throwing in a nasty side effect. If I see a subject field, I now have to go looking for all the places in the class where it is being used. If some MFer exposes one publicly, then all bets are off; who knows how this sequence is being used! Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

Solution 2

In general you should avoid using Subject, however for the thing you are doing here I think they work quite well. I asked a similar question when I came across the "avoid subjects" message in Rx tutorials.

To quote Dave Sexton (of Rxx)

"Subjects are the stateful components of Rx. They are useful for when you need to create an event-like observable as a field or a local variable."

I tend to use them as the entry point into Rx. So if I have some code that needs to say 'something happened' (like you have), I would use a Subject and call OnNext. Then expose that as an IObservable for others to subscribe to (you can use AsObservable() on your subject to make sure nobody can cast to a Subject and mess things up).
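A minimal sketch of that entry-point pattern might look like the following. FrameSource, Frames and Publish are hypothetical names chosen for illustration, and string stands in for whatever payload type you actually have.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical component: the Subject stays private and only an
// IObservable is exposed to the outside world.
public sealed class FrameSource : IDisposable
{
    private readonly Subject<string> _frames = new Subject<string>();

    // AsObservable wraps the Subject, so callers cannot cast the result
    // back to a Subject and call OnNext themselves.
    public IObservable<string> Frames
    {
        get { return _frames.AsObservable(); }
    }

    // Called by the code that needs to say "something happened".
    public void Publish(string frame)
    {
        _frames.OnNext(frame);
    }

    // Completes the sequence for current subscribers and releases the Subject.
    public void Dispose()
    {
        _frames.OnCompleted();
        _frames.Dispose();
    }
}
```

The design choice here is that all OnNext calls live inside one sealed class, which keeps the "who is pushing into this sequence?" question answerable.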

You could also achieve this with a .NET event and use FromEventPattern, but if I'm only going to turn the event into an IObservable anyway, I don't see the benefit of having an event instead of a Subject (which might mean I'm missing something here).

However, what you should avoid quite strongly is subscribing to an IObservable with a Subject, i.e. don't pass a Subject into the IObservable.Subscribe method.

Solution 3

Often when you're managing a Subject, you're actually just reimplementing features already in Rx, and probably in not as robust, simple and extensible a way.

When you're trying to adapt some asynchronous data flow into Rx (or create an asynchronous data flow from one that's not currently asynchronous), the most common cases are usually:

  • The source of data is an event: As Lee says, this is the simplest case: use FromEvent and head to the pub.

  • The source of data is a synchronous operation and you want polled updates (e.g. a webservice or database call): In this case you could use Lee's suggested approach, or for simple cases, you could use something like Observable.Interval(period).Select(_ => <db fetch>). You may want to use DistinctUntilChanged() to prevent publishing updates when nothing has changed in the source data.

  • The source of data is some kind of asynchronous API that calls your callback: In this case, use Observable.Create to hook up your callback to call OnNext/OnError/OnCompleted on the observer.

  • The source of data is a call that blocks until new data is available (eg some synchronous socket read operations): In this case, you can use Observable.Create to wrap the imperative code that reads from the socket and publishes to the Observer.OnNext when data is read. This may be similar to what you're doing with the Subject.
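To make the polling case from the list above more concrete, here is a small sketch built on Observable.Interval and DistinctUntilChanged. The Polling class, the Poll method and the fetch delegate are assumptions made for this example; fetch stands in for the synchronous database or webservice call.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class Polling
{
    // Calls 'fetch' once per 'period' on the given scheduler and only
    // pushes a value when it differs from the previously pushed one.
    // 'fetch' is a hypothetical synchronous call (database, webservice, ...).
    public static IObservable<T> Poll<T>(
        Func<T> fetch, TimeSpan period, IScheduler scheduler)
    {
        return Observable.Interval(period, scheduler)
                         .Select(_ => fetch())
                         .DistinctUntilChanged();
    }
}
```

Passing the scheduler in explicitly keeps the polling thread a deliberate choice (for example an EventLoopScheduler) and makes the sequence testable with a virtual-time scheduler. Note that nothing is polled until someone subscribes, and an exception thrown by fetch ends the sequence with OnError.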

Using Observable.Create vs creating a class that manages a Subject is fairly equivalent to using the yield keyword vs creating a whole class that implements IEnumerator. Of course, you can write an IEnumerator to be as clean and as good a citizen as the yield code, but which one is better encapsulated and feels a neater design? The same is true for Observable.Create vs managing Subjects.

Observable.Create gives you a clean pattern for lazy setup and clean teardown. How do you achieve this with a class wrapping a Subject? You need some kind of Start method... how do you know when to call it? Or do you just always start it, even when no one is listening? And when you're done, how do you get it to stop reading from the socket/polling the database, etc? You have to have some kind of Stop method, and you have to still have access not just to the IObservable you're subscribed to, but the class that created the Subject in the first place.

With Observable.Create, it's all wrapped up in one place. The body of Observable.Create is not run until someone subscribes, so if no one subscribes, you never use your resource. And Observable.Create returns a Disposable that can cleanly shutdown your resource/callbacks, etc - this is called when the Observer unsubscribes. The lifetimes of the resources you're using to generate the Observable are neatly tied to the lifetime of the Observable itself.
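The lazy setup and teardown described above can be sketched roughly as follows. Feed is a made-up resource with Start/Stop methods and a single callback slot, not a real library type; the sketch assumes one subscriber at a time.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical resource with explicit Start/Stop and one callback slot.
public class Feed
{
    private Action<int> _callback;

    public bool IsRunning { get; private set; }

    public void Start(Action<int> callback)
    {
        _callback = callback;
        IsRunning = true;
    }

    public void Stop()
    {
        IsRunning = false;
        _callback = null;
    }

    // Stand-in for the resource producing a value.
    public void Push(int value)
    {
        var callback = _callback;
        if (callback != null) callback(value);
    }
}

public static class FeedExtensions
{
    public static IObservable<int> Values(this Feed feed)
    {
        return Observable.Create<int>(observer =>
        {
            // Setup: runs when someone subscribes, not when Values() is called.
            feed.Start(observer.OnNext);

            // Teardown: runs when the subscription is disposed.
            return Disposable.Create(feed.Stop);
        });
    }
}
```

With this shape there is no separate Start or Stop for consumers to remember: subscribing starts the feed and disposing the subscription stops it.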

Solution 4

The quoted block text pretty much explains why you shouldn't be using Subject<T>, but to put it more simply, you are combining the functions of observer and observable, while injecting some sort of state in between (whether you're encapsulating or extending).

This is where you run into trouble; these responsibilities should be separate and distinct from each other.

That said, in your specific case, I'd recommend that you break your concerns into smaller parts.

First, you have your thread that is hot, and always monitoring the hardware for signals to raise notifications for. How would you do this normally? Events. So let's start with that.

Let's define the EventArgs that your event will fire.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("baseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Now, the class that will fire the event. Note, this could be a static class (since you always have a thread running monitoring the hardware buffer), or something you call on-demand which subscribes to that. You'll have to modify this as appropriate.

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

So now you have a class that exposes an event. Observables work well with events. So much so that there's first-class support for converting streams of events (think of an event stream as multiple firings of an event) into IObservable<T> implementations if you follow the standard event pattern, through the static FromEventPattern method on the Observable class.

With the source of your events, and the FromEventPattern method, we can create an IObservable<EventPattern<BaseFrameEventArgs>> easily (the EventPattern<TEventArgs> class embodies what you'd see in a .NET event, notably, an instance derived from EventArgs and an object representing the sender), like so:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Of course, you want an IObservable<IBaseFrame>, but that's easy, using the Select extension method on the Observable class to create a projection (just like you would in LINQ), and we can wrap all of this up in an easy-to-use method:

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
Author by

Anthony

Updated on June 14, 2020

Comments

  • Anthony
    Anthony almost 4 years

    I am currently getting to grips with the Reactive Extensions framework for .NET and I am working my way through the various introduction resources I've found (mainly http://www.introtorx.com)

    Our application involves a number of hardware interfaces that detect network frames, these will be my IObservables, I then have a variety of components that will consume those frames or perform some manner of transform on the data and produce a new type of frame. There will also be other components that need to display every n'th frame for example. I am convinced that Rx is going to be useful for our application, however I am struggling with the implementation details for the IObserver interface.

    Most (if not all) of the resources I have been reading have said that I should not implement the IObservable interface myself but use one of the provided functions or classes. From my research it appears that creating a Subject<IBaseFrame> would provide me what I need, I would have my single thread that reads data from the hardware interface and then calls the OnNext function of my Subject<IBaseFrame> instance. The different IObserver components would then receive their notifications from that Subject.

    My confusion is coming from the advice given in the appendix of this tutorial where it says:

    Avoid the use of the subject types. Rx is effectively a functional programming paradigm. Using subjects means we are now managing state, which is potentially mutating. Dealing with both mutating state and asynchronous programming at the same time is very hard to get right. Furthermore, many of the operators (extension methods) have been carefully written to ensure that correct and consistent lifetime of subscriptions and sequences is maintained; when you introduce subjects, you can break this. Future releases may also see significant performance degradation if you explicitly use subjects.

    My application is quite performance critical, I am obviously going to test the performance of using the Rx patterns before it goes in to production code; however I am worried that I am doing something that is against the spirit of the Rx framework by using the Subject class and that a future version of the framework is going to hurt performance.

    Is there a better way of doing what I want? The hardware polling thread is going to be running continuously whether there are any observers or not (the HW buffer will back up otherwise), so this is a very hot sequence. I need to then pass the received frames out to multiple observers.

    Any advice would be greatly appreciated.

  • casperOne
    casperOne over 11 years
    Why do you need state at all? As shown by my answer, if you break down the problem into separate pieces, you don't really have to manage state at all. Subjects shouldn't be used in this case.
  • Anthony
    Anthony over 11 years
    Thank you for your response @casperOne, this was my initial approach but it felt "wrong" to add an event just so I could wrap it with Rx. I currently use delegates (and yes, I know that that is exactly what an event is!) to fit in with the code used for loading and saving configuration, this has to be able to rebuild the component pipelines and the delegate system gave me most flexibility. Rx is giving me a headache in this area now but the power of everything else in the framework is making solving the config problem very worthwhile.
  • Anthony
    Anthony over 11 years
    I'm just reading this answer now but I felt I ought to point out that I would never consider exposing the Subject interface! I am using it to provide the IObservable<> implementation within a sealed class (that exposes the IObservable<>). I can definitely see why exposing the Subject<> interface would be a Bad Thing™
  • casperOne
    casperOne over 11 years
    @Anthony If you can get his code sample to work, great, but as I've commented, it makes no sense. As for feeling "wrong", I don't know why subdividing things into logical parts seems "wrong", but you've not given enough detail in your original post to indicate how to best translate this to IObservable<T> as no information about how you're currently signaling with that information is given.
  • Wilka
    Wilka over 11 years
    @casperOne You don't need state outside of the Subject<T> or event (which both have collections of things to call, observers or event handlers). I just prefer to use a Subject if the only reason to add an event is to wrap it with FromEventPattern. Aside from a change in exception semantics, which might be important to you, I don't see a benefit to avoiding Subject in this way. Again, I might be missing something else here that makes the event preferable to the Subject. The mention of state was just part of the quote, and it seemed better to leave it in. Maybe it's clearer without that part?
  • user10479
    user10479 about 11 years
    hey, sorry to be thick, but i just don't really understand your code. what are ListenToMessages() and GetMessages() doing and returning?
  • Lee Campbell
    Lee Campbell about 11 years
    Apologies, the answer was not very clear. ListenToMessages() is effectively a way to expose pull data as push data. GetMessages() is supposed to represent accessing the pull data (reading new rows from a database for example).
  • kitsune
    kitsune over 10 years
    @casperOne In your opinion, would the use of Subjects be appropriate for a Message Bus / Event Aggregator?
  • casperOne
    casperOne over 10 years
    @kitsune No, I don't see why they would. If you're thinking "optimization" you have to ask the question of whether or not that's the problem, have you measured Rx to be the cause of the problem?
  • Lee Campbell
    Lee Campbell about 10 years
    I agree here with casperOne that splitting up the concerns is a good idea. I would like to point out that if you go with the Hardware to Event to Rx pattern, you lose the error semantics. Any lost connections or sessions etc. won't be exposed to the consumer. Now the consumer can't decide if they want to retry, disconnect, subscribe to another sequence or something else.
  • James Moore
    James Moore over 9 years
    @casperOne - but you also shouldn't create an event just to wrap it with FromEventPattern. That's obviously a terrible idea.
  • Dave Sexton
    Dave Sexton over 9 years
    I've explained my quote in more depth in this blog post.
  • jeromerg
    jeromerg over 9 years
    I don't understand. I use Rx for (ViewModels) component-contracts, analogous to getter/setter properties: IObservable for getter, IObserver for setter, ISubject for getter+setter. Implementation is mostly with BehaviorSubject. So I keep a modularized architecture, can wire the components together with powerful Rx chaining expression, and can also implement the inner component logic with Rx expressions. I see only advantages and no drawback. Please explain! Am I doing something wrong?
  • Lee Campbell
    Lee Campbell about 9 years
    For your personal project @jeromerg, this may be fine. However in my experience developers struggle with WPF, MVVM, unit testing GUI design and then throwing in Rx can make things more complicated. I have tried the BehaviourSubject-as-a-property pattern. However I found that it was much more adoptable for others if we used standard INPC properties and then using a simple Extension method to convert this to IObservable. Additionally, you will need custom WPF bindings to work with your behaviour subjects. Now your poor team has to learn WPF, MVVM, Rx and your new framework too.
  • jeromerg
    jeromerg about 9 years
    @lee-campbell: Interesting! Can you tell more about the extension methods? After 6-months experience: Rx is very contagious and mixing programming philosophy (procedural-OO vs. Declarative-Rx) is completely confusing. So we tried to switch all ViewModel logic to Rx (BehaviourSubject-as-a-property pattern): all interactions between components and all internal state calculations. UI-Binding occurs through additional INPC properties. Result is consistent but indeed somehow "over-designed". I will try to publish an example as discussion material.
  • Evan Moran
    Evan Moran about 9 years
    Very clear explanation of Observable.Create. Thank you!
  • Lee Campbell
    Lee Campbell about 9 years
    You could look to my RxCookbook sample that shows usage of Extension methods to make INPC observable - github.com/LeeCampbell/RxCookbook/blob/master/Model/… and similar methods for making ObservableCollections observable in a Rx fashion - github.com/LeeCampbell/RxCookbook/blob/master/Model/…
  • scorpiodawg
    scorpiodawg almost 9 years
    I tend to use them as the entry point into Rx. This hit the nail on the head for me. I have a situation where there is an API that when invoked generates events that I'd like to pass through a reactive processing pipeline. The Subject was the answer for me, since the FromEventPattern doesn't seem to exist in RxJava AFAICT.
  • James Moore
    James Moore about 8 years
    I think your initial two options are wrong; the vast majority of the systems out there (meaning Android and iOS, even with .Net) use #3, which is that the system is going to call methods on your existing objects when stuff happens. They aren't events, they aren't calls on objects that you can create inside something like Observable.Create.
  • Lee Campbell
    Lee Campbell about 8 years
    @JamesMoore callbacks (calling a method on your objects) can certainly be wrapped in Obs.Create. This gives the benefit of tying lifetime scopes together.
  • James Moore
    James Moore about 8 years
    @LeeCampbell - I think I'm missing something fundamental here. Object creation and lifetime is usually outside your control for UI framework elements. If the OS instantiates your Android Activity, for example, and then calls various methods on it (OnResume, OnDestroy, OnViewCreated, etc), how would you wrap things in Obs.Create? It's obvious how you do things when you can create things inside the context of the Create() function, but that's almost never the case when you're interacting with UI; you tell the system which class to construct, you don't get to construct it yourself.
  • James Moore
    James Moore about 8 years
    @LeeCampbell, to put it in terms of your code example, the normal way would be that MessageListener is constructed by the system (you probably register the class name somehow), and you're told that the system will then call OnCreate() and OnGoodbye(), and will call message1(), message2(), and message3() as messages are generated. It seems like messageX[123] would call OnNext on a subject, but is there a better way?
  • Lee Campbell
    Lee Campbell about 8 years
    Hi James, the original question was related to .NET. I need to brush up on my Android skills, but I have successfully been building .NET GUIs for Financial Institutions with a heavy usage of Rx, and virtually no usage of Subjects. I have also built Android apps and didn't find this issue, but that was way back when this post was written, 3yrs ago. Generally I find where there is a subject in a .NET GUI app, there is a design flaw, and normally this is also manifested in a performance penalty too.
  • Lee Campbell
    Lee Campbell about 8 years
    @JamesMoore as these things are much easier to explain with concrete examples. If you know of an Open Source Android app that uses Rx and Subjects, then maybe I can find time to see if I can provide a better way. I do understand that it is not very helpful to stand on a pedestal and say Subjects are bad. But I think things like IntroToRx, RxCookbook and ReactiveTrader all give various levels of example of how to use Rx.
  • Frank Schwieterman
    Frank Schwieterman over 7 years
    I still have cases where I use a subject, where a broker object exposes the observable (say it's just a changeable property). Different components will call the broker telling when that property changes (with a method call), and that method does an OnNext. Consumers subscribe. I think I would use a BehaviorSubject in this case, is that appropriate?
  • Niall Connaughton
    Niall Connaughton over 7 years
    It depends on the situation. Good Rx design tends to transform the system towards an async/reactive architecture. It can be hard to cleanly integrate small components of reactive code with a system that's of imperative design. The band aid solution is to use Subjects to turn imperative actions (function calls, property sets) into observable events. Then you end up with little pockets of reactive code and no real "aha!" moment. Changing the design to model data flow and react to it usually gives a better design, but it's a pervasive change and requires a mindset shift and team buy-in.
  • Robetto
    Robetto about 5 years
    I would state here (as Rx unexperienced) that: By using Subjects you can enter the world of Rx within a grown imperative application and slowly transform it. Also to gain first experiences.... and certainly later change your code to how it should have been from the beginning (lol). But to begin with I think it could be worthwhile using subjects.
  • Bondolin
    Bondolin over 2 years
    @LeeCampbell sorry to dredge up an old thread, but I have exactly the situation Mr. Moore is referring to - stackoverflow.com/questions/70866471/…. Any advice would be welcome.