June 22, 2013

To Use Subject Or Not To Use Subject?

That is the question!  There appears to be some confusion on the web about whether or not Subject<T> should be used, ever.  'Tis a question oft asked by developers new to Rx after reading expert recommendations to avoid subjects, due to common misuse, yet subjects seem to retain an air of mystery even for seasoned reactive developers.

First, a primer on Subjects.  Take a moment to read that.

Done?  Great.  Let's continue...

As you may already know, Erik Meijer doesn't like subjects.  End of discussion, right?  Wrong!  Erik doesn't like subjects to be used in place of purely functional alternatives.[1]

As Erik puts it:

[Subjects] are the "mutable variables" of the Rx world and in most cases you do not need them.

So let's enumerate the cases where we should and shouldn't use subjects.

We'll start by examining two interesting properties of subjects:

  1. They implement IObservable<T> and IObserver<T> simultaneously, which makes them suitable for both output and input, respectively.
  2. They are hot.  (See also: Hot and Cold Observables.)

Thus to answer the question of whether we should use a subject when defining an observable (output) we must consider:

  1. the input; i.e., the source of the OnNext, OnCompleted and OnError notifications.
  2. the desired temperature of the generated observable; e.g., hot or cold.

To determine whether or not to use a subject, start by asking yourself the following questions:

  1. what is the source of the notifications?  I.e., where do the values of T come from?  Who is responsible for calling OnNext?
  2. must observers share subscription side-effects or not?  I.e., does calling Subscribe execute some kind of initialization code each time it's called or does initialization occur once and its state is shared by every call to Subscribe?

Let's answer all possible combinations of these 2 questions.  I'll provide more details and examples later in this post to explain why these answers make sense.

Question 1:  What is the source of the notifications?

The source can be one of two kinds: Let's call them external and local.  An external source is any observable or event that already exists outside of your code.  A local source is when you generate an observable from your code, kind of like defining and raising a custom .NET event.  More details on these distinctions later in this post.

  1. If the source is external, then do not use a subject.
  2. If the source is local, then do use a subject.

Question 2:  Must observers share subscription side-effects or not?

  1. If not, then do not use a subject.  (Define a cold observable.)
  2. If yes, then do use a subject.  (Define a hot observable.)

Let's enumerate all possible combinations: 

  1. The source is external and I want a cold observable.
  2. The source is external and I want a hot observable.
  3. The source is local and I want a cold observable.
  4. The source is local and I want a hot observable. 

As you may have noticed, there are conflicts.

  • What if you have an external source and you do want to share subscription side-effects?
  • What if you have a local source and you do not want to share subscription side-effects?
  • What if the temperature of an external source differs from the desired temperature?

I believe these conflicts cause additional confusion among developers.  They seem to blur the line between appropriate and inappropriate uses of subjects, though in fact the line is quite clear, as we'll soon see.

Let's resolve these conflicts now.

The source is external and I want a cold observable.

  1. If the source is cold, then use it as is.  You can also wrap it with Defer to add subscription side-effects.
  2. If the source is hot, then wrap it with Defer to add subscription side-effects.

The source is external and I want a hot observable.

  1. If the source is cold, then Publish it.  Alternatively, you can use PublishLast or Replay, if you need their additional behavior.
  2. If the source is hot, then use it as is.
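
As a minimal sketch of these temperature changes, assuming Rx 2.x, the following uses Observable.Range as a stand-in for an external cold source.  The TemperatureLab class and its log are hypothetical names for illustration only.  Defer adds a side-effect that runs on every subscription, while Publish shares a single subscription among all observers once Connect is called.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class TemperatureLab
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for an external cold source (an assumption for this sketch).
        IObservable<int> source = Observable.Range(1, 2);

        // Defer: the factory runs again on every call to Subscribe.
        IObservable<int> cold = Observable.Defer(() =>
        {
            log.Add("side-effect");
            return source;
        });

        cold.Subscribe(x => log.Add("cold A: " + x));
        cold.Subscribe(x => log.Add("cold B: " + x));

        // Publish: observers share one subscription, made when Connect is called.
        IConnectableObservable<int> hot = cold.Publish();

        hot.Subscribe(x => log.Add("hot A: " + x));
        hot.Subscribe(x => log.Add("hot B: " + x));
        hot.Connect();

        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The side-effect should be logged once for each of the two cold subscriptions, but only once for both observers of the published observable.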

The source is local and I want a cold observable.

  1. Define your observable with existing Rx generator methods; e.g., Return, Range, Timer, Interval, Generate or Create.

The source is local and I want a hot observable.

  1. This scenario is a potential candidate for defining your observable as Subject<T>, or even BehaviorSubject, AsyncSubject or ReplaySubject, if you need their behavior.  More details on why it's merely a "potential candidate" later in this post.

Note that Publish, PublishLast and Replay use subjects internally (via Multicast).  This isn't a coincidence.  Subscribing a subject to a cold observable broadcasts its notifications to multiple observers, thus making it hot.
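
To illustrate that relationship, here is a small sketch, assuming Rx 2.x, that compares Publish with an explicit call to Multicast that is handed a new Subject<T>.  MulticastLab and Observe are hypothetical names.  I'm not claiming this mirrors the library's internals exactly, only that the observable behavior is the same in this simple case.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class MulticastLab
{
    // Subscribes two observers, connects, and returns everything they saw.
    public static List<string> Observe(IConnectableObservable<int> hot)
    {
        var seen = new List<string>();

        hot.Subscribe(x => seen.Add("A: " + x));
        hot.Subscribe(x => seen.Add("B: " + x));
        hot.Connect();   // the underlying subject subscribes to the cold source once

        return seen;
    }

    public static void Main()
    {
        IObservable<int> cold = Observable.Range(1, 2);

        List<string> viaPublish = Observe(cold.Publish());
        List<string> viaMulticast = Observe(cold.Multicast(new Subject<int>()));

        Console.WriteLine(string.Join(", ", viaPublish));
        Console.WriteLine(string.Join(", ", viaMulticast));
    }
}
```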

So it's clear there are only two scenarios where it's correct to use subjects:

  1. The source is external and cold, and I want a hot observable.
  2. The source is local and I want a hot observable.

Let's simplify those statements.  The two scenarios where it's correct to use subjects are:

  1. Converting a cold observable into a hot observable.
  2. Generating a hot observable imperatively and statefully, without any direct external source.

However, only in the latter scenario is it correct to use subjects explicitly.  As mentioned previously, Rx defines various operators like Publish for use in the former scenario.  Thus, only 1 scenario remains.

And so now we've answered our original questions:

When should I use a subject?  To generate a hot observable imperatively and statefully, without any direct external source.
When shouldn't I use a subject?  Everywhere else.

Perhaps that answer is a bit vague.  What do I mean by imperative and stateful?  What about local vs. external?  And why did I mention before that our final scenario is only a "potential candidate" for using a subject?

If (true == interested_in_the_answers_to_these_questions) { continue; } else { goto Conclusion; }

External Sources

Let's redefine external as quite simply a source of notifications that you do not have to generate yourself.  It comes from outside the scope of the observable that you're defining, such as a dependent object or a parameter.  It could be any number of things; e.g., an event on another object, an event on a base class, a timer callback, a Task<T> that represents the asynchronous result of reading from a file or sending a web request, an IEnumerable<T>, etc.

All of these examples have something in common: they can easily be converted into IObservable<T> using various From* and To* operators in Rx.  For example, Task<T> is converted by the ToObservable operator.  So if your external source is not already an IObservable<T>, then simply convert it using one of Rx's existing conversion operators.
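
For instance, here's a minimal sketch of two such conversions, assuming Rx 2.x on .NET 4.5.  ConversionLab is a hypothetical name, and Task.FromResult merely stands in for real asynchronous work such as reading a file or sending a web request.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class ConversionLab
{
    public static IList<int> FromTask()
    {
        // Stand-in for an asynchronous result (an assumption for this sketch).
        Task<int> task = Task.FromResult(42);

        IObservable<int> xs = task.ToObservable();

        return xs.ToList().Wait();   // block only to collect the values for the demo
    }

    public static IList<int> FromEnumerable()
    {
        IEnumerable<int> items = new[] { 1, 2, 3 };

        IObservable<int> xs = items.ToObservable();

        return xs.ToList().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", FromTask()));         // 42
        Console.WriteLine(string.Join(", ", FromEnumerable()));   // 1, 2, 3
    }
}
```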

Once you have an IObservable<T>, you need to determine whether it's hot or cold and whether you need it to be hot or cold.  If there's a conflict, as defined above, then use either Defer or Publish to change the temperature accordingly.  For this, there's no need to use subjects in your code.[2]

Finally, use LINQ to query the observable.  Again, you shouldn't use subjects here either.

The result is that you'll have a succinct observable query, controlling side-effects and meeting your specification in a declarative manner.  Subjects are simply the wrong tool for this job.  It's like hammering in a nail with a sledgehammer.  Subjects do much more than what is needed.  (Though as you'll see later in this post, in some ways subjects actually do less than what you may think, potentially introducing nasty threading bugs.)

Local / Imperative / Stateful Sources

I'm defining local as a source where you imperatively push notifications toward the outside of a stateful scope.  Another way of looking at it: a local source is external to other scopes that can't access its implementation.

Sorry if that explanation is confusing.  Let's clear it up.  Consider the following abstract requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be encapsulated by some type that you define.
  3. Your code generates notifications when callers invoke certain members of your type or when it performs certain operations.
  4. The observable must be exposed as a public property.

Now let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source defined by another type and referenced by ours is external to our type.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our type must be stateful.  We'll store a reference to "something" in a field.
  5. The field will be returned by a public property (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer: Define a .NET event and expose it as an observable public property using FromEventPattern!

Ha, I bet you didn't see that one coming, did you?

Yes, alternatively we could use Subject<T>.  The conditions are perfect.  If we don't actually need or want a .NET event, then we should use a subject; however, if we already have an event defined, then there's no need to duplicate state and "raise" two similar events.  The conversion approach is probably better in that case.
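
Here's a rough sketch of the event-based approach, assuming Rx 2.x.  The Person type, its NameChanged event and the NameChangedEventArgs class are hypothetical names that I've made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class NameChangedEventArgs : EventArgs
{
    public NameChangedEventArgs(string name) { Name = name; }

    public string Name { get; private set; }
}

class Person
{
    // The event's delegate is the stateful "field" that we push into.
    public event EventHandler<NameChangedEventArgs> NameChanged;

    // The same notifications, exposed as an observable public property.
    public IObservable<string> NameChanges
    {
        get
        {
            return Observable
                .FromEventPattern<NameChangedEventArgs>(
                    h => NameChanged += h,
                    h => NameChanged -= h)
                .Select(e => e.EventArgs.Name);
        }
    }

    public void Rename(string name)
    {
        var handler = NameChanged;
        if (handler != null) handler(this, new NameChangedEventArgs(name));
    }
}

static class EventPropertyLab
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var person = new Person();

        person.Rename("Missed");   // hot: nobody is observing yet

        using (person.NameChanges.Subscribe(seen.Add))
        {
            person.Rename("Ada");
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Only the rename that happens while an observer is subscribed is seen, which is the hot behavior that the requirements asked for.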

Let's compare the similarity of subjects to .NET events in general:

  1. Events are also hot.
  2. An event's delegate shares the same scope (encapsulation) within our type as our proposed observable.
  3. We also write code to raise events (input).
  4. Our code must be able to reference the delegate in order to raise events, thus our type must be stateful.  We store a reference to the delegate in a field.
  5. The field will be returned by a public event (output).
  6. A multicast delegate represents both input and output, and is hot.

Subject<T> is kind of like Rx's implementation of an observable "event".  It broadcasts notifications to multiple observers, like raising an event for multiple event handlers.  It is the stateful component of Rx as it can be stored in a field or a variable and mutated (invoked) imperatively.  It is hot, so subscribing does not cause any side-effects (and subscribers may have already missed notifications, which is a common symptom of events and hot observables alike).

It's correct to use Subject<T> to replace local .NET events, though .NET events are still useful sometimes due to their metadata and first-class language/tooling support.  It all depends on context, so consider this carefully before replacing all of your .NET events with subjects.  You may want to examine each event on a case-by-case basis.  And keep in mind that you can always expose an event as an observable via FromEventPattern, or perhaps it's best to leave that decision up to the consumers of your class and simply expose a .NET event.

Subjects actually have an additional related use case that events cannot cover: If we shrink our scope from type to method and apply the same logic as before, then we'll discover that subjects may still apply though events do not.

Let's specify similar method-scoped requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be within a method that you define.
  3. Your code generates notifications when it performs certain asynchronous operations within the method.
  4. The observable must be exposed as the return value of the method.

And again, let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source passed into our method and any fields on the method's defining type are external to our method.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our method must be stateful.  We'll store a reference to "something" in a local variable.
  5. The variable will be returned from the method (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer 1: Write an anonymous event (a.k.a., anonymous method, delegate or lambda expression) and convert it using FromEvent!

Ha, did I trick you again?

You thought for sure this time I'd recommend using Subject<T>.  You'd also be correct; we could use Subject<T>.  The conditions may be perfect, though as you'll see in a moment there are better alternatives, even better than this one.

Alright, that answer was actually a joke.  I don't recommend that particular approach at all.  It was meant to get you thinking about alternatives to subjects in places where you may think they're absolutely necessary.

But out of curiosity, how would that have worked anyway?

An event is backed by a delegate; it's the delegate that has all of those important properties (e.g., input, output, hot) that we discussed earlier.  For example, we could start by declaring a delegate as our observer.  By assigning the delegate to a local variable we can invoke it imperatively like a method, which is similar to calling OnNext on an observer.  We can use Task.Factory.StartNew to execute an async computation that invokes the delegate.  Finally, we can convert our delegate into an observable via the FromEvent method, which gives us our return value.

NOTE:  I don't recommend this approach, as mentioned previously.  For one thing, it doesn't support OnError or OnCompleted.  It doesn't support cancellation either, but given that we're generating a hot observable perhaps it doesn't really matter.  It's also a bit strange anyway.

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            Action<string> observer = _ => { };    /* We could use null, but then at every invocation 
                                                    * we'd have to copy to a local and check for null.
                                                    */

            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Invocation side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer("Generating");
                    observer("a");
                    observer("hot");
                    observer("observable.");
                });

            return Observable.FromEvent<string>(
                eh => observer += eh,
                eh => observer -= eh);
        }
    }
}

Output:

Invocation side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Answer 2 (Recommended):  Define an observable using an existing Rx generator method, then apply Publish with RefCount.

Rx provides various generators that make it easy to create an observable from thin air; e.g., Return, Range, Timer, Interval, Generate, and also Create.  If none of the former generators apply, then fall back to Create.  Create is the Swiss Army knife of observable generation.  I suspect that it also has similar performance characteristics to Subject<T>.  (See the Thread Safety section below for details.)

A particularly useful approach is to write an async iterator using a special overload of Create:

Rx 2.0 (Preferred):    Create<TResult>(Func<IObserver<TResult>, Task>)
Rx 1.1 Experimental:   Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>>)
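
As a minimal sketch of the Rx 2.0 overload, assuming .NET 4.5 and C# 5, an async lambda can push notifications between awaits.  AsyncCreateLab and Generate are hypothetical names, and the delays merely simulate work.  As far as I understand this overload, OnCompleted is signaled for you when the returned task finishes, and an exception thrown by the lambda is turned into OnError.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class AsyncCreateLab
{
    public static IObservable<string> Generate()
    {
        return Observable.Create<string>(async observer =>
        {
            await Task.Delay(TimeSpan.FromMilliseconds(10));   // simulate work

            observer.OnNext("first");

            await Task.Delay(TimeSpan.FromMilliseconds(10));   // simulate more work

            observer.OnNext("second");

            // No explicit OnCompleted here; it follows from the task completing.
        });
    }

    public static void Main()
    {
        // Block only to collect the values for the demo.
        IList<string> values = Generate().ToList().Wait();

        Console.WriteLine(string.Join(", ", values));
    }
}
```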

Generators are better than subjects because they allow you to remain functional, though Create does seem quite similar to using a subject.  Create lets you take more of a declarative approach, which is hopefully easier to understand and maintain than code that uses a subject.  Note that a primary benefit of Create is that it supports cancelation and auto-detachment, though it's not very useful when you need a hot observable.

Generators aren't the whole story either.  They return a cold observable, but our specification was to return a hot observable.

That's where Publish is useful, as described earlier in this post.  Publish converts a cold observable into a hot observable; however, it returns IConnectableObservable<T>, which requires a call to Connect.  You can return IConnectableObservable<T> from your method, just make sure that you change the return type of your method to match so that callers have the opportunity to call Connect themselves.

Alternatively, you can call Publish then RefCount to get an IObservable<T> that is shared among multiple consecutive observers.  Note that this isn't truly a hot observable - it's more like a warm observable.  RefCount makes a single subscription to the underlying observable while there's at least one observer of your query.  When your query has no more observers, changing the reference count to 0, the underlying subscription is disposed.  If another observer subscribes to your query later, moving the reference count from 0 to 1 again, then RefCount makes a new subscription to the underlying observable, causing subscription side-effects to occur again.
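
To make the "warm" behavior concrete, here's a small sketch, assuming Rx 2.x, that counts subscription side-effects as the reference count goes from 0 to 2, back to 0, and then to 1 again.  RefCountLab is a hypothetical name.

```csharp
using System;
using System.Reactive.Linq;

static class RefCountLab
{
    public static int[] Run()
    {
        int sideEffects = 0;

        IObservable<int> warm = Observable
            .Defer(() =>
            {
                sideEffects++;                    // subscription side-effect
                return Observable.Never<int>();   // stays open until disposed
            })
            .Publish()
            .RefCount();

        IDisposable a = warm.Subscribe(_ => { });   // 0 -> 1: connects to the source
        IDisposable b = warm.Subscribe(_ => { });   // 1 -> 2: shares the connection
        int afterFirstRound = sideEffects;

        a.Dispose();
        b.Dispose();                                // back to 0: disconnects

        IDisposable c = warm.Subscribe(_ => { });   // 0 -> 1 again: reconnects
        int afterSecondRound = sideEffects;
        c.Dispose();

        return new[] { afterFirstRound, afterSecondRound };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // 1, 2
    }
}
```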

Note that it's bad practice to call Connect yourself in this scenario.  If you're tempted to do so, perhaps because RefCount doesn't meet your needs, then you should strongly consider returning a cold observable instead and leave it up to callers to decide whether Publish should be used at all.

Also, be careful to avoid closures when using Create.[3]  Closing over a local variable that you intend to mutate may cause unexpected behavior since every observer will share the same variable.  This may seem fine when generating a hot observable, and perhaps it is, though RefCount generates a warm observable, as mentioned previously.
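
The closure pitfall can be sketched as follows, assuming Rx 2.x.  ClosureLab and its methods are hypothetical names.  The first method closes over a variable that every subscription mutates, while the second wraps the variable and the call to Create in Defer so that each subscription gets its own copy.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ClosureLab
{
    public static IObservable<int> SharedCounter()
    {
        int count = 0;   // captured once and shared by every subscription

        return Observable.Create<int>(observer =>
        {
            observer.OnNext(++count);
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }

    public static IObservable<int> PerSubscriptionCounter()
    {
        return Observable.Defer(() =>
        {
            int count = 0;   // a fresh variable for each subscription

            return Observable.Create<int>(observer =>
            {
                observer.OnNext(++count);
                observer.OnCompleted();
                return Disposable.Empty;
            });
        });
    }

    public static List<int> SubscribeTwice(IObservable<int> xs)
    {
        var seen = new List<int>();
        xs.Subscribe(seen.Add);
        xs.Subscribe(seen.Add);
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", SubscribeTwice(SharedCounter())));            // 1, 2
        Console.WriteLine(string.Join(", ", SubscribeTwice(PerSubscriptionCounter())));   // 1, 1
    }
}
```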

Here's a correct example that uses Create, Publish and RefCount instead of a subject:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            return Observable.Create<string>(observer =>
                TaskPoolScheduler.Default.Schedule(() =>
                {
                    Console.WriteLine("Subscription side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer.OnNext("Generating");
                    observer.OnNext("a");
                    observer.OnNext("hot");
                    observer.OnNext("observable.");

                    observer.OnCompleted();
                }))
                .Publish()
                .RefCount();
        }
    }
}

Output:

Subscription side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Thread Safety

Subject<T>, Observable.Create and ObserverBase<T> do not make the same guarantees that are made by Rx generator methods, for performance reasons.  They allow callers to violate some of Rx's contracts for IObserver<T>, though they do ensure some contracts automatically regardless.

The contracts to which I'm referring are defined in the Rx Design Guidelines document.

Section 4.2 basically states that notifications must be pushed serially for Rx operators to behave properly.  But more generally, this should be a safe assumption everywhere.  When you subscribe to an observable, you don't expect to observe concurrent notifications.  Imagine coding subscriptions against this assumption but actually receiving concurrent notifications at runtime.  Take a look at some of your existing code.  What could happen?  For example, if you subscribe on a UI thread then you may get a threading context exception.  If you subscribe on some other thread instead, then maybe you'll get some strange threading bugs or maybe you'll get an exception, if you're lucky.

All Rx generator methods, excluding Create, ensure that this contract is satisfied.  Subject<T> and ObserverBase<T> do not.  For instance, calling OnNext concurrently on a Subject<T> will generate concurrent notifications.[4]

Section 4.1 states the Rx grammar as follows:

OnNext* (OnCompleted | OnError)?

All Rx operators and generator methods ensure that this contract is satisfied.  ObservableBase<T>, ObserverBase<T>, Create and Subject<T> actually ensure this contract as well, but only for serial notifications.  If a caller doesn't satisfy the §4.2 contract, then it seems that these APIs do not automatically ensure the §4.1 contract is satisfied either, due to lock-free race conditions.  For instance, it seems to me that calling a subject's OnNext and OnError methods simultaneously on different threads may cause an observer to receive OnError followed by OnNext.  But if you call OnError followed by OnNext on the same thread, then the call to OnNext is correctly ignored.
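
The serial case is easy to check.  Here's a minimal sketch, assuming Rx 2.x, where GrammarLab is a hypothetical name.  It shows only the single-threaded behavior and says nothing about the concurrent race described above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class GrammarLab
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            x => log.Add("OnNext(" + x + ")"),
            ex => log.Add("OnError"),
            () => log.Add("OnCompleted"));

        subject.OnNext(1);
        subject.OnError(new InvalidOperationException("Simulated failure."));
        subject.OnNext(2);   // same thread, after OnError: ignored

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // OnNext(1), OnError
    }
}
```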

Rx offers Subject.Synchronize to satisfy these contracts for a given subject.  It offers Observer.Synchronize to do the same for observers, in general.  And of course, just because you can violate the contracts doesn't mean that you necessarily will.  But you do have to be aware of them when using these APIs, especially considering the kinds of bugs that may arise.  Threading bugs originating from within Rx operators, due to the misuse of a subject somewhere within a query, can be very difficult to diagnose.
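
As a rough sketch, assuming Rx 2.x and .NET 4.5, the following pushes notifications from many threads through a subject wrapped by Subject.Synchronize.  SynchronizeLab is a hypothetical name.  The observer deliberately uses an unsynchronized counter, which should only be safe because the wrapper serializes the calls to OnNext.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;

static class SynchronizeLab
{
    public static int Run(int notifications)
    {
        // All producers must push through the synchronized wrapper, not the raw subject.
        ISubject<int, int> gate = Subject.Synchronize(new Subject<int>());

        int count = 0;   // deliberately not thread-safe

        using (gate.Subscribe(_ => count++))
        {
            // Many threads call OnNext concurrently; the wrapper serializes them.
            Parallel.For(0, notifications, i => gate.OnNext(i));
        }

        return count;
    }

    public static void Main()
    {
        Console.WriteLine(Run(10000));   // 10000
    }
}
```

Without the call to Subject.Synchronize, the counter could lose increments, because the observer would then be invoked concurrently.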

So when you find yourself invoking OnNext, OnError or OnCompleted on a subject or within Create, make sure that you're satisfying Rx's contracts.

The takeaway is that when a subject isn't necessary due to the presence of an external source or an appropriate generator method, it's best not to use a subject in the first place so you can avoid having to invoke it imperatively and risk introducing subtle threading bugs in your queries.

Correct Examples

As shown in this post, there's only 1 particular set of conditions in which it's absolutely appropriate to use subjects.  Here are some examples.

Reactive Property

A common example of when it's correct to use a subject is to represent a sequence of property changes for a particular property, exposing it with AsObservable to hide its identity.  It's similar to implementing the PropertyChanged event, but only raising it for a particular property.  Instead of raising an event from within the property's setter, you'd call subject.OnNext(value).

Subject<T> works fine, though more commonly BehaviorSubject<T> is used instead because it stores the latest value of the property and pushes it immediately to new observers.  If your program is highly reactive, then you may find that you don't even need to keep a backing field for the property since BehaviorSubject<T> encapsulates it.  To set the value, call subject.OnNext(value).  Unfortunately, there's no way to extract the value imperatively (at least not until Rx exposes a Current property), so you may find that a backing field is still necessary unless your program is highly reactive.  Subscribing to the subject is the natural way of extracting the current value and all subsequent changes.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class SubjectAsProperty
    {
        public static void Main()
        {
            var foo = new Foo();

            Console.WriteLine("Normal property usage:");

            Console.WriteLine(foo.Number);
            foo.Number += 8;
            Console.WriteLine(foo.Number);

            Console.WriteLine("Reactive property usage:");

            using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer A: {0}", n)))
            {
                foo.Number = 1;
                foo.Number = 2;

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer B: {0}", n)))
                {
                    foo.Number = 3;
                }

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer C: {0}", n)))
                {
                    foo.Number = 4;
                }
            }

            Console.WriteLine(foo.Number);
            Console.ReadKey();
        }
    }

    class Foo
    {
        public int Number
        {
            get
            {
                return number.FirstAsync().Wait();
            }
            set
            {
                number.OnNext(value);
            }
        }

        public IObservable<int> NumberObservable
        {
            get
            {
                return number.AsObservable();
            }
        }

        private readonly BehaviorSubject<int> number = new BehaviorSubject<int>(42);
    }
}

Output:

Normal property usage:
42
50
Reactive property usage:
Observer A: 50
Observer A: 1
Observer A: 2
Observer B: 2
Observer A: 3
Observer B: 3
Observer C: 3
Observer A: 4
Observer C: 4
4

Cross-Cutting APIs

It's common to use a subject to represent cross-cutting concerns in a reactive manner.  For example, imagine exposing a property on a purely memory-bound logging class, returning an observable that pushes a notification whenever something is logged, such as entering and exiting methods (possibly via AOP).  This could be useful for real-time performance analysis, reactive heuristic-based adjustments, post-mortem debugging and even real-time debugging.  You could have one observer writing notifications to disk and one writing them to a GUI console, while another adjusts algorithms at runtime based on performance heuristics.  ReplaySubject<T> could be used to buffer notifications in memory and replay them to new observers, if necessary.
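
Here's a small sketch of the logging idea, assuming Rx 2.x.  MemoryLog and its members are hypothetical names, the buffer size of 2 is arbitrary, and the sketch assumes that all writes happen on one thread (otherwise see the Thread Safety section).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class MemoryLog
{
    // Buffers the 2 most recent entries and replays them to new observers.
    private readonly ReplaySubject<string> entries = new ReplaySubject<string>(2);

    public IObservable<string> Entries
    {
        get { return entries.AsObservable(); }   // hide the subject's identity
    }

    public void Write(string message)
    {
        entries.OnNext(message);
    }
}

static class MemoryLogLab
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var log = new MemoryLog();

        log.Write("Enter Foo");
        log.Write("Exit Foo");
        log.Write("Enter Bar");

        // A late observer first receives the buffered entries, then live ones.
        using (log.Entries.Subscribe(seen.Add))
        {
            log.Write("Exit Bar");
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // Exit Foo, Enter Bar, Exit Bar
    }
}
```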

Another example would be a sequence of alerts exposed as an observable property on an Alert class.  Various parts of the program could send alerts concurrently by calling Alert.Unimportant(...) or Alert.RequiresUserInteraction(...), for example.  One observer could send emails while another displays toast notifications.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class AlertLab
    {
        public static void Main()
        {
            var toast = from alert in Alert.All.OfType<ToastAlert>()
                        select alert.Message;

            var email = Alert.All.OfType<EmailAlert>();

            using (toast.Subscribe(a => Console.WriteLine("Important: {0}", a)))
            using (email.Subscribe(a => Console.WriteLine("Email \"{0}\" with \"{1}\".", a.To, a.Message)))
            {
                new ToastAlert("Hello Console!").Send();
                new EmailAlert("Hello Fred!", "fred@example.com").Send();
                new ToastAlert("Goodbye!").Send();
            }
        }
    }

    public abstract class Alert
    {
        public static IObservable<Alert> All
        {
            get
            {
                return alerts.AsObservable();
            }
        }

        public string Message
        {
            get
            {
                return message;
            }
        }

        private static readonly ISubject<Alert, Alert> alerts = Subject.Synchronize(new Subject<Alert>());
        private readonly string message;

        protected Alert(string message)
        {
            this.message = message;
        }

        public void Send()
        {
            alerts.OnNext(this);
        }
    }

    public class ToastAlert : Alert
    {
        public ToastAlert(string message)
            : base(message)
        {
        }
    }

    public class EmailAlert : Alert
    {
        public string To { get; private set; }

        public EmailAlert(string message, string to)
            : base(message)
        {
            this.To = to;
        }
    }
}

Output:

Important: Hello Console!
Email "fred@example.com" with "Hello Fred!".
Important: Goodbye!

Conclusion

If you've got anything that can be converted into an observable, such as a Task or Task<T>, an event, or an enumerable, then convert it using a From* or To* operator such as FromEventPattern or ToObservable, or if you've already got an observable, then use it.  Now consider temperature conflicts, change the temperature if necessary via Defer or Publish, and finally query it.  Don't use subjects.

Else if you don't have an observable or anything that can be converted into one:

If you require a cold observable, then use an existing Rx generator method; e.g., Return, Range, Timer, Interval, Generate or Create.  Don't use subjects.

Else if you require a hot observable:

If the scope of your observable is a method (i.e., it's entirely encapsulated by a method), then use an existing Rx generator method.  You can also apply Publish and RefCount, but consider relaxing the conditions to simply return a cold observable.  Don't use subjects.

Else if the scope of your observable is a type (e.g., it's exposed as a public property and backed by a field):

If you need to define a similar event or a similar event already exists, then convert the event into an observable like the first case.  Don't use subjects.

Else if you don't need to define a similar event and no similar event already exists, then use a subject.

In summary:

When should I use a subject?

When all of the following are true:

  • you don't have an observable or anything that can be converted into one.
  • you require a hot observable.
  • the scope of your observable is a type.
  • you don't need to define a similar event and no similar event already exists.

Why should I use a subject in that case?

Because you've got no choice!

 


  1. At least that's my interpretation of Erik's comments.  I find it to be really good advice in practice when working with Rx.
  2. Perhaps in very advanced and rare scenarios you may find a custom ISubject<T> implementation to be useful, in which case it's safe to use Multicast rather than Publish.
  3. Which happens to be an easy-to-miss bug when implementing custom operators in Rx.  If you need to return a cold observable, as generally operators do, and you must use a closure, then wrap the closure and the call to Create in a call to Defer to ensure that the closure isn't shared across multiple subscriptions.
  4. In some advanced and rare scenarios you may want to elide this contract and permit concurrent notifications through a subject, for performance reasons, though you should do so only if you can guarantee that none of Rx's query operators will be applied to the subject and all subscribers are thread-safe.

Tags:

.NET | Patterns | Rx

June 04, 2013

Bye Google Reader, Hello Windows RSS Platform

With the impending demise of iGoogle and Google Reader, which have been set to my web browser's home page for several years now, I had recently come to the realization that I was going to open IE one day and see an error message where iGoogle used to be.  All of my subscribed RSS feeds would be gone.

I did some research to see what kinds of replacements were out there.  For me, RSS feeds provide topics of particular interest that may be worthy of exploring on the web, and my web browser's home page is the gateway to the web; therefore, I wasn't interested in using feed-reader software that runs outside of my browser.  Feeds contain hyperlinks that lead me into the browser, so I might as well read my feeds right on the home page.

It seemed appropriate to begin my research by revisiting the Feeds tab in IE 10.  I had thought perhaps there had been improvements that could make it a viable replacement for Google Reader.  It didn't take long before I realized why I had chosen Google Reader several years ago and never switched to IE's built-in RSS feed support.  Alas, since I have no choice now but to change, I decided to dig deeper into this feature of IE and see whether it may be worth using.  There were a few things I really liked about it:

  1. IE automatically detects feeds on web pages and lets you subscribe to them easily via a button on its Command Bar.
  2. IE opens feeds in a special reader allowing you to easily search, sort and filter.  It also provides a link to quickly subscribe or unsubscribe.
  3. IE lets you easily organize your feeds into folders.

I'm sure you'd agree that the integrated experience is a huge plus; however, IE also has a few negatives:

  1. IE doesn't provide a "What's New?" kind of view; i.e., an aggregated view containing all of the latest unread items among all feeds.
  2. IE doesn't provide a way to quickly view a list of items in a selected feed without showing their contents.
  3. IE doesn't provide a way to mark a single item as read/unread.

So it seems that IE has a great story for subscribing to and organizing feeds, though it really lacks in support for interacting with feeds.  The latter is extremely important to me, which is why I had originally chosen iGoogle as my home page for its Google Reader widget.

But the story doesn't end there.  Digging deeper I discovered that IE builds its RSS features on top of a public Windows API called the Windows RSS Platform.  IE merely provides a GUI containing only a subset of the functionality offered by this platform.  That was great news!  I decided to write my own web page against this API.  I could set it as my home page and interact with my RSS feeds easily, similar to the Google Reader experience, while taking advantage of the integration and organizational features built into IE.

These were my primary goals:

  1. Implement a stand-alone HTML page.  No server, no external scripts, no external style sheets, no external images.
  2. Target HTML5 in IE 10 (desktop mode only) for Windows 8.
  3. Touch input support.
  4. Use JavaScript to interact with the Windows RSS Platform APIs via Windows' built-in ActiveX automation.
  5. Display 3 synchronized panels:
    • Tree view shows all feeds organized into folders, similar to the Feeds tab in IE.
    • Aggregate view containing all of the latest unread items; i.e., "What's New?"
    • Content viewer for the selected item.

I'm happy to report that my project was a success.  I worked on it all last weekend.  I even added several usability features that will hopefully make it appealing to others.  It relies heavily on jQuery, which is minified and embedded into the file.

To the right, you'll see a screenshot of the page in touch-screen mode.

Thankfully, editing and organizational features weren't required due to IE's existing integration; e.g., the feed button on the Command Bar and the Feeds tab in the so-called Favorites Center provide all that is needed to subscribe and unsubscribe feeds, organize feeds into folders and configure feed update intervals and cache settings.  Once you've subscribed to a feed in IE, my page will automatically show it and aggregate the latest items into the center view.  To update the page, simply refresh it by pressing F5.

Download the finished product (as is, without warranty, use at your own risk, etc. etc.) from the following link.  It's a single minified HTML file named "rss.htm", approx. 108 KB in size.  Feel free to link to this blog post, but please do not link to the file directly and do not host the file publicly.

(EDIT: This file is now part of an open source project named RubyFeed.  I've changed the URL below to point to the v1.0 release of RubyFeed, which corresponds to the exact version of the file that was previously linked here.  However, I recommend downloading the latest release instead.)

https://rubyfeed.codeplex.com/releases/view/129236

(MD5 = e7d93aab06276614b917ea68369c1ac9)

Please let me know if you find it useful.  If you have any ideas for new features or would like to report bugs then let me know in the comments of this post.  I'm also considering publishing the project as open source if anyone's interested in contributing to it.

Installation

No installation is required.  If you're running Windows 8 and IE 10 then all of the components that you need are already installed on your computer.

The simplest way to get started is to copy the HTML file anywhere onto your computer and then double-click it (assuming that IE is configured as your default web browser).  You can also set it as your home page using the file:/// protocol.  For example, if you copy the file to your C: drive then you could set your home page to file:///c:\rss.htm.

WARNING: Since the page uses ActiveX you may have to deal with some security restrictions in IE, which may cause annoying prompts or, in the worst case scenario, IE will entirely block the page from loading.  Unfortunately, I haven't found a way to avoid the prompts when loading the file directly from the local file system with the file:/// protocol.

Alternatively, the best solution for me was to host the page locally and relax some of the security requirements in the Local Intranet security zone.  I was able to set my homepage to http://localhost/ and I'm no longer prompted with any security warnings.  Since I'm already running IIS locally, this was a piece of cake - it was as simple as copying the file to my root web directory and renaming it to default.htm.  Hunting down the appropriate security settings in IE was a bit more complicated, so I embedded the instructions directly into the file itself.  Open the page and when prompted with an ActiveX security warning simply choose not to allow the control to load; after a second or two the page will provide step-by-step instructions on how to disable the prompts.  Note that you must not disable scripts from running; otherwise, the instructions won't be shown.

Configuration

The page references a favicon.ico file in the same directory, though it's not required to actually exist.  If it does exist, then IE will display the icon when you browse to the page.  Feel free to create your own icon if you want.  Name it favicon.ico and copy it to the same directory as the HTML page.

The page also contains several configurable settings.  Open the file in a text editor such as Notepad and look at the first JavaScript block near the top of the file.  It defines several variables that you can change to suit your needs; just don't delete any of them.

Happy RSS surfing!

Tags:

newsgroups | Web | WWW

November 23, 2012

Visual Studio Settings Switcher

I just published the first release of my Settings Switcher extension for Visual Studio 2012 on CodePlex.

Yesterday, I was looking for a quick solution to switch between Visual Studio settings files (.vssettings) without using the Tools > Import and Export Settings… dialog.  The dialog’s great, but it’s a bit slow to use a wizard every time I want to switch settings.  Sometimes I like to jump back and forth between different solutions quickly during the day, like when I’m working on various open source projects with different code formatting requirements.  I wanted a quicker way to load settings after opening a solution.

I found this blog post, which had a link to a tip on Sara Ford’s blog showing how easy it is to create a command button in the IDE to switch between settings by writing a simple macro.

Unfortunately, Visual Studio 2012 doesn’t support macros.

So I decided to create a Visual Studio extension.  The original premise was simply to show a drop-down list containing all of the .vssettings files and when one is selected those settings are applied.  That turned out to be a useful feature of the extension; however, while I was developing it I realized that I could add a few more useful features.  For example, I added a button to format every code file in the solution according to the currently selected settings.

But the primary feature, IMO, is that when you close a solution or exit Visual Studio, the extension saves a reference to the current settings file in the solution’s user options file (.suo).  The next time that you open that solution, the associated settings are automatically applied by the extension.  “Automate everything” is my motto :)

This extension should be useful to open source developers.  Typically we work on projects across multiple teams, with different code formatting requirements.  The next time you fork a project you can simply open the solution, edit your settings in the normal Tools > Options… dialog to meet the requirements of the team, then export the settings to a new file using the Export Current Settings button on the Settings Switcher toolbar provided by the extension.  And that’s it!  Every time you open that particular solution, its associated settings are applied automatically.

Project coordinators should also consider checking in a .vssettings file to their repositories.  This will make it easier for interested developers to set up their environment for working on your project.  A developer would simply need to copy the .vssettings file that you provided to the directory that Visual Studio uses to export settings files on their system (Tools > Options > Environment > Import and Export Settings).  Then follow the instructions above to associate the settings with the solution.

I’ve got some additional features that I plan on adding in the future, time permitting.  Some of them are very community-oriented.  We’ll see how that goes.

Feedback is appreciated.  Feel free to request a feature, report a bug or start a discussion.

 

November 07, 2012

Open Reactive Extensions (Rx)

It’s time to start thinking reactively!  What have you been waiting for?

Maybe you were waiting for this:
Yesterday, Microsoft announced that Reactive Extensions (Rx) is now an open source project.  The complete source code for Rx 2.0 is being hosted at http://rx.codeplex.com under the Apache License 2.0.  After an extensive prerelease period and two “officially” released versions, Rx is finally being opened up to the community for a look inside the monadic machinery that helps developers to compose asynchronous queries with ease.  Not only that, but now you can submit bug fixes and implement features yourself.*

Maybe you were waiting for this:
Interactive Extensions (Ix) for .NET and JavaScript are open as well.  They are included under the same source control root as Rx.  I’ve decided to form a habit:  Whenever I create a new project, before I even write a single line of code, I’ll add references to the Rx Package and Ix Package from NuGet.  (Note:  I just started a discussion about the future of distribution, so those links may change.)

Maybe you were waiting for this:
The Rx Team has included brand new libraries to support C/C++.  They are aptly named Rx++ and Ix++.

Or maybe this:
The Rx Team has included new “Rx binding” examples, which are basically just specialized examples illustrating how to apply Rx and LINQ to specific domains.  They’ve included Tx, which is a set of code samples showing how to use LINQ to Events; e.g., real-time queries from trace logs for ETW, Windows Events and SQL Server Extended Events.  Another Rx binding example they’ve included is LINQ2Charts, which “allows developers to use LINQ to create/change/update charts in an easy way and avoid having to deal with XML or other underneath data structures”.

They’re also looking for new examples of Rx bindings, so let them know if you’ve got any.

Or this?
Rx already has an open source community building libraries and tools around it.  And now that Rx itself is open source, that should help to strengthen the existing open source Rx community by exposing it to more developers.  So even if you can’t/won’t contribute code directly to Rx, either due to complexity or legal constraints or whatever, then at least consider contributing code and/or feedback to one of the existing open source projects that enhance Rx.

Seriously, are you still considering how to develop asynchronous applications without Rx?  Are you also considering how to simplify data extraction from a SQL Server table without using SQL?  Or concurrently poke your eyes out with the pointy ends of a Metro-style button?

My (Additional) Impressions

A primary key to success for any open source library is to be extremely useful.  Another key to success is to be unique.  Rx is both, so I have no doubt that it will flourish in the open source world.

Rx goes a long way to make asynchronous programming easy.  It’s a solid library built around core principles that hides much of the complexity of controlling and coordinating asynchrony within any kind of application.  Opening it will help to lower the learning curve and increase the adoption rate of this amazing library, enabling developers to create complex asynchronous queries with relative ease and without any spaghetti code left over.  If asynchronous spaghetti code were a disease, Rx is the cure.

The more functionality a library offers, the more it becomes imperative to be able to peruse the source code.  Understanding user error and library bugs can be very difficult without source code, especially given the inherent complexity of asynchronous programming and the many factors of Rx’s public contracts, which must be considered while developing and debugging.  Capturing every last detail about Rx operators in documentation may be overly cumbersome or even impossible.  Opening Rx can help to make building and debugging complex asynchronous queries much easier.

Opening Rx allows the community to be less entitled and more empowered.  Developers using Rx already identify bugs and request features by posting them in the Rx forum, so it makes perfect sense to allow them to fix bugs and implement features themselves in a timely manner and share their work directly with the community.

Opening Rx to the community will help to tighten their relationship.  It will help to focus the flow of feedback to Microsoft, enabling more informed decisions about the shape of features and the future of Rx.

I’ve noticed the trend at Microsoft to move very useful products into open source projects on CodePlex.  I’ve been wondering whether Rx would take this route as well.  There are many great uses for Rx and its reach will certainly grow even more as an open source project.  The communities around Microsoft’s many open source projects are continuously growing and Rx certainly deserves to benefit from community involvement as much as any other product.

*After jumping through a legal hoop or two.  Read the Rx project home page for details.

Tags:

Open Source | Rx

May 05, 2012

TCP Qbservable Provider Security

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

This is the second post in a series of posts on my TCP Qbservable Provider library, which is a proof-of-concept library that enables easy hosting of queryable observable services over TCP.  In this post, we dive into security.

There are two kinds of security that I believe must be fully supported by any IQbservable Provider to be a viable option for hosting public web services that can accept queries from anonymous clients:

  1. Semantic security
  2. Resource security

I’ve recently checked into source control new security features that address these issues.  More work needs to be done before the next release, but it’s a good start.  I’ve also included new examples in the example applications: SandboxedService, LimitedService and MaliciousClient.  These new examples are explained throughout this post.

Note that the new restricted security model is enabled by default for all hosted queries.  You must opt-out of the default security model by setting specific properties and by using a specific API to host your service.  The details are explained within this post.

Semantic Security

Semantic security is about limiting the kinds of queries that a service will execute to a minimal set that makes sense for the kind of service being offered.  It can be broken down further into two subcategories:

  1. Operators and Methods
  2. Types of Expressions

Operators and Methods

Semantic security limits the kinds of operators and methods that the service permits.  Clients can technically send any kind of operators that they want – you can’t prevent anonymous clients from sending strange (or even malicious) queries; however, the service must reject queries that don’t make sense based on the operators used within the queries to prevent the server from executing code that is unrelated to or just plain wrong for the service being offered.

For example, imagine that you’ve written a service that pushes news to clients: IQbservable<Article>.  Without semantic security, the service will happily accept any kind of query from clients.  Thus clients may execute operations such as windowing, buffering, aggregation, creating timers, creating ranges, zipping, joining, filtering, projecting, among others.  That’s a lot of functionality to offer for a news service, though perhaps it makes sense for your particular implementation; e.g., it might be nice to allow a client to query for articles of two different authors joined by coincidence over a particular duration.  However, does it make sense for a client to submit a query that uses the Range operator to generate a sequence of values from 1 to 10, for each article, and then project each value without the article?  Probably not.  A client could just as easily query for all articles unconditionally and then generate a range from 1 to 10 locally, without having to burden the server with such a trivial request.  Perhaps it would be better for this service to disallow use of the Range operator.

In fact, it may be better to limit queries to just simple filtering and projection, at first.  Clients could filter news articles and select specific properties in which they’re interested.  This may provide enough functionality to support common queries and yet still provide some useful optimizations.  It also keeps your service fair and responsive by doing less work per query, assuming that clients don’t generally need any other functionality.  After going live with your service you could accept feature requests for other kinds of query operators based on user feedback, consider whether the additional semantics make sense for your particular service, and then enable those operators.  You should also consider whether your server can handle the additional load of more complex queries – the next section, Resource Security, addresses that topic.

The first release of TCP Qbservable Provider (1.0 Alpha) already supports semantic security by allowing you to host queries from your own IQbservableProvider by calling the ServeTcp extension method; however, to limit query operators you must manually create a custom ExpressionVisitor, which can be somewhat difficult to do.  So to make things easier, a new feature (recently checked into source control) of the TCP Qbservable Provider library enables easy whitelisting of operators so that you don’t have to create your own IQbservableProvider or ExpressionVisitor.

To use this feature, simply create an empty instance of ServiceEvaluationContext and call the AddKnownOperators method to whitelist operators that your service must support.  Then assign this context to a QbservableServiceOptions object that is passed to the ServeQbservableTcp extension method, which is responsible for hosting your observable over TCP.

var context = new ServiceEvaluationContext(
    Enumerable.Empty<MethodInfo>(),
    includeSafeOperators: false,
    includeConcurrencyOperators: false);

context.AddKnownOperators("Where");
context.AddKnownOperators("Select");

var service = source.ServeQbservableTcp(
    endPoint,
    new QbservableServiceOptions()
    {
        EvaluationContext = context
    });

With the above configuration, client queries using any methods other than the Where and Select operators are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and LimitedService examples checked into source control.
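
For comparison, the manual approach mentioned earlier (writing your own ExpressionVisitor) might look roughly like the following sketch.  The OperatorWhitelistVisitor class is hypothetical and isn't part of the TCP Qbservable Provider library; it uses only System.Linq.Expressions from the FCL, and it matches methods by name alone, which is much weaker than whitelisting specific MethodInfo objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical visitor for illustration; not part of the TCP Qbservable Provider library.
public sealed class OperatorWhitelistVisitor : ExpressionVisitor
{
    private readonly HashSet<string> allowed;

    public OperatorWhitelistVisitor(params string[] allowedOperators)
    {
        allowed = new HashSet<string>(allowedOperators, StringComparer.Ordinal);
    }

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Reject any method call whose name isn't on the whitelist.
        // Matching by name only is an assumption made to keep the sketch short.
        if (!allowed.Contains(node.Method.Name))
        {
            throw new InvalidOperationException(
                "Operator not permitted: " + node.Method.Name);
        }

        // Keep walking so that nested calls inside arguments are checked too.
        return base.VisitMethodCall(node);
    }
}
```

Calling new OperatorWhitelistVisitor("Where", "Select").Visit(query.Expression) on an expression tree that contains any other method call throws InvalidOperationException before the query is ever executed.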

Types of Expressions

The ExpressionOptions property of the QbservableServiceOptions object provides lower-level control over the types of expressions that are permitted.  By default, only basic expressions are permitted.  Expressions such as assignments, blocks, loops, constructor calls and others are rejected by the service.  The default option is recommended for public services, though you can set this property to any combination of flags to allow additional expression types when needed.

Opting Out of Semantic Security

To disable the semantic security model entirely, set the AllowExpressionsUnrestricted property of the QbservableServiceOptions object to true.  By doing so, the provider will ignore the ServiceEvaluationContext object and the ExpressionOptions property and simply allow any expression and operator to be used within a query.  Of course, this is only recommended if you fully trust all clients, which is not the case in public scenarios.

Resource Security

Resource security is about preventing unauthorized code from accessing system resources, such as memory, CPU, the file system and networking components.  It can be broken down further into two subcategories:

  1. API Security
  2. Algorithmic Security

API Security

The FCL offers many APIs that malicious clients could abuse, such as System.AppDomain, System.Environment, System.IO.*, System.Reflection.*, System.Security.*, System.Windows.*, etc.  In order to safely host a service that can execute arbitrary code from clients, the service would have to prevent unauthorized access to certain APIs.  The whitelisting approach used by the semantic security model described previously is one way to solve this problem, though the mechanism that is generally used to prevent unauthorized access to APIs in the .NET Framework is Code Access Security (CAS).  CAS offers a way to restrict APIs through permissions.

Whitelisting has an advantage over CAS.  CAS allows certain APIs to execute that don’t make sense in a server environment, simply because they don’t demand any permissions; e.g., System.Console.WriteLine.  In order to entirely prevent use of System.Console within queries, whitelisting must be used.

However, there are advantages to using CAS over whitelisting.  For one thing, the whitelisting approach I’ve implemented only checks methods and operators, while the CAS approach applies to any operator, method, constructor, event or property.  Furthermore, the CAS approach applies generally to the entire .NET Framework and third-party assemblies, whereas the process of manually whitelisting APIs may inadvertently expose clients to a security vulnerability hidden deep within some seemingly benign API.  In other words, CAS frees you from the burden of having to ensure that your chosen whitelisted operators and methods don’t inadvertently permit clients to format your C: drive.

Therefore, using sandboxing together with whitelisting provides the strongest security model for hosting services publicly by limiting the APIs that services will permit within anonymous client queries to the set of APIs that are resource-safe and semantically acceptable for a given service.

To incorporate CAS security into the TCP Qbservable Provider library, I’ve added new overloads for the QbservableTcpServer.CreateService method.  You may already be familiar with the CreateService method, since it’s also the API that enables you to host a service accepting an argument, as described in my first blog post in this series.  The new overloads begin with an AppDomainSetup parameter.  They host services within a sandboxed AppDomain that applies minimal CAS permissions by default, though it’s easily configurable by passing in a PermissionSet object to one of the overloads.  By default, sandboxed services will only permit queries to execute code that runs within a transparent security context, denying access to any API that demands additional permissions.  In other words, basic code can be executed within queries, but any APIs that demand additional permissions are rejected by throwing a SecurityException.

To host your service in a sandbox with minimal permissions, call the QbservableTcpServer.CreateService method and pass in an AppDomainSetup object as the first argument.  You must assign its ApplicationBase property to the base path of your sandboxed AppDomain, from which dependent assemblies will be loaded.  The .NET Framework guidelines recommend specifying a different path than the application’s actual base path, for additional security.  In the example below, I’ve assigned the sandbox’s base path to the application’s sandbox_bin subfolder, which would contain only the assemblies that the sandbox requires to execute the host service.

Unfortunately, that’s not all you must do.  The service factory argument cannot be a lambda because it’s invoked within the AppDomain and, therefore, must be serializable, though the compiler does not ensure that lambdas’ closures are serializable.  You may be able to work around this if a closure isn’t created by referencing a method on a MarshalByRefObject, but that may actually defeat the purpose of the sandbox because ultimately client queries would be executed outside of the sandbox.  Instead, you should use a static factory method to create your service, as shown in the example below.

At the moment, due to type-inference issues, you must wrap the factory method in an explicit delegate, though I plan on addressing that problem in the future, if possible.

var appBase = Path.GetDirectoryName(new Uri(Assembly.GetEntryAssembly().CodeBase).LocalPath);

var newAppBase = Path.Combine(appBase, "sandbox_bin");

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    endPoint,
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));
...
public static IObservable<int> CreateServiceObservable(IObservable<object> request)
{
    return Observable.Range(1, 3);
}

With the above configuration, client queries using APIs that require any permissions other than security-transparent execution are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and SandboxedService examples checked into source control.

Keep in mind that your service callbacks, including the CreateServiceObservable method shown above, are executed with the reduced permission set of the sandboxed AppDomain; however, you can assert additional permissions from within your callbacks if needed.  Additional permissions may be needed when creating the service observable or executing observation side-effects, as shown in the following example.

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    endPoint,
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));

service.Subscribe(terminatedClient =>
    {
        new PermissionSet(PermissionState.Unrestricted).Assert();

        try
        {
            Log.Trace("Client shutdown: " + terminatedClient.Reason);
        }
        finally
        {
            PermissionSet.RevertAssert();
        }
    });

In the example above, the hypothetical Log.Trace method demands full trust to execute.  Since the callback is executing under the permission set granted to the sandboxed service, the demand will fail unless the required permissions are asserted, even across an AppDomain boundary.

Full trust can be asserted here but not by clients’ queries because callbacks are executed by one of your own assemblies while queries are executed within security transparent dynamic assemblies, which cannot assert permissions beyond the set granted to the sandbox.

The entry point assembly is automatically granted full trust permissions by the CreateService method and, presumably in the above example, the assembly containing the callback code is the application assembly, so it’s able to assert full trust.  Though if it’s not the application assembly, then you can specify additional full-trust assemblies by calling a specific overload of the CreateService method that accepts an array of Assembly objects.

Note that to assert full trust, assemblies must be strong-named.

Algorithmic Security

Algorithmic security is about restricting the types of expressions that the service permits based on their effect on system resources.

For example, returning to our IQbservable<Article> news service, let’s assume that we’re executing our service in a sandbox and we’ve already applied limited semantic security to prevent most operators from being executed, with the exception of a few that we think will be useful to clients for querying news articles:  Where, Select, Aggregate, Scan, GroupBy, Join, Timer, Interval and Sample.  We’re also allowing primitive types, List<T> and arrays to be created within clients’ queries.

Is our news service safe?

No, it’s not safe.  We wouldn’t want the service executing a client’s query if it contained expressions that performed any of the following actions, listed in an approximately-progressively-worsening order:

  • Performs 100 operations in a single query.
  • Creates 500 joins.
  • Allocates a 10MB array for every article containing the individual words within that article.
  • Aggregates lists of 10K articles containing the entire contents of each article.
  • Samples at a very short duration causing frequent output.
  • Starts a high-rate timer causing frequent output.
  • Starts 100K low-rate timers.
  • Allocates a single 2GB array.
  • Creates a single 10GB string.
  • Uses the GroupBy operator in such a way that it never frees memory.

These kinds of queries negatively impact CPU and memory, so their use must be restricted somehow.  To do that, we must first identify how system resources can be affected by queries, then determine how each particular API can be used within queries to affect system resources, and finally design a model for constraining each particular API, or otherwise prevent them entirely.

Generally, the issues with the example queries above are as follows:

  1. Unrestricted quantities of permitted operators.
  2. Unrestricted combinatorial use of operators.
  3. Unrestricted operator arguments.
  4. Unrestricted allocation of objects, including primitive types, collections and arrays.
    1. Unrestricted magnitude.
    2. Unrestricted quantity.
  5. Unthrottled output.
  6. Unthrottled input.  (Applies to duplex queries only; examples are not provided above.)

It seems that some of these issues are much easier to detect than others, though all of them need to be addressed to have a truly secure service model for publicly hosting queryable observables.

In general, when any potentially unsafe expressions are found within a client’s query, they must be evaluated to determine whether their particular usage is permitted and under what constraints.  You may notice some overlap with the concept of semantic security described previously.  The difference is that semantic security is only about semantics; i.e., whether the intent of a query through the expressions it uses is compatible with the intent of the service, whereas algorithmic security is about constraining the effects particular expressions have on system resources.

In order to constrain algorithmically-unsafe expressions, we must go beyond simple whitelisting and provide additional options for configuring the limits of particular expressions on a per-operation and per-type basis.

Currently, the TCP Qbservable Provider library does not offer any solutions for algorithmic security, though it’s something that I’m considering for a future release.  I plan to add configurable options such as the following:

  1. Maximum quantities for individually whitelisted operators and methods, with reasonable defaults.
  2. Maximum quantities for categorized operators and methods, with reasonable defaults; e.g.,
    1. Permit only 1 usage of concurrency-introducing operators.
    2. Permit only 1 usage of timer operators.
    3. Permit only 1 usage of buffering operators.
  3. Constraints on particular operator arguments, with reasonable defaults; e.g.,
    1. Interval:  30 seconds <= period <= 12 hours
    2. Range:  0 <= count <= 10
    3. Buffer:  2 <= length <= 100
  4. Maximum quantity and capacity of new collections and arrays, with reasonable defaults; e.g., new object[N], where N <= 10
  5. Maximum quantity and size of strings.
  6. Maximum size of incoming serialized expression trees, in bytes.
  7. Maximum size of serialized query payloads, in bytes, with separate settings for output notifications and duplex input notifications.
  8. Input/output throttling, with a default duration of 30 seconds.
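
As a rough illustration of the first option in the list above, here is a minimal sketch of per-operator quotas.  It is not part of the TCP Qbservable Provider library; the OperatorQuotaVisitor and QuotaExample names are hypothetical, and it uses IQueryable<T> expressions from the BCL only so that the sample runs without Rx.  The same idea should apply to the expression tree of an IQbservable<T> query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical helper: walks an expression tree, rejects methods that are not
// whitelisted and rejects queries that use a whitelisted method too many times.
sealed class OperatorQuotaVisitor : ExpressionVisitor
{
    private readonly IDictionary<string, int> limits;
    private readonly Dictionary<string, int> counts = new Dictionary<string, int>();

    public OperatorQuotaVisitor(IDictionary<string, int> limits)
    {
        this.limits = limits;
    }

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        string name = node.Method.Name;

        int limit;
        if (!limits.TryGetValue(name, out limit))
        {
            throw new NotSupportedException("Operator is not whitelisted: " + name);
        }

        int used;
        counts.TryGetValue(name, out used);
        counts[name] = ++used;

        if (used > limit)
        {
            throw new InvalidOperationException("Too many uses of " + name + "; the limit is " + limit);
        }

        // Keep walking so that nested calls inside the arguments are counted too.
        return base.VisitMethodCall(node);
    }
}

static class QuotaExample
{
    public static bool IsAllowed(Expression query, IDictionary<string, int> limits)
    {
        try
        {
            new OperatorQuotaVisitor(limits).Visit(query);
            return true;
        }
        catch (NotSupportedException) { return false; }
        catch (InvalidOperationException) { return false; }
    }

    public static void Main()
    {
        var limits = new Dictionary<string, int> { { "Where", 1 }, { "Select", 2 } };

        Expression<Func<IQueryable<int>, IQueryable<int>>> ok =
            q => q.Where(x => x > 1).Select(x => x * 2);
        Expression<Func<IQueryable<int>, IQueryable<int>>> tooMany =
            q => q.Where(x => x > 1).Where(x => x < 9);

        Console.WriteLine(IsAllowed(ok, limits));       // True
        Console.WriteLine(IsAllowed(tooMany, limits));  // False
    }
}
```

Keying the limits by method name alone is a simplification; a real implementation would probably key on the MethodInfo so that overloads and same-named methods on other types can be treated differently.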

In the meantime, I’ve configured the default semantic security whitelist to exclude all potentially unsafe operators, to disallow array allocations and to disallow explicit calls to constructors.  Note that your service is still capable of creating unbounded objects, including arrays and collections, though queries cannot create non-primitive objects themselves.  Assignments and void-returning methods are also disallowed by default, so all objects and collections are immutable.
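
To give a feel for what disallowing array allocations and constructor calls might look like, here is a minimal sketch built on ExpressionVisitor from the BCL.  The NoAllocationVisitor and AllocationExample names are hypothetical and this is not the library's actual implementation.  Note that this simplified version rejects every new expression, including anonymous types.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical helper: rejects any query that calls a constructor or allocates an array.
sealed class NoAllocationVisitor : ExpressionVisitor
{
    protected override Expression VisitNew(NewExpression node)
    {
        throw new NotSupportedException("Constructor calls are not permitted: " + node.Type.Name);
    }

    protected override Expression VisitNewArray(NewArrayExpression node)
    {
        throw new NotSupportedException("Array allocations are not permitted.");
    }
}

static class AllocationExample
{
    public static bool IsSafe(Expression query)
    {
        try
        {
            new NoAllocationVisitor().Visit(query);
            return true;
        }
        catch (NotSupportedException)
        {
            return false;
        }
    }

    public static void Main()
    {
        Expression<Func<IQueryable<string>, IQueryable<int>>> lengths =
            q => q.Select(s => s.Length);
        Expression<Func<IQueryable<string>, IQueryable<char[]>>> bigArrays =
            q => q.Select(s => new char[1000000]);
        Expression<Func<IQueryable<string>, IQueryable<string>>> bigStrings =
            q => q.Select(s => new string('x', 1000000));

        Console.WriteLine(IsSafe(lengths));     // True
        Console.WriteLine(IsSafe(bigArrays));   // False
        Console.WriteLine(IsSafe(bigStrings));  // False
    }
}
```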

However, the current security model isn’t an exhaustive solution.  It doesn’t prevent clients from using clever algorithms to cause stack overflows and out of memory exceptions.  It doesn’t even, for example, prevent clients from easily creating a 2GB string.

Conclusion

There’s more work to be done on the security model to safely host queryable observable services publicly.  Even with whitelisting and sandboxing, the TCP Qbservable Provider is easily vulnerable to simple attacks that could bring down an entire process.

The current security model should, however, prevent clients from directly damaging your system.  Direct damage is prevented by using the sandboxing APIs to create your service, which rejects queries that demand unsafe API permissions.  You should also use semantically-restricted expressions, which are enabled by default, to reject queries attempting to use operators and methods that aren’t permitted by your service’s whitelist.


IQbservable | Rx | Rxx

April 25, 2012

LINQ to Cloud - IQbservable Over the Wire

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

I’ve recently released a TCP Qbservable Provider library within the Rxx project on CodePlex.  It targets the .NET 4.5 Framework and is written entirely in C# 5 and VS 11 Beta.  If stabilized in the future, it will probably be merged into the Rxx library.  It’s currently pre-Alpha, so beware of bugs and don’t expect much consideration for security.  Feedback is welcomed.

In this blog post I’m going to introduce you to the TCP Qbservable Provider library and discuss its key features in some depth.  I'll also include a few working examples at the end.  If you want to try programming the examples yourself, then start by downloading the library and unzip it.  You’ll need to add references to all of the assemblies in the bin folder, though you can download the Rx 2.0 Beta assemblies from NuGet, if you’d prefer.

The applications in the zip file provide more detailed examples, but I won’t be discussing them in this blog post.  I encourage you to try them on your own.  You can download the complete source code, which contains the code for the example applications, the provider library and the Rxx 2.0 Beta library.  (Note that the Rxx 2.0 Beta library has not been officially released yet and is likely to change.)

What does this thing do?

In a nutshell

TCP Qbservable Provider enables you to easily create a TCP web service that offers an IQbservable<T> for clients to query using Rx and LINQ.  You simply choose an end point consisting of an IP address and a port number, then your observable becomes available to clients over TCP.

Clients can query your observable using Rx operators.  Queries can also be written in LINQ’s query comprehension syntax.  When a client subscribes to its query, a connection is made to the service and then the query is serialized and sent to the server, where it is executed.

In case you’re wondering, IQbservable<T> is the model that Rx defines to represent queryable observables.  It’s the dual to queryable enumerables, which are represented by IQueryable<T>.  IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>.  For a deeper explanation, see this video on Channel 9 by Bart De Smet.
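
The enumerable half of that analogy can be seen with nothing but the BCL.  The following sketch (the class and method names are mine) shows that an IQueryable<T> records the query as an expression tree, which is what makes it possible for a provider to inspect, translate or ship the query elsewhere, whereas an IEnumerable<T> query is just compiled delegates.  IQbservable<T> exposes an Expression property in the same way.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

static class QueryableVersusEnumerable
{
    // Returns the name of the outermost operator recorded in the query's expression tree.
    public static string RootOperator(IQueryable<int> query)
    {
        var call = (MethodCallExpression)query.Expression;
        return call.Method.Name;
    }

    public static void Main()
    {
        int[] numbers = { 1, 2, 3, 4 };

        // Compiled delegates: there is no way to inspect the query itself.
        IEnumerable<int> enumerable = numbers.Where(x => x % 2 == 0);

        // Expression tree: the query is data that a provider can examine.
        IQueryable<int> queryable = numbers.AsQueryable().Where(x => x % 2 == 0);

        Console.WriteLine(queryable.Expression.NodeType);  // Call
        Console.WriteLine(RootOperator(queryable));        // Where

        // Both produce the same results when enumerated locally.
        Console.WriteLine(string.Join(",", enumerable));   // 2,4
        Console.WriteLine(string.Join(",", queryable));    // 2,4
    }
}
```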

The following diagram illustrates the basic communication process:

[Image: diagram of the basic communication process between the client and the server]

The basic process:

  1. The server begins hosting a TCP service at a specific end point, which includes an IP address and a port.
    1. The ServeQbservableTcp extension method is used to serve an IObservable<T> as an IQbservable<T>.
    2. The ServeTcp extension method is used to serve an IQbservable<T> directly.
    3. The factory methods on the QbservableTcpServer class are used to create queries that accept subscription arguments.
  2. The client creates an instance of QbservableTcpClient<T> to represent the service locally.  No call is made to the server yet.
    1. The client must specify a type for T that is compatible with the type of the data in the IQbservable<T> service.
    2. The client must supply the end point, which includes the IP address and port of the service.
    3. The client can optionally configure the service proxy; e.g., to enable duplex communication and specify known types.
  3. The client creates a LINQ query by applying Rx operators to the result of the Query method, either by using fluent method call syntax, query comprehension syntax or some mixture of both.
    1. All Qbservable Rx operators are supported.
    2. Arbitrary code can be written within the query to execute side-effects on the server; e.g., via the Do operator.  Clearly, this can be a security concern, but it can also be quite powerful if used properly.
    3. Anonymous types are supported.  (More on this below.)
    4. Full duplex communication is supported.  (More on this below.)
    5. Optionally, the server can require an argument that the client passes to the Query method, which must be a serializable type that the server expects.
  4. The client eventually calls Subscribe on the query, just like a normal IObservable<T>.  This causes a TCP socket to be opened with the server.
  5. The server receives the subscription request and performs a protocol negotiation with the client (currently the entire negotiation is mocked, but it may be developed in the future to support custom protocols).
  6. The client serializes the query and sends it to the server.
  7. The server deserializes the query and begins executing it.
  8. Observable notifications are pushed to the client asynchronously.

Note that all data is currently serialized using a binary formatter.  This includes expression trees and all notifications, in both directions.

Serialized Expression Trees and Anonymous Types

When you write a query against an IQbservable<T>, the compiler creates an Expression Tree to represent your query at runtime.  Often expression trees contain anonymous types that are generated by the compiler; e.g., to support let clauses and closures.  The TCP Qbservable Provider must serialize and send an expression tree to the server to be executed.  Even though the Expression classes in the FCL and the types generated by compilers are not serializable, the TCP Qbservable Provider library enables serialization of Expression trees and anonymous types by automatically swapping them with internal, serializable representations.  All types of expressions are supported except for dynamic and debug info expressions, but I’ll consider supporting those in the future as well.
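
To see the compiler-generated types for yourself, here is a small sketch that walks a query’s expression tree and collects the anonymous types that it creates.  It uses IQueryable<T> so that it runs with the BCL alone, and the AnonymousTypeFinder name is hypothetical; it only shows how such types can be detected, not how the library swaps them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Runtime.CompilerServices;

// Hypothetical helper: records every compiler-generated type that the query constructs.
sealed class AnonymousTypeFinder : ExpressionVisitor
{
    public readonly List<Type> Found = new List<Type>();

    protected override Expression VisitNew(NewExpression node)
    {
        // Anonymous types are marked with CompilerGeneratedAttribute by the C# compiler.
        if (node.Type.IsDefined(typeof(CompilerGeneratedAttribute), false))
        {
            Found.Add(node.Type);
        }

        return base.VisitNew(node);
    }
}

static class AnonymousTypeExample
{
    public static int CountAnonymousTypes(Expression expression)
    {
        var finder = new AnonymousTypeFinder();
        finder.Visit(expression);
        return finder.Found.Count;
    }

    public static void Main()
    {
        IQueryable<int> source = new[] { 1, 2, 3 }.AsQueryable();

        // The let clause makes the compiler project into an anonymous type
        // that carries both x and doubled to the select clause.
        IQueryable<int> query =
            from x in source
            let doubled = x * 2
            select x + doubled;

        Console.WriteLine(CountAnonymousTypes(query.Expression));  // 1
    }
}
```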

Expression trees are an internal implementation detail that you don’t need to be aware of to consume and host IQbservable<T> services.

Full Duplex Communication

Full duplex communication is when a client and server communicate in both directions simultaneously over a single connection; i.e., the client sends the server data while the server sends the client data, without connecting to another end point.  Full duplex communication occurs automatically for instances of IObservable<T> within a client’s query, though the server must opt-in to allow it.  Full duplex communication can also occur automatically for local members and IEnumerable<T> objects that are generated by iterator blocks, though a client must opt-in to allow it; otherwise, the default behavior is for local members and IEnumerable<T> iterators found within a query to be evaluated locally and then replaced with constants before the query is sent to the server.
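
The default behavior described above, where local members are evaluated on the client and replaced with constants, can be sketched roughly as follows.  This is only an illustration of the general technique under my own assumptions, not the library’s implementation; the LocalEvaluator name is hypothetical and it only handles closed-over local variables.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical helper: replaces closed-over local variables with their current values.
sealed class LocalEvaluator : ExpressionVisitor
{
    protected override Expression VisitMember(MemberExpression node)
    {
        // A captured local appears as a field access on a constant closure object.
        if (node.Expression is ConstantExpression)
        {
            object value = Expression.Lambda(node).Compile().DynamicInvoke();
            return Expression.Constant(value, node.Type);
        }

        return base.VisitMember(node);
    }
}

static class LocalEvaluationExample
{
    public static Expression<Func<int, bool>> Inline(Expression<Func<int, bool>> predicate)
    {
        return (Expression<Func<int, bool>>)new LocalEvaluator().Visit(predicate);
    }

    public static void Main()
    {
        int threshold = 5;
        Expression<Func<int, bool>> predicate = x => x > threshold;

        // Before: the right-hand side refers to a field on the client's closure object.
        Console.WriteLine(((BinaryExpression)predicate.Body).Right.NodeType);  // MemberAccess

        // After: the right-hand side is a plain constant that is safe to send.
        var inlined = Inline(predicate);
        Console.WriteLine(((BinaryExpression)inlined.Body).Right.NodeType);    // Constant
        Console.WriteLine(inlined.Compile()(6));                               // True
    }
}
```

With full duplex enabled, the member access would instead be left in the tree so that the server can call back to the client for its value each time.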

While the server is executing the client’s query, if it comes across any local member, including any static or instance methods, properties or fields, and if full duplex communication has been enabled on both the client and the server, then the server will send an invoke message to the client and wait for the return value, synchronously.  The return value is not cached.  A new message is sent each time a local member must be executed within the query.

For full duplex IEnumerable<T> objects generated by iterator blocks, the server sends a synchronous message to the client to get an enumerator.  Subsequent calls to enumerate the enumerable from within the query (e.g., MoveNext, Reset and Dispose) are sent to the client as synchronous invoke messages, similar to the behavior previously described for local members.

While the server is executing the client’s query, if it comes across an instance of an IObservable<T> that was either a closed-over local variable or the return value of a full duplex local member, and if full duplex communication has been enabled on the server (it is automatic on the client for local instances of IObservable<T>), then the server sends a subscribe message to the client and waits for a subscription response, synchronously.  The client is then free to asynchronously push notifications to the server; e.g., OnNext, OnError and OnCompleted.

A large number of local members, IEnumerable<T> iterators and IObservable<T> objects are supported simultaneously for individual clients.  The maximum is currently bounded by the number of keys that fit into a generic Dictionary<TKey,TValue>, per client, though each category also has its own dictionary.  This means that a given client probably supports over 2 billion unique local members, 2 billion IEnumerable<T> iterator instances and 2 billion IObservable<T> instances.  It’s highly unlikely that local members will ever be a problem given the complexity required to generate a query containing over 2 billion expression objects; however, the latter two are dealing with instances of objects that may be generated by the query as it executes.  Though I doubt these capacity constraints will ever be a problem given the context of network communication, it’s something to consider if, for example, you’ll be writing large multi-layered full duplex SelectMany queries that generate many notifications very fast and must execute uninterrupted for a very long period of time.  It’s possible that you may eventually hit 2 billion observables, but it does sound a bit crazy.

Why should I create an IQbservable<T> service?

There are a few major advantages to using a queryable observable instead of a traditional web service:

It’s Asynchronous

Web services typically process requests synchronously, or are fairly complicated to make asynchronous, but a queryable observable service can easily be entirely asynchronous.  By processing requests asynchronously, the server is free to handle additional requests.  This makes for very scalable services.

It’s Reactive

Web services are not typically reactive, but a queryable observable service can easily be entirely reactive.  Clients don’t have to poll your service.  Instead, the server can determine when to send notifications to clients and clients can concentrate on doing other things.

Furthermore, the timing of notifications can be controlled by various internal factors, such as the semantics of the source or the current load on the server.  This is better than clients hitting the server as fast as possible trying to squeeze out more data that simply isn’t available yet, while the server still tries to process requests in a fair manner.  Note, however, that a truly reactive service probably requires mostly persistent TCP connections to fully benefit in this way.

It’s Queryable

Reactive web services are typically blunt data hoses that simply receive a request and then begin pushing data to the client as fast as possible, but a queryable observable service receives the client’s query and processes it on the server, so it’s able to return only the data that the client has requested.  Not only is this better for bandwidth usage and client performance, but it’s also potentially better for server performance and overall throughput because the service can interpret the client’s query to make optimizations on-the-fly.

Imagine a social media service that exposes an IQbservable<PersonSaidSomething>.  A client could send a query to the service asking to receive notifications whenever a particular person says something, instead of receiving notifications whenever anyone says something and filtering them on the client.  Admittedly, this particular example isn’t so impressive considering that most social media services probably offer a simple parameterized option to filter notifications for individual people.  So what about a query that asks the service for notifications whenever any person says something about a particular character on a TV show during the date and time on which that show is airing?  Do any social media services offer parameters for that?  What about a query that asks the service for notifications whenever person A writes something longer than 50 characters, between 5 to 10 minutes after person B says something about a particular product?  You get the idea…

It’s Simple

Web services are quite simple to create nowadays, considering all that ASP.NET, WCF and IIS offer, but queryable observable services are simple too if you’re already familiar with Rx and LINQ.  Don’t be frightened by all of the strange terms and confusing explanations – they’re my fault.  If you’re still confused, then try experimenting with it yourself.  Begin with the examples later in this post.

Hosting an IQbservable<T> Service

A service returns an IQbservable<T> so that clients can write queries against it, but the source doesn’t have to be an IQbservable<T>.  The source can be another IQbservable<T> or just any plain old IObservable<T>.

If the service implementation is an IObservable<T>, then the TCP Qbservable Provider uses the local IQbservableProvider (Qbservable.Provider) on the server to execute queries that it receives from clients.  It happens automatically, so you don’t need to be concerned at all about how your observable is being translated into an IQbservable<T>.  You can thank the Rx team for their fantastic work on that (and of course, Rx in general).

If the service implementation is an IQbservable<T>, then think of the TCP Qbservable Provider as merely a communication/serialization proxy.  It takes your existing query and hosts it over TCP.  Furthermore, writing a custom IQbservableProvider allows you to constrain the Rx operators that clients can use, which is good from a security standpoint.

For example, if you want to expose LINQ to WQL (a.k.a., WMI Events) on a server, then simply call the ServeTcp extension method to host your query over TCP.  If you want to expose a typical IObservable<T> on a server, e.g., Observable.Interval, then simply call the ServeQbservableTcp extension method to host your observable over TCP as an IQbservable<T> service.

When a client queries your service, its query is serialized and sent to the server by the TCP Qbservable Provider running on the client.  The TCP Qbservable Provider running on the server deserializes the client’s query and then glues it to your hosted query; e.g., Observable.Interval or LINQ to WQL.  From the perspective of the source observable, it’s as if the client’s query was written within the same process.  When data is pushed from the hosted observable, the TCP Qbservable Provider serializes it to the client.  From the perspective of the client, it’s as if the provider is running within the same process.

Consuming an IQbservable<T> Service

Clients consume web services by creating proxies that encapsulate all of the communication logic.  A queryable observable service is no different.

A client begins by creating an instance of QbservableTcpClient<T>, which represents a single queryable observable service end point.  Calling its Query method returns an IQbservable<T> for the service.  At this point, the client is able to write a LINQ query and then call Subscribe to initiate communication.  All of this is covered earlier in this blog post, in the section about the basic communication process.

Alright, let’s do some coding…

Examples

Add the code from each of the following examples to the Main methods of new console application projects in VS 11 Beta, targeting the .NET 4.5 Framework.  I recommend creating separate console application projects for the server and client examples.

Add the following using directives to the top of each file.  You may need to include others as well, depending upon the example.

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using QbservableProvider;

As mentioned earlier, you’ll need to add references to all of the assemblies in the bin folder of the zip file you’ve downloaded, though the Rx 2.0 Beta assemblies can be downloaded from NuGet, if you’d prefer.

Example 1: Timer

Let’s start out easy and create a simple queryable observable timer service.  Here’s the spec:

  1. When a client subscribes, start a 2 second timer on the server.
  2. When the timer elapses on the server, send a notification to the client.
  3. The client’s connection persists until either the client or the server cancels the query, or the query completes.

IObservable<long> source = Observable.Timer(TimeSpan.FromSeconds(2));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, port: 49593));

using (service.SubscribeEither(
    client => Console.WriteLine("Timer service acknowledged client shutdown."),
    ex => Console.WriteLine("Timer service error: " + ex.Message),
    ex => Console.WriteLine("Timer service fatal error: " + ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
    Console.ReadKey();
}

Now let’s create our first client.  Here’s the spec:

  1. Subscribe to the service that we’ve just created.
  2. Print the timer notification from the service.

var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, port: 49593));

IQbservable<long> query =
    from value in client.Query()
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Timer client observed: " + value),
    ex => Console.WriteLine("Timer client error: " + ex.Message),
    () => Console.WriteLine("Timer client completed")))
{
    Console.ReadKey();
}

To test these examples, make sure that you run the server application first.  Then run the client application.

After 2 seconds the client application prints the following:

Timer client observed: 0
Timer client completed

And the server application prints:

Timer service acknowledged client shutdown.

That example was boring, I know.

Example 2: Confirm the location

Let’s add a side-effect to the query so that we can see where the observable is running.  Here’s the new client:

IQbservable<long> query =
    from value in client.Query().Do(_ => Console.WriteLine("Where am I?"))
    select value;

After 2 seconds the client’s output is the same as before, but the server application now prints the following:

Where am I?
Timer service acknowledged client shutdown.

So it’s clear that our query was running on the server.  But what’s really awesome about this query is that the Do operator executed its lambda expression on the server!  Essentially, we told the server to write something to its own console window from within the query sent by the client.

Example 3: A more interesting client

Now that we know we can have the server do anything that we want, let’s have it download a web page for us.  Here’s the spec:

  1. Subscribe to the service that we’ve just created using the following query:
    1. When the service generates a timer notification, download the web page from http://blogs.msdn.com/b/rxteam.
    2. When the web page is downloaded, send its length to the client.
  2. Print the value received from the service.

Our goal here is to execute the entire query on the server, including downloading the web page.  We know how to verify that the entire query is executing on the server by using the Do operator, so I’ll add that to the end of the query.

IQbservable<int> query =
    (from value in client.Query()
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that the type of the query has changed from IQbservable<long> to IQbservable<int>.  We’re using the select clause to project a different type of value from the service.

After 2 seconds the client application prints the following (of course, the value may be different in your tests):

Timer client observed: 110429
Timer client completed

And the server prints:

Where am I? 110429
Timer service acknowledged client shutdown.

The server has downloaded a web page and sent back the length of the page to the client.

Example 4: Service arguments

Let’s modify the service to accept the timer’s duration as an argument.  That way the client can control the delay before the web page is downloaded on the server.

To accept an argument, we’ve got to make a slight change to the way in which we’re creating the service.  Instead of simply using the ServeQbservableTcp extension method on our timer observable, we’ll need some way of receiving the argument before we can create the observable.   The QbservableTcpServer class provides static factory methods for creating services based on a function that accepts an IObservable<T>, where T is the type of the argument that the service expects, and returns an IObservable<U> that represents the service.  It’s quite natural to express the concept of clients subscribing to our service with arguments, sometime in the future, as an IObservable<T>.  So we’ll define our service beginning with an IObservable<T> that represents client subscriptions.  I’ll name the lambda parameter that represents this observable as request.

var service = QbservableTcpServer.CreateService<TimeSpan, long>(
    new IPEndPoint(IPAddress.Loopback, port: 49593),
    request =>
        from duration in request.Do(arg => Console.WriteLine("Client sent arg: " + arg))
        from value in Observable.Timer(duration)
        select value);

Notice that we must specify the input and output types of the query when we call the generic CreateService<TSource, TResult> method.  I’ve also added the Do operator to print out the argument that is received from the client.

Now let’s update our client to pass in a duration for the timer.

IQbservable<int> query =
    (from value in client.Query(TimeSpan.FromSeconds(5))
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that I’m passing in the TimeSpan argument to the Query method.  The rest of the query is unchanged.

The server immediately prints the following when the client application starts:

Client sent arg: 00:00:05

And then after 5 seconds the client and server print the same results as before.

For much more advanced examples, including local member evaluation, anonymous types, full duplex communication of observables and iterator blocks, see the applications that are included in the download.  Grab the complete source code for the applications and the TCP Qbservable Provider library from the Rxx project on CodePlex.

Conclusion

I’ve barely scratched the surface of possibilities offered by the TCP Qbservable Provider service model.  This blog post only provides a high level view with extremely primitive examples.  I hope it has whetted your appetite and you’re interested in using it in your apps.

I should also mention that the Rx team has done an amazing job with the core Rx libraries, and the IQbservable<T> stuff is in another league of its own.  I’ve also seen Bart mention a couple of times in the Rx forum that his team is investing more in the IQbservable space regarding remote queries and serializable expression trees, so I’m really curious to see what they’ve done and when/if it will be released.

In the meantime, if you’re interested in developing applications using the TCP Qbservable Provider library, then please let me know.  I enjoyed working on it as a proof-of-concept and I plan to invest more time stabilizing it, unless the Rx team releases something similar.  Knowing that people are actually using it will help me to prioritize.

Your feedback can help guide the direction of this project, so please report bugs and ideas to the Rxx project on CodePlex.  Thanks!


IQbservable | Rx | Rxx

March 01, 2012

Async Iterators

You’ve already heard about async in C# 5.  This new feature is going to improve UI responsiveness and developer productivity simultaneously, to say the least.  I’ll assume that you’re also familiar with Task<T>, the generic type in the .NET Framework that provides a common model for asynchronous functions, and one of the types on which the async feature depends.

Task<T> has a limitation: it only represents a scalar value T.  This is fine for async methods that only compute a single value, but what if we need to write an async method that lazily computes a sequence of T?  In other words, the entire operation is asynchronous just like Task<T>, but instead of computing a single value we want to potentially compute many values, sequentially, while also being able to await other asynchronous operations.

The best we can do with Task<T> is to return Task<IList<T>>, gathering each T into a list.  There’s no way to yield each individual T as it is computed when using Task<IList<T>>.

The following C# console application illustrates this behavior.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class TaskExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            var list = await AsyncScalar();

            Output.WriteLine("Async Completed");

            foreach (var value in list)
            {
                Output.WriteLine(value);
            }

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static async Task<IList<int>> AsyncScalar()
        {
            Output.WriteLine("AsyncScalar Called");

            var list = new List<int>();

            for (int i = 0; i < 3; i++)
            {
                Output.WriteLine("Computing " + i);

                await Task.Delay(TimeSpan.FromSeconds(1));

                list.Add(i);
            }

            return list;
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): AsyncScalar Called
(Thread 1): Computing 0
(Thread 4): Computing 1
(Thread 4): Computing 2
(Thread 4): Async Completed
(Thread 4): Value = 0
(Thread 4): Value = 1
(Thread 4): Value = 2
(Thread 4): Done

Note that the Output class is also used by subsequent code snippets in this post, but I won’t repeat it.  Here’s the definition:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    static class Output
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine("(Thread {0}): {1}", Thread.CurrentThread.ManagedThreadId, message);
        }

        public static void WriteLine(int data)
        {
            WriteLine("Value = " + data);
        }
    }
}

Notice that the AsyncScalar method in the previous example computes values asynchronously, but they’re gathered into a list that is eventually returned when the computation completes.  The caller awaiting the task gets back a pre-computed list of data (referred to as a hot sequence), which can only be iterated synchronously.

Ultimately, our goal is to somehow replace list.Add with something like yield.  We want to be able to intermix awaiting and yielding in an imperative-style method that supports control flow statements, such as for loops.  And the result must be an asynchronous sequence, which doesn’t require synchronous iteration.

async IEnumerable<T> = Error

C# doesn’t allow us to apply the async keyword to iterator blocks.  To use the async keyword on a method it must return void, Task or Task<T>.  To define an iterator block, we must return IEnumerable<T>.  Clearly these features aren’t compatible based on return type alone.

But why not support IEnumerable<T> as another possible return type for async methods?

The answer (if I may assume what the C# team was thinking) is that Task<T> represents an asynchronous function, which means that it provides the necessary model for awaiting a function’s return value.   IEnumerable<T> only represents an interactive sequence, which means that it doesn’t provide a model for awaiting an iterator block’s return value(s).  Instead, values must be pulled synchronously from the sequence.  As a result, async iterator blocks would always be forced to execute synchronously, defeating the purpose of async.

Ok, so how about Task<IEnumerable<T>>?

Nope.  It’s similar to my first code example above, which uses Task<IList<T>>.  It models an asynchronous function that eventually returns a sequence, but that sequence remains synchronous.  Recall that Task<T> represents a scalar-valued asynchronous function, which means that the C# compiler can’t do any tricks with an iterator block to convert Task<T> into something that represents an asynchronous sequence.  The best it can do is allow us to asynchronously invoke an iterator block, but the generated sequence must be enumerated synchronously, as shown in the previous example.  That’s not our goal at all.

Then how about IEnumerable<Task<T>>?

This might seem correct at first, but actually it doesn’t meet our requirements either.  It represents a synchronous sequence of asynchronous functions.  Interesting, but not what we want.  Let’s assume for a moment that the C# compiler transforms the code between yield return statements into Task<T> objects.  We could then write something like this:

// Doesn't compile
static async IEnumerable<Task<int>> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

But consider the behavior when consuming it.  We’d probably use a foreach loop, which calls GetEnumerator to get an IEnumerator<Task<int>>.  Then we’d call MoveNext, which would synchronously enter the for loop and hit the first await.  At this point, we’d expect MoveNext to return true and set Current to a Task<int>.  When the Task<int> completes it should execute the current continuation, which is the code following the await.  So then what happens if we ignore the Task<int> and call MoveNext again?  Perhaps it could block until the current Task<int> completes, but that behavior doesn’t meet our requirements.  We don’t want to block while iterating the sequence.  Furthermore, what happens if we don’t call MoveNext right away when the Task<int> completes?  The iterator could begin computing the next Task<int> asynchronously, which means that it would have to buffer the sequence until our synchronous iteration catches up.  This is like the producer/consumer pattern with an implicit queue.  Again, it’s interesting but not what we want.  When a value is computed, we want our continuation to execute right away, without buffering.
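To make the shape of IEnumerable<Task<T>> concrete, here’s a rough sketch of what we can already write today without the async keyword.  The names TaskSequenceSketch, TaskSequence and ConsumeInOrder are made up for illustration.  Note that the iterator itself can’t await anything; it can only hand out tasks, and MoveNext always returns synchronously.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class TaskSequenceSketch
{
    // A synchronous iterator whose elements are asynchronous computations.
    // The iterator body never awaits; each element is just a task.
    public static IEnumerable<Task<int>> TaskSequence(int count)
    {
        for (int i = 0; i < count; i++)
        {
            int value = i;  // copy the loop variable for the closure

            yield return Task.Delay(TimeSpan.FromMilliseconds(100))
                             .ContinueWith(_ => value);
        }
    }

    // The consumer decides when to await; MoveNext itself never waits.
    public static async Task<List<int>> ConsumeInOrder(int count)
    {
        var results = new List<int>();

        foreach (Task<int> task in TaskSequence(count))
        {
            results.Add(await task);
        }

        return results;
    }

    public static void Main()
    {
        foreach (int value in ConsumeInOrder(3).Result)
        {
            Console.WriteLine(value);
        }
    }
}
```

Nothing stops a consumer from skipping the await and calling MoveNext again immediately, in which case all of the delays simply run concurrently.  That’s the mismatch described above: the sequence is synchronous and only its elements are asynchronous.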

So what would happen if we reversed it?  MoveNext returns Task<bool> and Current returns T.

Well that can’t be modeled with any combination of IEnumerable<T> and Task<T>, but if it existed it would allow us to await between yields, keeping the entire computation asynchronous and its iteration asynchronous as well.
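As a rough sketch of that reversed shape, consider a hand-written enumerator whose MoveNext returns Task<bool> and whose Current returns T.  The interface name IReversedEnumerator<T> and the DelayedRange class are hypothetical, invented here only to illustrate the idea; they aren’t types from the BCL or from Ix.  Since the compiler can’t generate this for us from an iterator block, the state machine is written by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface: MoveNext is asynchronous, Current is a plain T.
interface IReversedEnumerator<T>
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Hand-written producer that awaits between values.
sealed class DelayedRange : IReversedEnumerator<int>
{
    private readonly int count;
    private int next;

    public DelayedRange(int count)
    {
        this.count = count;
    }

    public int Current { get; private set; }

    public async Task<bool> MoveNext()
    {
        if (next >= count)
        {
            return false;
        }

        // The producer controls its own asynchrony; no thread is blocked.
        await Task.Delay(TimeSpan.FromMilliseconds(100));

        Current = next++;
        return true;
    }
}

static class ReversedEnumeratorSketch
{
    // The consumer awaits MoveNext instead of blocking on it.
    public static async Task<List<int>> Consume(IReversedEnumerator<int> enumerator)
    {
        var results = new List<int>();

        while (await enumerator.MoveNext())
        {
            results.Add(enumerator.Current);
        }

        return results;
    }

    public static void Main()
    {
        foreach (int value in Consume(new DelayedRange(3)).Result)
        {
            Console.WriteLine(value);
        }
    }
}
```

The consumer side reads naturally, but the producer has lost the iterator block entirely, which is exactly the part we were hoping to keep.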

IAsyncEnumerable<T>

In the Ix Experimental release (Ix_Experimental-Async on NuGet), there’s a type named IAsyncEnumerable<T>.  It has a single method named GetEnumerator that returns an IAsyncEnumerator<T>, which returns a Task<bool> from MoveNext and a T from Current.  Can we use it to write an asynchronous iterator block?  Let’s take a look at an example.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class AsyncEnumerableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            IAsyncEnumerable<int> sequence = DelayedSequence().ToAsyncEnumerable();

            Output.WriteLine("Awaiting");

            await sequence.ForEachAsync(Output.WriteLine);

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static IEnumerable<int> DelayedSequence()
        {
            Output.WriteLine("DelayedSequence Called");

            for (int i = 0; i < 3; i++)
            {
                Task.Delay(TimeSpan.FromSeconds(1)).Wait();

                Output.WriteLine("Yielding " + i);

                yield return i;
            }
        }
    }
}

 

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Awaiting
(Thread 4): DelayedSequence Called
(Thread 4): Yielding 0
(Thread 4): Value = 0
(Thread 4): Yielding 1
(Thread 4): Value = 1
(Thread 4): Yielding 2
(Thread 4): Value = 2
(Thread 4): Done

At first glance it looks like we’re closer to our goal.  At least we can iterate the sequence asynchronously and values are computed asynchronously.  The ForEachAsync extension is provided as a convenience because the foreach statement can’t handle a MoveNext method that returns Task<bool>, but you could instead call GetEnumerator and loop over a call to await MoveNext manually, if you prefer.

So we’re fine on the consumer side, but what about the producer?  Unfortunately, in order to introduce a delay we must block the current thread (note the call to Wait).  That’s because the iterator block returns IEnumerable<T>, which as we already know doesn’t model an asynchronous sequence.  It seems that IAsyncEnumerable<T> creates a Task<T> for each call to MoveNext to make our sequence consumable in an asynchronous fashion.  This means that our iterator block can’t control its own asynchrony.  Concurrency is injected for us automatically when the consumer decides that it’s ready to await the next value.  So essentially it’s still a pull model even though it’s asynchronous.

And of course we can’t simply add the async keyword to our iterator block, as discussed previously.  We’re now back where we started.

We need a better model for our asynchronous sequence.  A type that represents an asynchronous function like Task<T> as well as a sequence of values like IEnumerable<T>, but in a push-based fashion.  Then we can define our iterator block in terms of this new type by yielding values as they are computed asynchronously.

IObservable<T>

IObservable<T> and IEnumerable<T> overlap in that they both provide a lazily-computed sequence of values; however, they differ in that IObservable<T> represents an asynchronous (push) sequence while IEnumerable<T> represents a synchronous (pull) sequence.  A sequence that pushes values enables reactive computations, while a sequence from which values are pulled enables interactive computations.  LINQ provides a declarative abstraction over any computation.  Reactive Extensions for .NET (Rx) provides LINQ operators for IObservable<T> similar to the LINQ to Objects operators that require IEnumerable<T>.  If you’re not familiar with Rx yet, then you may want to get started with this blog post.

Now back to our problem at hand.  How can we write an iterator block with IObservable<T>?  Does the following work?

// Doesn't compile
static async IObservable<int> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

Unfortunately, no.  But wouldn’t it be nice?  Conceptually, it meets our requirements.  We can subscribe to the observable to receive values asynchronously.  The iterator block can define the observable by awaiting other asynchronous operations and yielding values as they are computed, sequentially yet asynchronously.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described in the remainder of this blog post.  It enables you to write code that looks very similar to the previous hypothetical example.  For more information, read this forum thread.]

Well the Rx team has already thought about this limitation in C# and provided a solution.  The Rx Experimental release (Rx_Experimental-Main on NuGet) provides several overloads of the Observable.Create method, one of which has the following signature:

public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

This method allows us to generate an observable using a normal iterator block.  But since we can’t mark the iterator block as async, we can’t await other asynchronous operations.  Instead, we’ll yield observables to await them.  But if we’re yielding observables, then how do we also yield computed values?  That’s where the IObserver<T> object comes in handy.  Here’s an example:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class ObservableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start();

            Console.ReadKey();
        }

        static void Start()
        {
            Output.WriteLine("Starting");

            var xs = Observable.Create<int>(observer => AsyncSequence(observer));

            Output.WriteLine("Subscribing");

            xs.Subscribe(Output.WriteLine, () => Output.WriteLine("Done"));
        }

        static IEnumerable<IObservable<object>> AsyncSequence(IObserver<int> observer)
        {
            Output.WriteLine("AsyncSequence Called");

            for (int i = 0; i < 3; i++)
            {
                yield return Task.Delay(TimeSpan.FromSeconds(1)).ToObservable().UpCast();

                Output.WriteLine("Pushing " + i);

                observer.OnNext(i);
            }
        }
    }

    static class ObservableExtensions
    {
        public static IObservable<object> UpCast<T>(this IObservable<T> source)
        {
            return source.Select(value => (object) value);
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Subscribing
(Thread 1): AsyncSequence Called
(Thread 5): Pushing 0
(Thread 5): Value = 0
(Thread 4): Pushing 1
(Thread 4): Value = 1
(Thread 6): Pushing 2
(Thread 6): Value = 2
(Thread 6): Done

This behavior is exactly what we wanted: asynchronous subscribe, observations of values as they are generated, the ability to await other asynchronous operations and the use of normal control flow statements, such as for.

However, the implementation is a bit strange.  We’ve replaced list.Add from the first example with observer.OnNext instead of yield, which isn’t so bad, but then we also had to replace await from the first example with yield.  Not very intuitive, perhaps.

Maybe in the future we’ll get first-class compiler support for writing observable iterators.  Until then, this should do fine.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described above.  It enables you to write code that looks very similar to what first-class compiler support might look like, although it requires the use of a callback.  For more information, read this forum thread.]
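To give a feel for the callback-based shape mentioned in the edit above, here’s a rough sketch.  As far as I understand it, Rx 2.0 exposes this through Observable.Create overloads that accept a Task-returning function, but the code below deliberately doesn’t use Rx.  AsyncObservableSketch.Create is a hypothetical stand-in built only on the BCL’s IObservable<T> and IObserver<T> interfaces, CollectingObserver is a made-up helper, and unsubscription isn’t supported, so treat it as an illustration of the idea rather than the library’s implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in, NOT the Rx API: turns an async callback into an IObservable<T>.
static class AsyncObservableSketch
{
    public static IObservable<T> Create<T>(Func<IObserver<T>, Task> subscribeAsync)
    {
        return new CallbackObservable<T>(subscribeAsync);
    }

    private sealed class CallbackObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, Task> subscribeAsync;

        public CallbackObservable(Func<IObserver<T>, Task> subscribeAsync)
        {
            this.subscribeAsync = subscribeAsync;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Start the callback; signal failure or completion when its task ends.
            subscribeAsync(observer).ContinueWith(task =>
            {
                if (task.IsFaulted) observer.OnError(task.Exception.InnerException);
                else observer.OnCompleted();
            });

            return new NoOpDisposable();  // unsubscription is not modeled in this sketch
        }
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Made-up helper: records values and exposes a task that ends with the sequence.
sealed class CollectingObserver : IObserver<int>
{
    private readonly List<int> values = new List<int>();
    private readonly TaskCompletionSource<List<int>> done = new TaskCompletionSource<List<int>>();

    public Task<List<int>> Completion { get { return done.Task; } }

    public void OnNext(int value) { values.Add(value); }
    public void OnError(Exception error) { done.TrySetException(error); }
    public void OnCompleted() { done.TrySetResult(values); }
}

static class ObservableIteratorSketch
{
    public static IObservable<int> AsyncIterator()
    {
        return AsyncObservableSketch.Create<int>(async observer =>
        {
            for (int i = 0; i < 3; i++)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(100));

                observer.OnNext(i);  // plays the role of "yield return i"
            }
        });
    }

    public static List<int> Run()
    {
        var observer = new CollectingObserver();
        AsyncIterator().Subscribe(observer);
        return observer.Completion.Result;
    }

    public static void Main()
    {
        foreach (int value in Run())
        {
            Console.WriteLine(value);
        }
    }
}
```

Compared with the iterator-based Create overload, await is back to being await, and only yield has been replaced by observer.OnNext.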

Side note:

C# doesn’t support anonymous iterator blocks.  Here’s something else interesting: VB 11 does!

Do you agree that anonymous observable iterators are an apt use case for such a feature in C#?

Tags:

Async | C# 5 | Rx

January 12, 2012

Code Contracts for Visual Studio Pro

If you decided long ago that you weren’t going to bother trying Code Contracts because you didn’t have Visual Studio Premium edition or higher and thus couldn’t take advantage of static checking, then your wait is finally over.

Code Contracts 1.4.41228.0 was recently released, along with some great news: The Code Contracts static checker is now available for all editions of Visual Studio (excluding Express).

On a sad note, it seems as though Code Contracts will remain in DevLabs through the next Visual Studio cycle.

But to end on a brighter note, apparently the team is still trying to make it an official product.

From the forum announcements section:

We are trying to make the case for Code Contracts to be included in Visual Studio as an official product. Please send us your feedback as soon as possible, especially if you have any details about how exactly it has helped your development process.

So start using it and send them all of your success stories!

Here are some links to get you started:

Editor Extensions (Optional)
http://visualstudiogallery.msdn.microsoft.com/85f0aa38-a8a8-4811-8b86-e7f0b8d8c71b/

Portal
http://msdn.microsoft.com/en-us/devlabs/dd491992.aspx

Research Site
http://research.microsoft.com/en-us/projects/contracts/

Release Notes
http://research.microsoft.com/en-us/projects/contracts/releasenotes.aspx

Forum
http://social.msdn.microsoft.com/Forums/en-US/codecontracts/threads

November 12, 2011

Resources for Learning Rx

(Edit: Rx is now open source!)

Start by bookmarking the Rx hub on MSDN:
 
http://msdn.microsoft.com/en-us/data/gg577609

Pay particular attention to the beginner’s guide and the .NET Resources and Community pages, which together provide several links to official videos, blog posts and articles, all of which are worth reviewing.
 
http://msdn.microsoft.com/en-us/data/gg577611
http://msdn.microsoft.com/en-us/data/gg577612

Next, bookmark the Rx forum.  This is the official place to search for answers and to ask new questions:

(Edit: 10/3/2014 - There's still a lot of good stuff on the Rx Forum, though Stack Overflow appears to be more active now.)

Rx Forum
http://social.msdn.microsoft.com/Forums/en-US/rx/threads

Depending upon your current level of knowledge with Rx, you may want to continue by going through the official hands-on labs.  They are a bit outdated, although differences between the labs and the latest Rx class library should be minimal.  When you do come across differences, a quick search on the Rx forum will show you what changes need to be made.

Hands-On Labs
http://go.microsoft.com/fwlink/?LinkId=208528
 
For further reading, I recommend starting off with the official conceptual documentation:
 
http://msdn.microsoft.com/en-us/library/hh242985(v=VS.103).aspx     
 
followed by the recommended design guidelines.  This document is a bit outdated, although where there may be differences between the examples and the latest Rx class library, the purpose and reasons for the guidelines still apply.

Recommended Design Guidelines
http://go.microsoft.com/fwlink/?LinkID=205219

After that there are plenty of community resources to choose from.  A new book has been written about Rx recently:
 
http://social.msdn.microsoft.com/Forums/en-US/rx/thread/813e56db-e410-4f35-8f72-1be34b08ce8a

[Edit (8/14/2012)]

Here's another new book.  This one's written by Lee Campbell and is available online for free:

http://www.introtorx.com

[/Edit]

And here's a list of blogs and open source projects that may interest you:
 
http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2

As for open source projects, I’m particularly fond of Rxx, and not just because I’m the co-founder and author along with James Miles.  ;)

Tags:

.NET | Rx | Rxx

November 09, 2011

Rxx 1.2 Released

If you’re not familiar with the Rxx project yet, it's something I’ve been working on for several months now along with James Miles.

Rxx is a library of unofficial reactive LINQ extensions supplementary to Microsoft's Reactive Extensions for .NET (Rx).  Rxx is developed entirely in C# and targets the .NET Framework 4.0, Silverlight 4.0 and Windows Phone 7 (WP7).

Why should you use Rx and Rxx?

Well, if you’re doing any kind of programming that involves asynchrony or concurrency, such as what is commonly found in UI layers, business layers, data access layers, middle tier services – basically everywhere, then you’ll definitely find Rx to be very useful.  Rxx adds a whole bunch of useful features on top of Rx and the .NET Framework Class Library (FCL), making it quite easy to introduce common patterns of asynchrony into any .NET program through the use of IObservable<T> and LINQ.

Rxx provides the following features.  See the Documentation for details.

  1. Many IObservable<T> extension methods and IEnumerable<T> extension methods.
  2. Many useful types such as CommandSubject, ListSubject, DictionarySubject, ViewModel, ObservableDynamicObject, Either<TLeft, TRight>, Maybe<T> and others.
  3. Various interactive labs that illustrate the runtime behavior of the extensions in Rxx.  Individual labs' source code included.

The latest release of Rxx is now available:

New features for Rxx 1.2 include:

  1. Compatible with Microsoft's Ix Experimental library.
  2. UI extensions for WPF and Silverlight, including AnonymousCommand, CommandSubject, a Subscription XAML markup extension for binding UI elements to observables, EventSubscription trigger and a reactive view model infrastructure.  (Download the labs application for examples.)
  3. N-ary Zip and CombineLatest combinators.
  4. Several parser updates, including new operators, non-greedy (lazy) quantifiers and major performance and memory improvements, such as avoiding stack overflows due to recursion in quantifiers.
  5. Cursor types and extensions (Rx and Ix).
  6. ListSubject and DictionarySubject.
  7. Consume extensions that generalize the producer/consumer pattern over observables.
  8. ApplicationSettingsBase extensions.
  9. ObservableSyndication for RSS 2.0 and Atom 1.0.
  10. ObservableFile and additional ObservableDirectory extensions.
  11. Stream, FileStream and TextReader extensions.

More details can be found in the latest release notes: http://rxx.codeplex.com/wikipage?title=Release%20Notes

We’d really appreciate your feedback.  Please let us know about your experiences with Rxx by starting a new discussion or submitting an issue.  Thanks!

Tags:

.NET | CodePlex | Open Source | Rx | Rxx