March 01, 2012

Async Iterators

You’ve already heard about async in C# 5.  This new feature is going to improve UI responsiveness and developer productivity simultaneously, to say the least.  I’ll assume that you’re also familiar with Task<T>, the generic type in the .NET Framework that provides a common model for asynchronous functions, and one of the types on which the async feature depends.

Task<T> has a limitation: it only represents a scalar value T.  This is fine for async methods that only compute a single value, but what if we need to write an async method that lazily computes a sequence of T?  In other words, the entire operation is asynchronous just like Task<T>, but instead of computing a single value we want to potentially compute many values, sequentially, while also being able to await other asynchronous operations.

The best we can do with Task<T> is to return Task<IList<T>>, gathering each T into a list.  There’s no way to yield each individual T as it is computed when using Task<IList<T>>.

The following C# console application illustrates this behavior.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class TaskExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            var list = await AsyncScalar();

            Output.WriteLine("Async Completed");

            foreach (var value in list)
            {
                Output.WriteLine(value);
            }

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static async Task<IList<int>> AsyncScalar()
        {
            Output.WriteLine("AsyncScalar Called");

            var list = new List<int>();

            for (int i = 0; i < 3; i++)
            {
                Output.WriteLine("Computing " + i);

                await Task.Delay(TimeSpan.FromSeconds(1));

                list.Add(i);
            }

            return list;
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): AsyncScalar Called
(Thread 1): Computing 0
(Thread 4): Computing 1
(Thread 4): Computing 2
(Thread 4): Async Completed
(Thread 4): Value = 0
(Thread 4): Value = 1
(Thread 4): Value = 2
(Thread 4): Done

Note that the Output class is also used by subsequent code snippets in this post, but I won’t be repeating it again.  Here’s the definition:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    static class Output
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine("(Thread {0}): {1}", Thread.CurrentThread.ManagedThreadId, message);
        }

        public static void WriteLine(int data)
        {
            WriteLine("Value = " + data);
        }
    }
}

Notice that the AsyncScalar method in the previous example computes values asynchronously, but they’re gathered into a list that is eventually returned when the computation completes.  The caller awaiting the task gets back a pre-computed list of data (referred to as a hot sequence), which can only be iterated synchronously.

Ultimately, our goal is to somehow replace list.Add with something like yield.  We want to be able to intermix awaiting and yielding in an imperative-style method that supports control flow statements, such as for loops.  And the result must be an asynchronous sequence, which doesn’t require synchronous iteration.

async IEnumerable<T> = Error

C# doesn’t allow us to apply the async keyword to iterator blocks.  To use the async keyword on a method it must return void, Task or Task<T>.  To define an iterator block, we must return IEnumerable<T>.  Clearly these features aren’t compatible based on return type alone.

But why not support IEnumerable<T> as another possible return type for async methods?

The answer (if I may assume what the C# team was thinking) is that Task<T> represents an asynchronous function, which means that it provides the necessary model for awaiting a function’s return value.   IEnumerable<T> only represents an interactive sequence, which means that it doesn’t provide a model for awaiting an iterator block’s return value(s).  Instead, values must be pulled synchronously from the sequence.  As a result, async iterator blocks would always be forced to execute synchronously, defeating the purpose of async.

Ok, so how about Task<IEnumerable<T>>?

Nope.  It’s similar to my first code example above, which uses Task<IList<T>>.  It models an asynchronous function that eventually returns a sequence, but that sequence remains synchronous.  Recall that Task<T> represents a scalar-valued asynchronous function, which means that the C# compiler can’t do any tricks with an iterator block to convert Task<T> into something that represents an asynchronous sequence.  The best it can do is allow us to asynchronously invoke an iterator block, but the generated sequence must be enumerated synchronously, as shown in the previous example.  That’s not our goal at all.

Then how about IEnumerable<Task<T>>?

This might seem correct at first, but actually it doesn’t meet our requirements either.  It represents a synchronous sequence of asynchronous functions.  Interesting, but not what we want.  Let’s assume for a moment that the C# compiler transforms the code between yield return statements into Task<T> objects.  We could then write something like this:

// Doesn't compile
static async IEnumerable<Task<int>> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

But consider the behavior when consuming it.  We’d probably use a foreach loop, which calls GetEnumerator to get an IEnumerator<Task<int>>.  Then we’d call MoveNext, which would synchronously enter the for loop and hit the first await.  At this point, we’d expect MoveNext to return true and set Current to a Task<int>.  When the Task<int> completes it should execute the current continuation, which is the code following the await.

So then what happens if we ignore the Task<int> and call MoveNext again?  Perhaps it could block until the current Task<int> completes, but that behavior doesn’t meet our requirements.  We don’t want to block while iterating the sequence.

Furthermore, what happens if we don’t call MoveNext right away when the Task<int> completes?  The iterator could begin computing the next Task<int> asynchronously, which means that it would have to buffer the sequence until our synchronous iteration catches up.  This is like the producer/consumer pattern with an implicit queue.  Again, it’s interesting but not what we want.  When a value is computed, we want our continuation to execute right away, without buffering.
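For contrast, here is a minimal sketch of what IEnumerable<Task<T>> can express today without any compiler changes.  The names TaskSequence, DelayedValue and ConsumeAsync are made up for illustration.  The iterator block is an ordinary synchronous one, so it can hand out tasks but it can’t await anything between yields.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class TaskSequenceSketch
{
    // An ordinary (non-async) iterator block: it yields tasks,
    // but it cannot await between yield statements.
    public static IEnumerable<Task<int>> TaskSequence(int count)
    {
        for (int i = 0; i < count; i++)
        {
            // Each element is an independent asynchronous function that
            // starts when the consumer pulls it with MoveNext.
            yield return DelayedValue(i);
        }
    }

    // Hypothetical helper: completes with the given value after a short delay.
    static async Task<int> DelayedValue(int value)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        return value;
    }

    // Pulls each task synchronously, then awaits it before pulling the next one.
    public static async Task<List<int>> ConsumeAsync(IEnumerable<Task<int>> tasks)
    {
        var results = new List<int>();

        foreach (var task in tasks)
        {
            results.Add(await task);
        }

        return results;
    }

    public static void Main()
    {
        var results = ConsumeAsync(TaskSequence(3)).Result;

        Console.WriteLine(string.Join(", ", results));   // prints "0, 1, 2"
    }
}
```

The loop that decides how many elements exist still runs synchronously inside MoveNext, so it can’t depend on the result of an awaited operation.  A consumer that enumerates eagerly, for example by calling ToList on the sequence, would start all three delays at once instead of one after another.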

So what would happen if we reversed it?  MoveNext returns Task<bool> and Current returns T.

Well that can’t be modeled with any combination of IEnumerable<T> and Task<T>, but if it existed it would allow us to await between yields, keeping the entire computation asynchronous and its iteration asynchronous as well.
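To make the reversed shape concrete, here is a rough sketch.  The interfaces IAsyncSequence<T> and IAsyncCursor<T>, the DelayedRange class and the CollectAsync helper are all hypothetical names invented for this illustration; they aren’t taken from any library.  The producer is written by hand because there’s no compiler support for it, but notice that it can await between values without blocking a thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interfaces, named here only for illustration.
interface IAsyncSequence<T>
{
    IAsyncCursor<T> GetCursor();
}

interface IAsyncCursor<T>
{
    Task<bool> MoveNext();   // awaitable: completes when the next value is ready
    T Current { get; }       // synchronous: the value produced by the last MoveNext
}

// Hand-written producer: yields 0..count-1, awaiting a delay before each value.
sealed class DelayedRange : IAsyncSequence<int>
{
    readonly int count;

    public DelayedRange(int count) { this.count = count; }

    public IAsyncCursor<int> GetCursor() { return new Cursor(count); }

    sealed class Cursor : IAsyncCursor<int>
    {
        readonly int count;
        int next;

        public Cursor(int count) { this.count = count; }

        public int Current { get; private set; }

        public async Task<bool> MoveNext()
        {
            if (next >= count) return false;

            // The delay is awaited, so no thread is blocked while waiting.
            await Task.Delay(TimeSpan.FromMilliseconds(100));

            Current = next++;
            return true;
        }
    }
}

static class AsyncCursorSketch
{
    // Consumer: awaits MoveNext, then reads Current synchronously.
    public static async Task<List<int>> CollectAsync(IAsyncSequence<int> source)
    {
        var values = new List<int>();
        var cursor = source.GetCursor();

        while (await cursor.MoveNext())
        {
            values.Add(cursor.Current);
        }

        return values;
    }

    public static void Main()
    {
        var values = CollectAsync(new DelayedRange(3)).Result;

        Console.WriteLine(string.Join(", ", values));   // prints "0, 1, 2"
    }
}
```

The cost is obvious: the for loop from the earlier examples has been turned inside out into a cursor with explicit state, which is exactly the kind of code that iterator blocks normally generate for us.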

IAsyncEnumerable<T>

In the Ix Experimental release (Ix_Experimental-Async on NuGet), there’s a type named IAsyncEnumerable<T>.  It has a single method named GetEnumerator that returns an IAsyncEnumerator<T>, which returns a Task<bool> from MoveNext and a T from Current.  Can we use it to write an asynchronous iterator block?  Let’s take a look at an example.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class AsyncEnumerableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            IAsyncEnumerable<int> sequence = DelayedSequence().ToAsyncEnumerable();

            Output.WriteLine("Awaiting");

            await sequence.ForEachAsync(Output.WriteLine);

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static IEnumerable<int> DelayedSequence()
        {
            Output.WriteLine("DelayedSequence Called");

            for (int i = 0; i < 3; i++)
            {
                Task.Delay(TimeSpan.FromSeconds(1)).Wait();

                Output.WriteLine("Yielding " + i);

                yield return i;
            }
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Awaiting
(Thread 4): DelayedSequence Called
(Thread 4): Yielding 0
(Thread 4): Value = 0
(Thread 4): Yielding 1
(Thread 4): Value = 1
(Thread 4): Yielding 2
(Thread 4): Value = 2
(Thread 4): Done

At first glance it looks like we’re closer to our goal.  At least we can iterate the sequence asynchronously and values are computed asynchronously.  The ForEachAsync extension is provided as a convenience because the foreach statement can’t handle a MoveNext method that returns Task<bool>, but you could instead call GetEnumerator and loop over a call to await MoveNext manually, if you prefer.

So we’re fine on the consumer side, but what about the producer?  Unfortunately, in order to introduce a delay we must block the current thread (note the call to Wait).  That’s because the iterator block returns IEnumerable<T>, which as we already know doesn’t model an asynchronous sequence.  It seems that the ToAsyncEnumerable conversion creates a task for each call to MoveNext to make our sequence consumable in an asynchronous fashion.  This means that our iterator block can’t control its own asynchrony.  Concurrency is injected for us automatically when the consumer decides that it’s ready to await the next value.  So essentially it’s still a pull model even though it’s asynchronous.

And of course we can’t simply add the async keyword to our iterator block, as discussed previously.  We’re now back where we started.

We need a better model for our asynchronous sequence: a type that represents an asynchronous function like Task<T> as well as a sequence of values like IEnumerable<T>, but in a push-based fashion.  Then we can define our iterator block in terms of this new type by yielding values as they are computed asynchronously.

IObservable<T>

IObservable<T> and IEnumerable<T> overlap in that they both provide a lazily-computed sequence of values; however, they differ in that IObservable<T> represents an asynchronous (push) sequence while IEnumerable<T> represents a synchronous (pull) sequence.  A sequence that pushes values enables reactive computations, while a sequence from which values are pulled enables interactive computations.  LINQ provides a declarative abstraction over any computation.  Reactive Extensions for .NET (Rx) provides LINQ operators for IObservable<T> similar to the LINQ to Objects operators that require IEnumerable<T>.  If you’re not familiar with Rx yet, then you may want to get started with this blog post.

Now back to our problem at hand.  How can we write an iterator block with IObservable<T>?  Does the following work?

// Doesn't compile
static async IObservable<int> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

Unfortunately, no.  But wouldn’t it be nice?  Conceptually, it meets our requirements.  We can subscribe to the observable to receive values asynchronously.  The iterator block can define the observable by awaiting other asynchronous operations and yielding values as they are computed, sequentially yet asynchronously.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described in the remainder of this blog post.  It enables you to write code that looks very similar to the previous hypothetical example.  For more information, read this forum thread.]

Well the Rx team has already thought about this limitation in C# and provided a solution.  The Rx Experimental release (Rx_Experimental-Main on NuGet) provides several overloads of the Observable.Create method, one of which has the following signature:

public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

This method allows us to generate an observable using a normal iterator block.  But since we can’t mark the iterator block as async, we can’t await other asynchronous operations.  Instead, we’ll yield observables to await them.  But if we’re yielding observables, then how do we also yield computed values?  That’s where the IObserver<T> object comes in handy.  Here’s an example:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class ObservableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start();

            Console.ReadKey();
        }

        static void Start()
        {
            Output.WriteLine("Starting");

            var xs = Observable.Create<int>(observer => AsyncSequence(observer));

            Output.WriteLine("Subscribing");

            xs.Subscribe(Output.WriteLine, () => Output.WriteLine("Done"));
        }

        static IEnumerable<IObservable<object>> AsyncSequence(IObserver<int> observer)
        {
            Output.WriteLine("AsyncSequence Called");

            for (int i = 0; i < 3; i++)
            {
                yield return Task.Delay(TimeSpan.FromSeconds(1)).ToObservable().UpCast();

                Output.WriteLine("Pushing " + i);

                observer.OnNext(i);
            }
        }
    }

    static class ObservableExtensions
    {
        public static IObservable<object> UpCast<T>(this IObservable<T> source)
        {
            return source.Select(value => (object) value);
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Subscribing
(Thread 1): AsyncSequence Called
(Thread 5): Pushing 0
(Thread 5): Value = 0
(Thread 4): Pushing 1
(Thread 4): Value = 1
(Thread 6): Pushing 2
(Thread 6): Value = 2
(Thread 6): Done

This behavior is exactly what we wanted: asynchronous subscribe, observations of values as they are generated, the ability to await other asynchronous operations and the use of normal control flow statements, such as for.

However, the implementation is a bit strange.  We’ve replaced list.Add from the first example with observer.OnNext instead of yield, which isn’t so bad, but then we also had to replace await from the first example with yield.  Not very intuitive, perhaps.

Maybe in the future we’ll get first-class compiler support for writing observable iterators.  Until then, this should do fine.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described above.  It enables you to write code that looks very similar to what first-class compiler support might look like, although it requires the use of a callback.  For more information, read this forum thread.]
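As a rough sketch of the Rx 2.0 approach mentioned in the edit note above, the iterator can be written as an async lambda passed to Observable.Create.  This assumes a reference to Rx 2.0 or later; RxCreateSketch and AsyncSequence are names chosen just for this example.  My understanding is that with the Task-returning overload, Rx signals completion to the observer when the task finishes, so there’s no explicit call to OnCompleted here.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class RxCreateSketch
{
    // Assumes Rx 2.0 or later, which provides an overload of
    // Observable.Create that accepts a Task-returning function.
    public static IObservable<int> AsyncSequence(int count)
    {
        return Observable.Create<int>(async observer =>
        {
            for (int i = 0; i < count; i++)
            {
                // A real await, not a yielded observable.
                await Task.Delay(TimeSpan.FromSeconds(1));

                // Still a callback rather than yield return.
                observer.OnNext(i);
            }
        });
    }

    public static void Main()
    {
        using (AsyncSequence(3).Subscribe(
            value => Console.WriteLine("Value = " + value),
            () => Console.WriteLine("Done")))
        {
            // Keep the process alive while values are pushed asynchronously.
            Console.ReadKey();
        }
    }
}
```

Compared to the iterator-based Create overload, await is back where we’d expect it and only observer.OnNext stands in for yield.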

Side note:

C# doesn’t support anonymous iterator blocks.  Here’s something else interesting: VB 11 does!

Do you agree that anonymous observable iterators are an apt use case for such a feature in C#?

Tags:

Async | C# 5 | Rx

Comments (3)

Marek Gibek Poland
8/16/2012 2:09:15 AM #

Excellent post, Dave!

Maybe this is something different, but watching movies from NDC2012 I found that there is a .NET language called Nemerle which allows you to write macros (extend the language with custom syntax and keywords). I was so fascinated that I wrote my own implementation of the async/await keywords (which weren't part of the language). I did it exactly how it is done in C# 5.0 (using Task). Now I think it could be done much better with IObservable<T> (still it can be done in Nemerle). You can introduce any syntax you like. How I wish C# had the same power.

Louis-Pierre Beaumont Canada
2/24/2013 7:55:35 PM #

Hello Dave,

Great article. I'm currently looking at porting a coroutine class I made to RX, and the solution you pointed out fits perfectly for this.

However I cannot get it working using RX 2. It looks like the Create overload did not make it from experimental to V2.
public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

Do you know if there is an equivalent functionality in RX 2?

Have a nice day! :)

Dave United States
3/2/2013 10:35:42 PM #

Hi Louis,

Async/await in C# 5.0 is actually a coroutine implementation.  The recommended approach in Rx 2.0 is to use the overload of Create that accepts a Task-returning function; i.e., you can use async/await to implement an observable coroutine by passing an async lambda as the subscribe function of the Observable.Create method.  See this discussion for an example:
social.msdn.microsoft.com/.../6e0ead42-4d89-429d-b6a8-d422cf673390
