June 22, 2013

To Use Subject Or Not To Use Subject?

That is the question!  There appears to be some confusion on the web about whether or not Subject<T> should be used, ever.  'Tis a question oft asked by developers new to Rx after reading expert recommendations to avoid subjects, due to common misuse, yet subjects seem to retain an air of mystery even for seasoned reactive developers.

First, a primer on Subjects.  Take a moment to read that.

Done?  Great.  Let's continue...

As you may already know, Erik Meijer doesn't like subjects.  End of discussion, right?  Wrong!  Erik doesn't like subjects to be used in place of purely functional alternatives.1

As Erik puts it:

[Subjects] are the "mutable variables" of the Rx world and in most cases you do not need them.

So let's enumerate the cases where we should and shouldn't use subjects.

We'll start by examining two interesting properties of subjects:

  1. They implement IObservable<T> and IObserver<T> simultaneously, which makes them suitable for both output and input, respectively.
  2. They are hot.  (See also: Hot and Cold Observables.)
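
To make those two properties concrete, here's a minimal sketch.  The class name SubjectProperties and the Run helper are just scaffolding for this demo (my assumptions, not anything defined by Rx).  It pushes values into a subject through its IObserver<T> side and reads them through its IObservable<T> side, and it shows that values pushed before anybody subscribes are simply lost.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    static class SubjectProperties
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();
            var subject = new Subject<int>();

            // Input: the subject is an IObserver<int>, so we can push into it.
            IObserver<int> input = subject;

            // Output: the same object is an IObservable<int>, so we can subscribe to it.
            IObservable<int> output = subject;

            input.OnNext(1);  // Nobody is subscribed yet, so this value is lost (hot).

            using (output.Subscribe(x => log.Add("A: " + x)))
            {
                input.OnNext(2);

                using (output.Subscribe(x => log.Add("B: " + x)))
                {
                    input.OnNext(3);
                }
            }

            return log;
        }
    }
}
```

Output:

A: 2
A: 3
B: 3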

Thus to answer the question of whether we should use a subject when defining an observable (output) we must consider:

  1. the input; i.e., the source of the OnNext, OnCompleted and OnError notifications.
  2. the desired temperature of the generated observable; e.g., hot or cold.

To determine whether or not to use a subject, start by asking yourself the following questions:

  1. what is the source of the notifications?  I.e., where do the values of T come from?  Who is responsible for calling OnNext?
  2. must observers share subscription side-effects or not?  I.e., does calling Subscribe execute some kind of initialization code each time it's called or does initialization occur once and its state is shared by every call to Subscribe?

Let's answer all possible combinations of these 2 questions.  I'll provide more details and examples later in this post to explain why these answers make sense.

Question 1:  What is the source of the notifications?

The source can be one of two kinds: Let's call them external and local.  An external source is any observable or event that already exists outside of your code.  A local source is when you generate an observable from your code, kind of like defining and raising a custom .NET event.  More details on these distinctions later in this post.

  1. If the source is external, then do not use a subject.
  2. If the source is local, then do use a subject.

Question 2:  Must observers share subscription side-effects or not?

  1. If not, then do not use a subject.  (Define a cold observable.)
  2. If yes, then do use a subject.  (Define a hot observable.)

Let's enumerate all possible combinations: 

  1. The source is external and I want a cold observable.
  2. The source is external and I want a hot observable.
  3. The source is local and I want a cold observable.
  4. The source is local and I want a hot observable. 

As you may have noticed, there are conflicts.

  • What if you have an external source and you do want to share subscription side-effects?
  • What if you have a local source and you do not want to share subscription side-effects?
  • What if the temperature of an external source differs from the desired temperature?

I believe these conflicts cause additional confusion among developers.  They seem to blur the line when it's appropriate or inappropriate to use subjects, though in fact the line is actually quite clear, as we'll soon see.

Let's resolve these conflicts now.

The source is external and I want a cold observable.

  1. If the source is cold, then use it as is.  You can also wrap it with Defer to add subscription side-effects.
  2. If the source is hot, then wrap it with Defer to add subscription side-effects.
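
Here's a rough sketch of the Defer approach.  I'm using a Subject<int> purely as a stand-in for some hot external source that we don't control; that stand-in, along with the names DeferLab and Run, is an assumption made for the demo.  The point is only that the factory passed to Defer runs once per call to Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    static class DeferLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();

            // Stand-in for a hot external source (an assumption for this demo).
            var hot = new Subject<int>();

            // Defer invokes this factory once per Subscribe call,
            // which is where per-subscription side-effects can live.
            IObservable<int> cold = Observable.Defer<int>(() =>
            {
                log.Add("Subscription side-effect.");
                return hot;
            });

            using (cold.Subscribe(x => log.Add("A: " + x)))
            using (cold.Subscribe(x => log.Add("B: " + x)))
            {
                hot.OnNext(1);
            }

            return log;
        }
    }
}
```

Output:

Subscription side-effect.
Subscription side-effect.
A: 1
B: 1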

The source is external and I want a hot observable.

  1. If the source is cold, then Publish it.  Alternatively, you can use PublishLast or Replay, if you need their additional behavior.
  2. If the source is hot, then use it as is.
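
As a small sketch of the first case, the following publishes a cold observable so that two observers share a single subscription.  The cold source is defined with Create only so that the side-effect is visible; in practice it would be whatever external observable you were given.  PublishLab and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    static class PublishLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();

            // A cold source: the side-effect runs for every subscription.
            IObservable<int> cold = Observable.Create<int>(observer =>
            {
                log.Add("Subscription side-effect.");
                observer.OnNext(1);
                observer.OnNext(2);
                observer.OnCompleted();
                return Disposable.Empty;
            });

            // Publish defers the single underlying subscription until Connect is called.
            IConnectableObservable<int> hot = cold.Publish();

            using (hot.Subscribe(x => log.Add("A: " + x)))
            using (hot.Subscribe(x => log.Add("B: " + x)))
            using (hot.Connect())
            {
            }

            return log;
        }
    }
}
```

Output:

Subscription side-effect.
A: 1
B: 1
A: 2
B: 2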

The source is local and I want a cold observable.

  1. Define your observable with existing Rx generator methods; e.g., Return, Range, Timer, Interval, Generate or Create.

The source is local and I want a hot observable.

  1. This scenario is a potential candidate for defining your observable as Subject<T>, or even BehaviorSubject, AsyncSubject or ReplaySubject, if you need their behavior.  More details on why it's merely a "potential candidate" later in this post.

Note that Publish, PublishLast and Replay use subjects internally (via Multicast).  This isn't a coincidence.  Subscribing a subject to a cold observable broadcasts its notifications to multiple observers, thus making it hot.
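
To illustrate that relationship, here's a sketch that calls Multicast directly with a plain Subject<int>, which behaves like Publish in this simple case.  I haven't reproduced Rx's actual implementation here; this only shows the observable behavior.  MulticastLab and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    static class MulticastLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();

            IObservable<int> cold = Observable.Range(1, 3);

            // The subject is subscribed to the cold source when Connect is called,
            // and it broadcasts each notification to all of its observers.
            IConnectableObservable<int> hot = cold.Multicast(new Subject<int>());

            using (hot.Subscribe(x => log.Add("A: " + x)))
            using (hot.Subscribe(x => log.Add("B: " + x)))
            using (hot.Connect())
            {
            }

            return log;
        }
    }
}
```

Output:

A: 1
B: 1
A: 2
B: 2
A: 3
B: 3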

So it's clear there are only two scenarios where it's correct to use subjects:

  1. The source is external and cold, and I want a hot observable.
  2. The source is local and I want a hot observable.

Let's simplify those statements.  The two scenarios where it's correct to use subjects are:

  1. Converting a cold observable into a hot observable.
  2. Generating a hot observable imperatively and statefully, without any direct external source.

Even so, it's only correct to use subjects explicitly in the latter scenario.  As mentioned previously, Rx defines various operators like Publish for use in the former scenario.  Thus, only 1 scenario remains.

And so now we've answered our original questions:

When should I use a subject?  To generate a hot observable imperatively and statefully, without any direct external source.
When shouldn't I use a subject?  Everywhere else.

Perhaps that answer is a bit vague.  What do I mean by imperative and stateful?  What about local vs. external?  And why did I mention before that our final scenario is only a "potential candidate" for using a subject?

If (true == interested_in_the_answers_to_these_questions) { continue; } else { goto Conclusion; }

External Sources

Let's redefine external as quite simply a source of notifications that you do not have to generate yourself.  It comes from outside the scope of the observable that you're defining, such as a dependent object or a parameter.  It could be any number of things; e.g., an event on another object, an event on a base class, a timer callback, a Task<T> that represents the asynchronous result of reading from a file or sending a web request, an IEnumerable<T>, etc.

All of these examples have something in common: they can easily be converted into IObservable<T> using various From* and To* operators in Rx.  For example, Task<T> is converted by the ToObservable operator.  So if your external source is not already an IObservable<T>, then simply convert it using one of Rx's existing conversion operators.
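
For instance, here's a minimal sketch of two such conversions: a Task<T> and an IEnumerable<T>, each turned into an observable with ToObservable.  The already-completed task and the array are placeholders for real external sources, and ConversionLab is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ConversionLab
    {
        public static void Main()
        {
            // Wait blocks until the sequence completes and returns its last value.
            Console.WriteLine(FromTask().Wait());
            Console.WriteLine(string.Join(", ", FromEnumerable().ToList().Wait()));
        }

        public static IObservable<int> FromTask()
        {
            // Placeholder for a real asynchronous operation, such as a web request.
            Task<int> task = Task.FromResult(42);

            return task.ToObservable();
        }

        public static IObservable<int> FromEnumerable()
        {
            IEnumerable<int> values = new[] { 1, 2, 3 };

            return values.ToObservable();
        }
    }
}
```

Output:

42
1, 2, 3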

Once you have an IObservable<T>, you need to determine whether it's hot or cold and whether you need it to be hot or cold.  If there's a conflict, as defined above, then use either Defer or Publish to change the temperature accordingly.  For this, there's no need to use subjects in your code.2

Finally, use LINQ to query the observable.  Again, you shouldn't use subjects here either.

The result is that you'll have a succinct observable query, controlling side-effects and meeting your specification in a declarative manner.  Subjects are simply the wrong tool for this job.  It's like hammering in a nail with a sledgehammer.  Subjects do much more than what is needed.  (Though as you'll see later in this post, in some ways subjects actually do less than what you may think, potentially introducing nasty threading bugs.)

Local / Imperative / Stateful Sources

I'm defining local as a source where you imperatively push notifications toward the outside of a stateful scope.  Another way of looking at it: a local source is external to other scopes that can't access its implementation.

Sorry if that explanation is confusing.  Let's clear it up.  Consider the following abstract requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be encapsulated by some type that you define.
  3. Your code generates notifications when callers invoke certain members of your type or when it performs certain operations.
  4. The observable must be exposed as a public property.

Now let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source defined by another type and referenced by ours is external to our type.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our type must be stateful.  We'll store a reference to "something" in a field.
  5. The field will be returned by a public property (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer: Define a .NET event and expose it as an observable public property using FromEventPattern!

Ha, I bet you didn't see that one coming, did you?

Yes, alternatively we could use Subject<T>.  The conditions are perfect.  If we don't actually need or want a .NET event, then we should use a subject; however, if we already have an event defined, then there's no need to duplicate state and "raise" two similar events.  The conversion approach is probably better in that case.
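
Here's a sketch of the event-based approach.  The Counter type, its Changed event and CounterChangedEventArgs are all hypothetical; they only stand in for whatever type and event you already have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

namespace Rx.Labs
{
    class CounterChangedEventArgs : EventArgs
    {
        public int Value { get; private set; }

        public CounterChangedEventArgs(int value)
        {
            Value = value;
        }
    }

    class Counter
    {
        private int value;

        public event EventHandler<CounterChangedEventArgs> Changed;

        // The event is the only state; the observable is just a view over it.
        public IObservable<int> Values
        {
            get
            {
                return Observable
                    .FromEventPattern<CounterChangedEventArgs>(
                        handler => Changed += handler,
                        handler => Changed -= handler)
                    .Select(e => e.EventArgs.Value);
            }
        }

        public void Increment()
        {
            value++;

            var handler = Changed;
            if (handler != null)
            {
                handler(this, new CounterChangedEventArgs(value));
            }
        }
    }

    static class EventLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();
            var counter = new Counter();

            counter.Increment();  // Missed: nobody is subscribed yet (hot).

            using (counter.Values.Subscribe(x => log.Add("A: " + x)))
            {
                counter.Increment();
                counter.Increment();
            }

            counter.Increment();  // Missed: the handler was removed on dispose.

            return log;
        }
    }
}
```

Output:

A: 2
A: 3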

Let's compare the similarity of subjects to .NET events in general:

  1. Events are also hot.
  2. An event's delegate shares the same scope (encapsulation) within our type as our proposed observable.
  3. We also write code to raise events (input).
  4. Our code must be able to reference the delegate in order to raise events, thus our type must be stateful.  We store a reference to the delegate in a field.
  5. The field will be returned by a public event (output).
  6. A multicast delegate represents both input and output, and is hot.

Subject<T> is kind of like Rx's implementation of an observable "event".  It broadcasts notifications to multiple observers, like raising an event for multiple event handlers.  It is the stateful component of Rx as it can be stored in a field or a variable and mutated (invoked) imperatively.  It is hot, so subscribing does not cause any side-effects (and subscribers may have already missed notifications, which is a common symptom of events and hot observables alike).

It's correct to use Subject<T> to replace local .NET events, though .NET events are still useful sometimes due to their metadata and first-class language/tooling support.  It all depends on context, so consider this carefully before replacing all of your .NET events with subjects.  You may want to examine each event on a case-by-case basis.  And keep in mind that you can always expose an event as an observable via FromEventPattern, or perhaps it's best to leave that decision up to the consumers of your class and simply expose a .NET event.

Subjects actually have an additional related use case that events cannot cover: If we shrink our scope from type to method and apply the same logic as before, then we'll discover that subjects may still apply though events do not.

Let's specify similar method-scoped requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be within a method that you define.
  3. Your code generates notifications when it performs certain asynchronous operations within the method.
  4. The observable must be exposed as the return value of the method.

And again, let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source passed into our method and any fields on the method's defining type are external to our method.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our method must be stateful.  We'll store a reference to "something" in a local variable.
  5. The variable will be returned from the method (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer 1: Write an anonymous event (a.k.a., anonymous method, delegate or lambda expression) and convert it using FromEvent!

Ha, did I trick you again?

You thought for sure this time I'd recommend using Subject<T>.  You'd also be correct; we could use Subject<T>.  The conditions may be perfect, though as you'll see in a moment there are better alternatives, even better than this one.

Alright, that answer was actually a joke.  I don't recommend that particular approach at all.  It was meant to get you thinking about alternatives to subjects in places where you may think they're absolutely necessary.

But out of curiosity, how would that have worked anyway?

An event is backed by a delegate; it's the delegate that has all of those important properties (e.g., input, output, hot) that we discussed earlier.  For example, we could start by declaring a delegate as our observer.  By assigning the delegate to a local variable we can invoke it imperatively like a method, which is similar to calling OnNext on an observer.  We can use Task.Factory.StartNew to execute an async computation that invokes the delegate.  Finally, we can convert our delegate into an observable via the FromEvent method, which gives us our return value.

NOTE:  I don't recommend this approach, as mentioned previously.  For one thing, it doesn't support OnError or OnCompleted.  It doesn't support cancellation either, but given that we're generating a hot observable perhaps it doesn't really matter.  It's also a bit strange anyway.

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            Action<string> observer = _ => { };    /* We could use null, but then at every invocation 
                                                    * we'd have to copy to a local and check for null.
                                                    */

            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Invocation side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer("Generating");
                    observer("a");
                    observer("hot");
                    observer("observable.");
                });

            return Observable.FromEvent<string>(
                eh => observer += eh,
                eh => observer -= eh);
        }
    }
}

Output:

Invocation side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Answer 2 (Recommended):  Define an observable using an existing Rx generator method, then apply Publish with RefCount.

Rx provides various generators that make it easy to create an observable from thin air; e.g., Return, Range, Timer, Interval, Generate, and also Create.  If none of the former generators apply, then fall back to Create.  Create is the Swiss Army knife of observable generation.  I suspect that it also has similar performance characteristics to Subject<T>.  (See the Thread Safety section below for details.)

A particularly useful approach is to write an async iterator using a special overload of Create:

Rx 2.0 (Preferred):    Create<TResult>(Func<IObserver<TResult>, Task>)
Rx 1.1 Experimental:   Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>>)

Generators are better than subjects because they allow you to remain functional, though Create does seem quite similar to using a subject.  Create lets you take more of a declarative approach, which is hopefully easier to understand and maintain than code that uses a subject.  Note that a primary benefit of Create is that it supports cancellation and auto-detachment, though it's not very useful when you need a hot observable.
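
As a sketch of the async iterator style, here's the Task-returning overload of Create in use.  The delay and the values are arbitrary, and AsyncCreateLab is a hypothetical name.  Note that the result is cold: the lambda runs again for every subscription, and the sequence completes when the returned task completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class AsyncCreateLab
    {
        public static void Main()
        {
            // ToList().Wait() blocks until the sequence completes.
            foreach (var value in Run())
            {
                Console.WriteLine(value);
            }
        }

        public static IList<string> Run()
        {
            return Generate().ToList().Wait();
        }

        public static IObservable<string> Generate()
        {
            return Observable.Create<string>(async observer =>
            {
                await Task.Delay(TimeSpan.FromMilliseconds(100));  // Simulate work

                observer.OnNext("Generating");
                observer.OnNext("a");
                observer.OnNext("cold");
                observer.OnNext("observable.");

                // No explicit OnCompleted: completion is signaled when the task completes.
            });
        }
    }
}
```

Output:

Generating
a
cold
observable.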

Generators aren't the whole story either.  They return a cold observable, but our specification was to return a hot observable.

That's where Publish is useful, as described earlier in this post.  Publish converts a cold observable into a hot observable; however, it returns IConnectableObservable<T>, which requires a call to Connect.  You can return IConnectableObservable<T> from your method, just make sure that you change the return type of your method to match so that callers have the opportunity to call Connect themselves.

Alternatively, you can call Publish then RefCount to get an IObservable<T> that is shared among multiple consecutive observers.  Note that this isn't truly a hot observable - it's more like a warm observable.  RefCount makes a single subscription to the underlying observable while there's at least one observer of your query.  When your query has no more observers, changing the reference count to 0, the underlying subscription is disposed.  If another observer subscribes to your query later, moving the reference count from 0 to 1 again, then RefCount makes a new subscription to the underlying observable, causing subscription side-effects to occur again.
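
Here's a small sketch of that "warm" behavior.  The source never completes and pushes a single value synchronously, which is contrived but keeps the output deterministic.  RefCountLab and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace Rx.Labs
{
    static class RefCountLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var log = new List<string>();

            IObservable<int> warm = Observable.Create<int>(observer =>
                {
                    log.Add("Subscription side-effect.");
                    observer.OnNext(1);  // Never completes.
                    return Disposable.Empty;
                })
                .Publish()
                .RefCount();

            // 0 -> 1: RefCount connects, so the side-effect runs and A sees the value.
            using (warm.Subscribe(x => log.Add("A: " + x)))
            {
                // 1 -> 2: shares the existing subscription; B has already missed the value.
                using (warm.Subscribe(x => log.Add("B: " + x)))
                {
                }
            }

            // Back to 0, then 0 -> 1 again: RefCount reconnects and the side-effect repeats.
            using (warm.Subscribe(x => log.Add("C: " + x)))
            {
            }

            return log;
        }
    }
}
```

Output:

Subscription side-effect.
A: 1
Subscription side-effect.
C: 1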

Note that it's bad practice to call Connect yourself in this scenario.  If you're tempted to do so, perhaps because RefCount doesn't meet your needs, then you should strongly consider returning a cold observable instead and leave it up to callers to decide whether Publish should be used at all.

Also, take care to avoid closures when using Create.3  Closing over a local variable that you intend to mutate may cause unexpected behavior since every observer will share the same variable.  This may seem fine when generating a hot observable, and perhaps it is, though RefCount generates a warm observable, as mentioned previously.
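
To show what can go wrong, here's a contrived sketch with a mutable counter.  In the first method the counter is captured once and shared by every subscription; in the second, wrapping the closure and the call to Create in Defer gives each subscription its own counter.  All of the names here are hypothetical.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace Rx.Labs
{
    static class ClosureLab
    {
        public static void Main()
        {
            IObservable<int> shared = SharedCounter();
            Console.WriteLine("Shared: {0}, {1}", shared.Wait(), shared.Wait());

            IObservable<int> isolated = PerSubscriptionCounter();
            Console.WriteLine("Isolated: {0}, {1}", isolated.Wait(), isolated.Wait());
        }

        public static IObservable<int> SharedCounter()
        {
            int count = 0;  // Captured once; every subscription mutates the same variable.

            return Observable.Create<int>(observer =>
            {
                observer.OnNext(++count);
                observer.OnCompleted();
                return Disposable.Empty;
            });
        }

        public static IObservable<int> PerSubscriptionCounter()
        {
            // Defer runs this factory per subscription, so each one gets a fresh closure.
            return Observable.Defer(() =>
            {
                int count = 0;

                return Observable.Create<int>(observer =>
                {
                    observer.OnNext(++count);
                    observer.OnCompleted();
                    return Disposable.Empty;
                });
            });
        }
    }
}
```

Output:

Shared: 1, 2
Isolated: 1, 1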

Here's a correct example that uses Create, Publish and RefCount instead of a subject:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            return Observable.Create<string>(observer =>
                TaskPoolScheduler.Default.Schedule(() =>
                {
                    Console.WriteLine("Subscription side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer.OnNext("Generating");
                    observer.OnNext("a");
                    observer.OnNext("hot");
                    observer.OnNext("observable.");

                    observer.OnCompleted();
                }))
                .Publish()
                .RefCount();
        }
    }
}

Output:

Subscription side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Thread Safety

Subject<T>, Observable.Create and ObserverBase<T> do not make the same guarantees that are made by Rx generator methods, for performance reasons.  They allow callers to violate some of Rx's contracts for IObserver<T>, though they do ensure some contracts automatically regardless.

The contracts to which I'm referring are defined in the Rx Design Guidelines document.

Section 4.2 basically states that notifications must be pushed serially for Rx operators to behave properly.  But more generally, this should be a safe assumption everywhere.  When you subscribe to an observable, you don't expect to observe concurrent notifications.  Imagine coding subscriptions against this assumption but actually receiving concurrent notifications at runtime.  Take a look at some of your existing code.  What could happen?  For example, if you subscribe on a UI thread then you may get a threading context exception.  If you subscribe on some other thread instead, then maybe you'll get some strange threading bugs or maybe you'll get an exception, if you're lucky.

All Rx generator methods, excluding Create, ensure that this contract is satisfied.  Subject<T> and ObserverBase<T> do not.  For instance, calling OnNext concurrently on a Subject<T> will generate concurrent notifications.4

Section 4.1 states the Rx grammar as follows:

OnNext* (OnCompleted | OnError)?

All Rx operators and generator methods ensure that this contract is satisfied.  ObservableBase<T>, ObserverBase<T>, Create and Subject<T> actually ensure this contract as well, but only for serial notifications.  If a caller doesn't satisfy the §4.2 contract, then it seems that these APIs do not automatically ensure the §4.1 contract is satisfied either, due to lock-free race conditions.  For instance, it seems to me that calling a subject's OnNext and OnError methods simultaneously on different threads may cause an observer to receive OnError followed by OnNext.  But if you call OnError followed by OnNext on the same thread, then the call to OnNext is correctly ignored.

Rx offers Subject.Synchronize to satisfy these contracts for a given subject.  It offers Observer.Synchronize to do the same for observers, in general.  And of course, just because you can violate the contracts doesn't mean that you necessarily will.  But you do have to be aware of them when using these APIs, especially considering the kinds of bugs that may arise.  Threading bugs originating from within Rx operators, due to the misuse of a subject somewhere within a query, can be very difficult to diagnose.
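
As a rough sketch of what Synchronize buys you, the following pushes 1000 values into a synchronized subject from many threads and checks whether the observer ever sees overlapping calls.  The overlap detection is my own ad hoc instrumentation, not part of Rx, and SynchronizeLab is a hypothetical name.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class SynchronizeLab
    {
        public static void Main()
        {
            Console.WriteLine(Run());
        }

        public static string Run()
        {
            // The wrapper serializes calls to OnNext, OnError and OnCompleted.
            ISubject<int, int> safe = Subject.Synchronize(new Subject<int>());

            int inside = 0;
            int received = 0;
            bool overlapped = false;

            using (safe.Subscribe(x =>
            {
                // If two notifications were ever in flight at once, this would exceed 1.
                if (Interlocked.Increment(ref inside) != 1)
                {
                    overlapped = true;
                }

                received++;  // Safe only because notifications are serialized.

                Interlocked.Decrement(ref inside);
            }))
            {
                // Many threads push concurrently through the synchronized wrapper.
                Parallel.For(0, 1000, i => safe.OnNext(i));
            }

            return string.Format("Received: {0}, Overlapped: {1}", received, overlapped);
        }
    }
}
```

Output:

Received: 1000, Overlapped: False

Calling OnNext on the raw Subject<int> in the same Parallel.For loop makes no such guarantee.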

So when you find yourself invoking OnNext, OnError or OnCompleted on a subject or within Create, make sure that you're satisfying Rx's contracts.

The takeaway is that when a subject isn't necessary due to the presence of an external source or an appropriate generator method, it's best not to use a subject in the first place so you can avoid having to invoke it imperatively and risk introducing subtle threading bugs in your queries.

Correct Examples

As shown in this post, there's only 1 particular set of conditions in which it's absolutely appropriate to use subjects.  Here are some examples.

Reactive Property

A common example of when it's correct to use a subject is to represent a sequence of property changes for a particular property, exposing it with AsObservable to hide its identity.  It's similar to implementing the PropertyChanged event, but only raising it for a particular property.  Instead of raising an event from within the property's setter, you'd call subject.OnNext(value).

Subject<T> works fine, though more commonly BehaviorSubject<T> is used instead because it stores the latest value of the property and pushes it immediately to new observers.  If your program is highly reactive, then you may find that you don't even need to keep a backing field for the property since BehaviorSubject<T> encapsulates it.  To set the value, call subject.OnNext(value).  Unfortunately, there's no way to extract the value imperatively (at least not until Rx exposes a Current property), so you may find that a backing field is still necessary unless your program is highly reactive.  Subscribing to the subject is the natural way of extracting the current value and all subsequent changes.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class SubjectAsProperty
    {
        public static void Main()
        {
            var foo = new Foo();

            Console.WriteLine("Normal property usage:");

            Console.WriteLine(foo.Number);
            foo.Number += 8;
            Console.WriteLine(foo.Number);

            Console.WriteLine("Reactive property usage:");

            using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer A: {0}", n)))
            {
                foo.Number = 1;
                foo.Number = 2;

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer B: {0}", n)))
                {
                    foo.Number = 3;
                }

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer C: {0}", n)))
                {
                    foo.Number = 4;
                }
            }

            Console.WriteLine(foo.Number);
            Console.ReadKey();
        }
    }

    class Foo
    {
        public int Number
        {
            get
            {
                return number.FirstAsync().Wait();
            }
            set
            {
                number.OnNext(value);
            }
        }

        public IObservable<int> NumberObservable
        {
            get
            {
                return number.AsObservable();
            }
        }

        private readonly BehaviorSubject<int> number = new BehaviorSubject<int>(42);
    }
}

Output:

Normal property usage:
42
50
Reactive property usage:
Observer A: 50
Observer A: 1
Observer A: 2
Observer B: 2
Observer A: 3
Observer B: 3
Observer C: 3
Observer A: 4
Observer C: 4
4

Cross-Cutting APIs

It's common to use a subject to represent cross-cutting concerns in a reactive manner.  For example, imagine exposing a property on a purely memory-bound logging class, returning an observable that pushes a notification whenever something is logged, such as entering and exiting methods (possibly via AOP).  This could be useful for real-time performance analysis, reactive heuristic-based adjustments, post-mortem debugging and even real-time debugging.  You could have one observer writing notifications to disk and one writing them to a GUI console, while another adjusts algorithms at runtime based on performance heuristics.  ReplaySubject<T> could be used to buffer notifications in memory and replay them to new observers, if necessary.
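
Here's a minimal, single-threaded sketch of the logging idea using ReplaySubject<T> with a buffer of 2.  The MemoryLog type and its members are hypothetical, and a real logger that's called concurrently would have to take the contracts from the Thread Safety section into account.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class MemoryLog
    {
        // Keeps the 2 most recent entries in memory and replays them to new observers.
        private readonly ReplaySubject<string> entries = new ReplaySubject<string>(2);

        public IObservable<string> Entries
        {
            get
            {
                return entries.AsObservable();
            }
        }

        public void Write(string message)
        {
            entries.OnNext(message);
        }
    }

    static class LoggingLab
    {
        public static void Main()
        {
            foreach (var line in Run())
            {
                Console.WriteLine(line);
            }
        }

        public static List<string> Run()
        {
            var observed = new List<string>();
            var log = new MemoryLog();

            log.Write("Enter Foo");
            log.Write("Exit Foo");
            log.Write("Enter Bar");

            // A late observer first receives the buffered entries, then live ones.
            using (log.Entries.Subscribe(entry => observed.Add("Console: " + entry)))
            {
                log.Write("Exit Bar");
            }

            return observed;
        }
    }
}
```

Output:

Console: Exit Foo
Console: Enter Bar
Console: Exit Bar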

Another example would be a sequence of alerts exposed as an observable property on an Alert class.  Various parts of the program could send alerts concurrently by calling Alert.Unimportant(...) or Alert.RequiresUserInteraction(...), for example.  One observer could send emails while another displays toast notifications.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class AlertLab
    {
        public static void Main()
        {
            var toast = from alert in Alert.All.OfType<ToastAlert>()
                        select alert.Message;

            var email = Alert.All.OfType<EmailAlert>();

            using (toast.Subscribe(a => Console.WriteLine("Important: {0}", a)))
            using (email.Subscribe(a => Console.WriteLine("Email \"{0}\" with \"{1}\".", a.To, a.Message)))
            {
                new ToastAlert("Hello Console!").Send();
                new EmailAlert("Hello Fred!", "fred@example.com").Send();
                new ToastAlert("Goodbye!").Send();
            }
        }
    }

    public abstract class Alert
    {
        public static IObservable<Alert> All
        {
            get
            {
                return alerts.AsObservable();
            }
        }

        public string Message
        {
            get
            {
                return message;
            }
        }

        private static readonly ISubject<Alert, Alert> alerts = Subject.Synchronize(new Subject<Alert>());
        private readonly string message;

        protected Alert(string message)
        {
            this.message = message;
        }

        public void Send()
        {
            alerts.OnNext(this);
        }
    }

    public class ToastAlert : Alert
    {
        public ToastAlert(string message)
            : base(message)
        {
        }
    }

    public class EmailAlert : Alert
    {
        public string To { get; private set; }

        public EmailAlert(string message, string to)
            : base(message)
        {
            this.To = to;
        }
    }
}

Output:

Important: Hello Console!
Email "fred@example.com" with "Hello Fred!".
Important: Goodbye!

Conclusion

If you've got anything that can be converted into an observable, such as a Task or Task<T>, an event, or an enumerable, then convert it using a From* or To* operator, or if you've already got an observable, then use it.  Now consider temperature conflicts, change the temperature if necessary via Defer or Publish, and finally query it.  Don't use subjects.

Else if you don't have an observable or anything that can be converted into one:

If you require a cold observable, then use an existing Rx generator method; e.g., Return, Range, Timer, Interval, Generate or Create.  Don't use subjects.

Else if you require a hot observable:

If the scope of your observable is a method (i.e., it's entirely encapsulated by a method), then use an existing Rx generator method.  You can also apply Publish and RefCount, but consider relaxing the conditions to simply return a cold observable.  Don't use subjects.

Else if the scope of your observable is a type (e.g., it's exposed as a public property and backed by a field):

If you need to define a similar event or a similar event already exists, then convert the event into an observable like the first case.  Don't use subjects.

Else if you don't need to define a similar event and no similar event already exists, then use a subject.

In summary:

When should I use a subject?

When all of the following are true:

  • you don't have an observable or anything that can be converted into one.
  • you require a hot observable.
  • the scope of your observable is a type.
  • you don't need to define a similar event and no similar event already exists.

Why should I use a subject in that case?

Because you've got no choice!

 


  1. At least that's my interpretation of Erik's comments.  I find it to be really good advice in practice when working with Rx.
  2. Perhaps in very advanced and rare scenarios you may find a custom ISubject<T> implementation to be useful, in which case it's safe to use Multicast rather than Publish.
  3. Which happens to be an easy-to-miss bug when implementing custom operators in Rx.  If you need to return a cold observable, as generally operators do, and you must use a closure, then wrap the closure and the call to Create in a call to Defer to ensure that the closure isn't shared across multiple subscriptions.
  4. In some advanced and rare scenarios you may want to elide this contract and permit concurrent notifications through a subject, for performance reasons, though you should do so only if you can guarantee that none of Rx's query operators will be applied to the subject and all subscribers are thread-safe.

Tags:

.NET | Patterns | Rx
