April 19, 2017

Monads, and Comonads, and LINQ! Oh, my!

Programming through the lens of mathematics offers new ways to solve old problems and old ways to solve new problems. Mathematics is full of surprising relations and dualities.

In this blog post, we’ll journey through an alternate reality in which an OOP language like C# becomes more abstract and expressive by adopting some functional programming concepts, rooted in a branch of mathematics called Category Theory.

Over The Rainbow

Our journey begins with a few familiar types in a probably unfamiliar categorization.

           Sync              Async
Monad      IEnumerable<T>    IObservable<T>
Comonad    Lazy<T>           Task<T>

Don’t worry if you don’t understand these concepts yet; we’ll meet them again later.

IEnumerable<T> is the list monad.

IObservable<T> is the continuation monad.

Task<T> is the continuation comonad.

Lazy<T> is the singleton comonad (for lack of a better name).

These types appear to have different purposes, yet there is actually a pattern hiding in there somewhere. The pattern can be stated in structural terms as follows.

Monads have bind and unit functions.

Comonads have cobind and extract functions.

                  Bind                           Unit
IEnumerable<T>    SelectMany; a.k.a., flatMap    Constructors of Array, List<T>, etc.
IObservable<T>    SelectMany; a.k.a., flatMap    Observable.Return; a.k.a., just

                  Cobind                         Extract
Task<T>           ContinueWith; a.k.a., then     get_Result (a property named Result)
Lazy<T>           ?                              get_Value (a property named Value)

You may have noticed that Lazy<T> is incomplete. Can we define a cobind function for Lazy<T>? What would it look like? What would it do?

And what’s the relationship between monads and comonads? Is there a pattern?

Perhaps if we were to examine these monadic and comonadic types through a lens of abstraction, then we could identify the pattern and figure out how to implement cobind for Lazy<T>.

“Professor Marvel Never Guesses”

Let's start by examining the bind function for IEnumerable<T>. Here’s its signature:

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)

So the bind function takes an instance of the monadic type for which it's defined, in this case IEnumerable<T>, and a function that projects the element type, T, into another instance of the monadic type, IEnumerable<R>.

Its purpose may not be clear yet, so we’ll just try and implement it by heeding Erik Meijer’s advice: “let the types guide you”.

Well, we have a selector function that accepts a T, but we don’t have an instance of T yet. Since the source is a sequence, it seems that we must invoke the selector function for each T that it contains. So let's start there. We’ll do foreach over source and apply selector to each value.

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
{
    foreach (var value in source)
    {
        var results = selector(value);

        // ...
    }
}

Great, we're done with the Select (a.k.a., map) part. Now let’s check the return value.

The selector returns an IEnumerable<R> for a given T, and we need to return a single IEnumerable<R>. We can’t simply return the IEnumerable<R> for the first T, because we must return them all. But how do we convert many IEnumerable<R> into a single IEnumerable<R>?

How about we iterate each of the sequences and yield all of the results?

(An Iterator Block makes this easy in C# – it spares us from having to implement IEnumerable<T> and IEnumerator<T> ourselves.)

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
{
    foreach (var value in source)
        foreach (var innerValue in selector(value))
            yield return innerValue;
}

And now we're done with the Many (a.k.a., flat) part.

We've just implemented the bind function in the list monad by iterating over the source sequence, mapping each element into a nested sequence, and flattening all of the nested sequences into the output sequence. The name flatMap makes perfect sense! The name SelectMany makes sense too, although it’s not a typical naming convention.1
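To see the flattening in action, here's a minimal sketch that runs the implementation above on a small array. The method is renamed Bind (a hypothetical name, not part of LINQ) so that it can't clash with the SelectMany in System.Linq, and the EnumerableMonad class and Demo method are likewise made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerableMonad
{
    // Same body as the SelectMany derived above; only the name differs.
    public static IEnumerable<R> Bind<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
    {
        foreach (var value in source)
            foreach (var innerValue in selector(value))
                yield return innerValue;
    }

    // Maps each number to a two-element sequence, then flattens the results.
    public static IEnumerable<int> Demo()
        => new[] { 1, 2, 3 }.Bind(x => new[] { x, x * 10 });
}
```

Iterating EnumerableMonad.Demo() yields 1, 10, 2, 20, 3, 30: each inner sequence is fully drained before the next source element is projected.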

The significance of this implementation may not be clear yet, but one thing is certain: it was easy to derive this implementation by following the types.

It’s A Twister! It’s A Twister!

Shouldn’t all monads have similar signatures for their SelectMany implementations?

Let's look at bind for IObservable<T>. And for comparison, IEnumerable<T> as well.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)

Okay, so there's clearly a pattern here. It’s the same exact signature, just with different generic types.

How can we implement bind for IObservable<T>?

Since the source is a sequence, it seems that we must invoke the selector function for each T that it contains. And there's only one way to get T's out of an observable: Subscribe.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
    => source.Subscribe(value =>
    {
        var results = selector(value);

        // ...
    });

We're done with the Select (a.k.a., map) part. Now let’s check the return value.

The selector returns an IObservable<R> for a given T, and we need to return a single IObservable<R>. Again, we’re in a situation similar to IEnumerable<T> where we have many instances of the monadic type yet the return type requires a single instance.

To convert many IObservable<T> into a single IObservable<T>, how about we Subscribe to all of the sequences and push all of their values to the observer? It’s a similar concept to the inner foreach that flattens the IEnumerable<T>.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
=> source.Subscribe(value => selector(value).Subscribe( ? ));

Uh oh, from where do we get an observer to pass to Subscribe? And how do we return an IObservable<R> from this expression? Subscribe returns IDisposable, so that's not helpful.

What we need is an implementation of IObservable<T> that we can return from SelectMany, so that when Subscribe is called with an IObserver<T>, that observer is available to our expression. Then we can pass the observer to the projected observables' subscriptions.

If this seems backwards, it’s because it is. The reactive IObservable<T> interface is dual to the interactive IEnumerable<T> interface. They are structural opposites, so we must think oppositely when solving structural problems like this one.

IEnumerable<T> is synchronous because the caller of IEnumerator<T>.MoveNext blocks while the next value is computed. IObservable<T> is asynchronous because after Subscribe is called with an IObserver<T>, the caller is not necessarily blocked and may continue executing. OnNext is called when values are computed in the future.

Using the same observer to subscribe to each of the projected sequences is similar to yielding from all of the projected enumerables in our IEnumerable<T> implementation above; however, we don't have a language integrated feature like Iterator Blocks, so we’ll have to use Rx's Observable.Create function instead. The Observable.Create function takes a function that has an IObserver<T> argument, which we’ll use to make the inner subscriptions.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
=> Observable.Create<R>(observer => source.Subscribe(value => selector(value).Subscribe(observer)));

And now we’re done with the Many (a.k.a., flat) part.

Note: I’ve avoided addressing the following issues in the code example above for the sake of simplicity – the focus here is on the essence of the monadic operations, not the implementation details of how to properly implement operators.

  1. This implementation violates one of Rx's contracts: Serialized notifications per observer (§4.2 in the Rx Design Guidelines). If any of the inner observables push notifications on different threads, then a downstream observer may receive overlapping notifications, which is very bad. I'm not including a proper implementation in this blog post, but we can imagine that synchronizing calls to OnNext within a lock would suffice.
  2. This implementation fails to properly support cancellation since we’re ignoring the IDisposable object that Subscribe returns. A proper implementation must hold the disposable for each inner observable, until it terminates. Furthermore, the inner disposables must be tied to the lifetime of the outer disposable, such that if the outer disposable is disposed, then all of the inner subscriptions are cancelled as well.
  3. This implementation fails to properly handle OnError and OnCompleted notifications from the source observable. Furthermore, it improperly handles OnCompleted notifications from the inner observables by completing the sequence as soon as one of the inner observables completes, rather than completing when all observables, including the source, have completed.
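If you'd like to experiment with the simplified operator without referencing Rx, here's a rough, self-contained sketch. SketchObservable<T>, SketchObserver<T>, NoopDisposable, FromValues and Bind are all hypothetical stand-ins written for this example (they are not Rx types), and the sketch deliberately keeps the same three shortcomings listed above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for what Observable.Create returns.
public sealed class SketchObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;
    public SketchObservable(Func<IObserver<T>, IDisposable> subscribe) => this.subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => subscribe(observer);
}

// Hypothetical observer that only reacts to OnNext; errors and completion are ignored.
public sealed class SketchObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public SketchObserver(Action<T> onNext) => this.onNext = onNext;
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

public static class ObservableSketch
{
    // Pushes the given values synchronously on Subscribe, then completes.
    public static IObservable<T> FromValues<T>(params T[] values)
        => new SketchObservable<T>(observer =>
        {
            foreach (var value in values) observer.OnNext(value);
            observer.OnCompleted();
            return new NoopDisposable();
        });

    // Same shape as the simplified SelectMany above: the downstream observer
    // is handed to every inner subscription, and inner disposables are dropped.
    public static IObservable<R> Bind<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
        => new SketchObservable<R>(observer =>
            source.Subscribe(new SketchObserver<T>(value => selector(value).Subscribe(observer))));

    public static List<int> Demo()
    {
        var results = new List<int>();
        FromValues(1, 2)
            .Bind(x => FromValues(x, x * 10))
            .Subscribe(new SketchObserver<int>(results.Add));
        return results;
    }
}
```

Because everything here is pushed synchronously, Demo() collects 1, 10, 2, 20. The collecting observer ignores OnCompleted, which is the only reason the early completion from the first inner sequence goes unnoticed.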

The implementation of bind for IObservable<T>, like bind for IEnumerable<T>, is a flattening and a mapping; however, due to the natural asynchrony of the IObservable<T> interface, bind provides an asynchronous merging rather than a synchronous flattening.

In other words, the SelectMany operator for IObservable<T> is actually Select + Merge, whereas SelectMany for IEnumerable<T> is actually Select + Concat.

The bind operator for IEnumerable<T> accepts a sequence as input. For each value, an inner sequence is projected and iterated. Due to the synchronous nature of IEnumerable<T>, only one inner sequence can be iterated at a time. The bind operator will not yield any values from subsequent projections until it has completely iterated over the current inner sequence.

Conversely, the bind operator for IObservable<T> merges all inner subscriptions together, such that when any inner subscription produces a value, it’s immediately pushed downstream.

In both implementations of bind, the inner sequences are being flattened down into a single output sequence.
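For the enumerable side, the "Select + Concat" reading can be checked with a small sketch. SelectThenConcat, FlattenSketch and Demo are hypothetical names for this example; the LINQ operators they call (Select, Aggregate, Concat, SelectMany, SequenceEqual) are the standard ones in System.Linq.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlattenSketch
{
    // Map every element to an inner sequence, then concatenate the inner sequences in order.
    // Note: Aggregate walks the outer sequence eagerly, unlike the real SelectMany.
    public static IEnumerable<R> SelectThenConcat<T, R>(IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
        => source.Select(selector)
                 .Aggregate(Enumerable.Empty<R>(), (acc, inner) => acc.Concat(inner));

    // Compares the built-in SelectMany with the Select + Concat version.
    public static bool Demo()
    {
        var xs = new[] { 1, 2, 3 };
        Func<int, IEnumerable<int>> repeat = x => Enumerable.Repeat(x, x);
        return xs.SelectMany(repeat).SequenceEqual(SelectThenConcat(xs, repeat));
    }
}
```

Both sides produce 1, 2, 2, 3, 3, 3, so Demo() returns true. The order is fixed because each inner sequence is exhausted before the next one starts, which is exactly what a merge over asynchronous sequences can't promise.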

We’re Not In Kansas Anymore

Language Integrated Query (LINQ) introduces the monad as a first-class citizen in C#. Query Comprehension Syntax is a syntax for monads.2

IEnumerable<T> and IObservable<T> are monads because, in addition to satisfying the monad laws, each is a generic type with a specific purpose, paired with a bind function that implements sequential composition over that type while preserving its purpose.

We can understand how bind represents sequential composition by following this logic:

  1. The bind function SelectMany(M<T> source, Func<T, M<R>> selector) has two parameters.
  2. The source parameter is of the type M<T>. Think of M as the monadic type, which contains values of type T. Monadic types are generic types. (This is what is meant by “amplification” or “embellishment”.)
  3. The selector function projects a value of type T into M<R>, which is the monadic type M containing values of type R.
  4. M<T> represents a computation of values of type T. More precisely, M<T> represents an antecedent computation, because a value of T must be computed before it can be passed to selector.
  5. The selector function represents a subsequent computation, because it accepts a value of type T and returns M<R>. And just like M<T>, M<R> represents a computation of values of type R.
  6. Both M<T> and M<R> are of the monadic type M, although they may contain different types. M<R> can be used as the source parameter in a subsequent bind operation.
  7. The bind operator can be chained together across many projections over the monadic type M, such that M<T> goes to M<R> goes to M<Q>, and so on. Each projection through bind is a linear step in a larger computation.
  8. Therefore, the bind function sequentially composes antecedent computations with subsequent computations, while preserving the purpose of the monadic type.
  9. However, the cardinality of M may be greater than 1, depending on the actual type of M. Containing more than one T means having more than one M<R> because each T will be projected into an instance of M<R>.
  10. The bind operator flattens M<M<R>> into a single M<R> to ensure that its result is a single container of the monadic type M. Flattening preserves sequential composition regardless of the cardinality of M.

For example, IEnumerable<T> represents a lazily-computed, synchronous sequence of values of type T. That’s its purpose, which SelectMany must preserve. As the source for SelectMany, IEnumerable<T> represents an antecedent computation of values of type T, because each T must be computed before it can be projected. SelectMany iterates the source, invoking the selector function for each value of type T, and projecting it into IEnumerable<R>. IEnumerable<R> represents a subsequent computation, because it can be subsequently iterated to generate values of type R. To continue the computation sequentially, IEnumerable<R> can be used as the source for another call to SelectMany. Therefore, SelectMany implements sequential composition.

IObservable<T> represents a lazily-computed, asynchronous sequence of values of type T. It’s similar to IEnumerable<T>, except asynchronous. And how do we consume an asynchronous sequence? Not by foreach, but by passing in a continuation (IObserver<T>). The SelectMany operator for IObservable<T> subscribes to the source with its own IObserver<T>. Sequential composition is implemented by awaiting values pushed into the IObserver<T> and then pushing them as input into a subsequent computation, represented by IObservable<R>. As far as sequential composition goes, it doesn’t actually matter that IObservable<T> is asynchronous, because going from T to R remains sequential.

IObservable<T> is asynchronous because in order to get data out of it, you must pass in a function (a continuation) to be invoked later. It’s the so-called, “continuation passing style”, or CPS for short. Thus, IObservable<T> is the continuation monad.

The bind operator enables us to write sequential composition in a declarative style. SelectMany sequentially composes two parts of our query, while hiding the imperative- or CPS-style mechanisms that are required to read values from the monadic type. We can think of a monadic type as a container of values, and bind lets us compose them without having to explicitly specify how to extract their values. We don’t have to use foreach for enumerables or call Subscribe for observables, until we want to leave the monad. That’s why we use foreach and Subscribe only at the ends of our LINQ queries.

This pattern works with more than just sequences. There are many types of monads; e.g., the “Maybe” monad implements null propagation, as Wes Dyer describes here, while introducing monads. Haskell, a pure functional language, has several more: a State monad that represents mutability, an I/O monad that represents input and output, and a Reader monad for computing within an environment, similar to having global or static fields, among others.

Monads provide a declarative programming model, but note the relation to imperative-style code. Two consecutive from clauses in a LINQ query are compiled into a single call to SelectMany. It’s like having a semicolon at the end of each line. The first from expression is evaluated, and then the second from expression is evaluated, with the value of the previous expression still in scope.

from x in Xs //;
from y in getYs(x) //;
select (x, y)

This query looks similar to our implementation of SelectMany for IEnumerable<T>, which consisted of nested foreach loops! And in fact it works exactly the same, because this query simply compiles into a call to SelectMany.
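The same translation applies to any type that offers a suitable SelectMany, not just sequences. As a rough illustration, here's a minimal sketch of a Maybe type that can be used with two from clauses. Everything in it (Maybe<T>, Some, None, ParsePositive, Add) is a hypothetical name invented for this example, and it isn't meant to reproduce any published implementation. The second SelectMany overload, the one with a result selector, is the shape that the C# compiler looks for when two from clauses are followed by a select.

```csharp
using System;

// Hypothetical Maybe type: either holds a value or holds nothing.
public struct Maybe<T>
{
    public bool HasValue { get; }
    public T Value { get; }
    private Maybe(T value) { HasValue = true; Value = value; }
    public static Maybe<T> Some(T value) => new Maybe<T>(value);
    public static Maybe<T> None => default(Maybe<T>);
}

public static class MaybeMonad
{
    // bind: run the selector only when the source holds a value.
    public static Maybe<R> SelectMany<T, R>(this Maybe<T> source, Func<T, Maybe<R>> selector)
        => source.HasValue ? selector(source.Value) : Maybe<R>.None;

    // Overload used by query syntax for "from ... from ... select".
    public static Maybe<R> SelectMany<T, C, R>(this Maybe<T> source, Func<T, Maybe<C>> selector, Func<T, C, R> resultSelector)
        => source.SelectMany(x => selector(x).SelectMany(y => Maybe<R>.Some(resultSelector(x, y))));

    public static Maybe<int> ParsePositive(string text)
        => int.TryParse(text, out var n) && n > 0 ? Maybe<int>.Some(n) : Maybe<int>.None;

    // If either parse fails, the whole query yields None without any explicit checks.
    public static Maybe<int> Add(string a, string b)
        => from x in ParsePositive(a)
           from y in ParsePositive(b)
           select x + y;
}
```

MaybeMonad.Add("2", "3") holds 5, while MaybeMonad.Add("2", "x") holds nothing. The query reads the same as one over a sequence; only the "how" hidden behind SelectMany has changed.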

A query is a single expression pieced together by applying a sequence of operators, rather than a series of statements that are pieced together by a sequence of semicolons. It’s the composability of the monad that enables us to write a query as a single expression, and it’s the expression syntax that makes the declarative programming style appear similar to the imperative style.

This style of programming is declarative because we can use operators like where, join and group by without sacrificing the readability of the program with all of those foreach loops that we’d need to write if we weren’t using the list monad, or all of those continuations that we’d need to write if we weren’t using the continuation monad.

Essentially, monads enable us to declare what we want the program to do rather than how the program should do it. The “how” part is hidden by the monad, and it differs between monads.

from x in Xs
where x != null
from y in getYs(x)
join z in Zs on y.P equals z.P
select (x, y, z)

Is the above an interactive query over IEnumerable<T> or a reactive query over IObservable<T>?

Or maybe it’s a hybrid? Perhaps Xs is an IObservable<T> and getYs returns an IEnumerable<T>?

Each is a valid possibility. There’s no way of knowing by looking at the query alone!

The monadic pattern is a generalization on composition with embellishments, so it’s no surprise that queries over different types can look the same; however, some operators don’t make sense in some monads. For example, Rx defines the Sample(TimeSpan) operator for IObservable<T> to sample elements by time, which only makes sense because IObservable<T> is asynchronous. Sampling by time doesn’t make sense in IEnumerable<T> because it’s synchronous; however, enumerables do have the ability to sample elements by data; i.e., the Where operator. We can also sample elements by data in IObservable<T>, thus Rx provides a similar Where operator.

Essentially, a monad consists of an embellishing type and a function, bind, that preserves the embellishment through sequential composition.

A monad must also have a unit function, which enters the monad. For example, Rx provides Observable.Return (a.k.a., just).3

Technically, a monad must also satisfy some simple mathematical laws: left identity, right identity and associativity. I won’t go into them here.

Perhaps what makes monads seem mysterious and confusing is how such a simplistic functional trick can be so powerful as to warrant an entirely new syntax in languages like Haskell and C#. Are monads confusing because they mysteriously turn complex imperative-style code into neat declarative-style code that looks eerily similar to imperative style? I’m confused even writing that question.

A comprehension syntax isn’t technically required; it just makes monads nicer to use! C# doesn’t have keywords for every useful operator, so most of the queries we write are in the form of fluent-method call syntax, whereby we chain operators together using dot notation and pass lambda expressions as function arguments.

For example, here’s the same query again written in fluent-method call syntax:

Xs.Where(x => x != null)
.SelectMany(x => getYs(x).Select(y => (x: x, y: y)))
.Join(Zs, xy => xy.y.P, z => z.P, (xy, z) => (x: xy.x, y: xy.y, z: z));

It’s not great, although it’s probably how most languages use monads. Regardless, both syntaxes (comprehensions and fluent-method call) form declarative expressions. The comprehension syntax is often nicer though because it hides all of those lambdas and automatically broadens the scope of query variables; e.g., in the query comprehension written earlier, x is still in scope after join. The C# compiler accomplishes this simply by changing the type of T to a compiler-generated type that carries all of the variables in scope. We must do the same trick ourselves when writing in the fluent-method call syntax if we want to keep earlier values in scope. We typically do that by projecting anonymous types or tuples throughout our query, as I’ve done in the example above.

One last interesting thing to note about monads…

Because of its declarative nature, applying bind forms a lazily-evaluated expression, just like monads in lazily-evaluated functional programming languages. SelectMany and other operators do not cause any side effects when they are applied. They simply return a new instance of the monad, so that we may continue applying operators to build our computation. When we are ready for side effects, then we leave the monad and the computation may begin. How it begins depends on the purpose of the monadic type.

IEnumerable<T> requires code to pull each value through the composition, thus we use foreach.

IObservable<T> pushes values through the composition, once we’ve called Subscribe.

And that’s the mysterious monad. You can get deeper into the concept of monads in Category Theory, but I think that the essence of why they’re useful to us as programmers is described above.

All good, but monads are only half the story!

If I Only Had a Codomain

According to my chart at the beginning of this post, Task<T> is a comonad. Why?

If you don’t believe me, then we’ll try and disprove that claim by implementing bind for Task<T>. If we can implement bind, then it’s probably a monad, right?

We already know how the signature for bind must look based on the pattern that we’ve identified above for enumerables and observables. They each have similar signatures. To go from the enumerable bind to the observable bind we must only do a monadic type replacement. So let’s grab the bind signature for IEnumerable<T> or IObservable<T> and simply replace the monadic type with Task<T>:

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)

Looks good, no problems so far.

Since the source is a Task<T>, it seems that we must invoke the selector function with the T that it contains.

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)
=> selector(source.Result);

Okay, that compiles, but it’s totally incorrect! Reading the Result property may block, and blocking isn’t preserving the purpose of Task<T>. The purpose of Task<T> is to represent the asynchronous computation of T. The SelectMany function must allow us to sequentially compose asynchronous computations, which means that we must be able to project a Task<T> into a Task<R> without blocking to read T. Essentially, SelectMany must return a lazily-evaluated expression, just like the implementations for IEnumerable<T> and IObservable<T>.

Great, so how do we get the T out of the Task<T> asynchronously?

For the C# devs out there, our instinct is to define SelectMany as an async method and then await source. And that may seem appropriate, but clearly await must be doing something under the covers in order to get the T from the Task<T> without blocking. How does it do it? Whatever the C# compiler is doing for await, we must be able to do ourselves…

Well, the only way to get the result asynchronously is to pass a continuation to ContinueWith (a.k.a., then). But what is this strange method anyway? Let’s take a look at its signature (one overload in particular4).

public static Task<R> ContinueWith<T, R>(this Task<T> source, Func<Task<T>, R> selector)

Hmm, now that method looks familiar. Is it bind? Not quite. Here’s bind again:

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)

The type arguments in the selector function are reversed! Well, they are sort of reversed. The T and R type arguments remain in the same positions, but the generic Task type is in the opposite position. The selector function for ContinueWith has a Task parameter, rather than returning a Task.

So what is the selector in ContinueWith supposed to do?

Instead of being given a value and returning a new, embellished value, it’s given an embellished value and it has to return a new, unembellished value. In other words, it has to extract the value from the comonadic type inside the projection! This makes sense if we think of comonads (and monads) as containers in general.

And that’s how to safely (i.e., in a non-blocking manner) extract a value from Task<T>. We must read the Result property only from within the selector (continuation) supplied to ContinueWith, in order to preserve the purpose of Task<T>.

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)
=> source.ContinueWith(task => selector(task.Result)).Unwrap();

Unwrap is also required, because selector returns Task<R>, yet the selector in ContinueWith requires an R. So we make R a Task<R>, thus ContinueWith returns Task<Task<R>>. SelectMany requires Task<R>, so we must apply Unwrap one time to ditch the outer Task.
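Here's a small sketch of how this SelectMany might be used to chain asynchronous steps. The TaskBind class and Demo method are hypothetical names for this example; Task.Run, Task.FromResult, ContinueWith and Unwrap are the standard members in System.Threading.Tasks.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskBind
{
    // bind implemented in terms of cobind (ContinueWith), then flattened with Unwrap.
    public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)
        => source.ContinueWith(task => selector(task.Result)).Unwrap();

    // Three asynchronous steps composed sequentially; nothing here blocks the caller.
    public static Task<string> Demo()
        => Task.Run(() => 21)
               .SelectMany(x => Task.Run(() => x * 2))
               .SelectMany(y => Task.FromResult($"answer: {y}"));
}
```

The task returned by TaskBind.Demo() eventually completes with "answer: 42". Reading Result inside each continuation is safe because the continuation only runs after its antecedent has finished.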

Recall that bind is a flattening projection (flat map; it’s like going from IEnumerable<IEnumerable<T>> to IEnumerable<T>), whereas here we’re using ContinueWith to compose over an extending projection (Task<R> becomes Task<Task<R>>). The concepts of flattening and extending are opposites. The bind and cobind functions are opposites; therefore, ContinueWith is cobind.

Here, Unwrap is merely the flattening of our bind implementation. It’s how we convert our cobind’s extending projection into bind’s flattening projection.

The Task<T>.Result property (technically, the get_Result method) is the extract function of the Task<T> comonad. The extract function is to comonad as the unit function is to monad. The unit function gets us into the monad, extract gets us out of the comonad. They are opposites!

And now we see why Task<T> is a comonad, and not a monad. Another way of looking at it: we can only implement bind in terms of cobind; i.e., we can’t implement SelectMany for Task<T> without calling ContinueWith.

Other than the comonadic laws (analogous to the monad laws), comonads are a triple consisting of a generic type, a cobind function and an extract function. Comonads are the opposite of monads, which are a triple consisting of a generic type, a bind function and a unit function.

As you can see from the code above, it’s fairly easy to convert from cobind to bind. Therefore, we can convert comonads into monads. We can also convert monads into comonads, although I’m not showing it here. You can imagine that one indicator of a type being monadic rather than comonadic is that, when implementing cobind, you’d discover that you must implement it in terms of bind. Try it for yourself.

A major reason that we don’t use Task<T> as a monad is that its comonadic form lends itself naturally to a really useful compiler trick: coroutines. A coroutine is syntactic sugar that allows us to write imperative-style code for continuations, hiding the messy details of the underlying state machine that the compiler must generate. In C#, it’s known as async methods. The await keyword essentially compiles into a call to ContinueWith.

Although we can’t use this compiler trick with monads directly, we can convert a monad into a comonad, and that’s exactly what Rx’s ToTask extension method does. But even better than that, Rx offers an extension method that implements C#’s awaitable pattern for IObservable<T>, thus we can directly await an IObservable<T> to safely (i.e., in a non-blocking manner) extract the final value of the sequence. This works well for singleton observables, especially for aggregations like Sum and ToList.

Because C# has great support for async methods, it’s often recommended to use Task<T> rather than IObservable<T> when your computation only has a single result; however, it appears that for a future release of Rx, Bart De Smet has implemented the new awaitable pattern in C# 7. We’ll be able to define async methods that directly return an observable rather than a Task<T>. This is certainly not a replacement for Task<T> in all async methods, but it may come in handy. To return an observable from an async method, we’ll have to use a new interface, ITaskObservable<T>, which derives from IObservable<T>. It’s just a small grammatical difference, perhaps. One neat thing about ITaskObservable<T> is that it has singleton semantics; i.e., it only calls OnNext once, similar to the fact that Task<T> can only contain a single Result.

Along The Yellow Brick Road…

At the beginning of our journey we had asked the following questions:

“Can we create a cobind function for Lazy<T>? What would it look like? What would it do?”

Along the way we’ve met IEnumerable<T>, IObservable<T> and Task<T>. Now we know how to proceed.

We’ll take the signature for Task<T>’s cobind function and replace all Task<T> with Lazy<T>.

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)

We have a Lazy<T> and a function that requires a Lazy<T>, so should we apply it? (#WWEMD5)

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> selector(source);

Sure, but what about the return value now? We have an R but we require a Lazy<R>.

That’s simple. Let’s make the whole expression lazy by wrapping it in a new Lazy<R>. That makes sense because we must preserve the purpose of Lazy<T>, which is lazy initialization, right?

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> new Lazy<R>(() => selector(source));

When reading Lazy<R>.Value, the selector will first extract the value of Lazy<T>, and with it compute R. In effect, Lazy<T> isn’t evaluated until we attempt to evaluate Lazy<R>, because Lazy<R> depends on Lazy<T>.

Thus, laziness is preserved through composition.

How might our code look when using this ContinueWith implementation for Lazy<T>?

Let’s first define a simple function named Create to take advantage of type inference. It’s just a bit more declarative than having to construct a Lazy<T> ourselves and specify T explicitly.

public static class Lazy
{
    public static Lazy<T> Create<T>(Func<T> valueFactory)
        => new Lazy<T>(valueFactory);
}

Here’s a simple program that composes two lazy computations:

var i = Lazy.Create(() =>
{
Console.WriteLine("Initializing ‘i’");
return 42;
});

var d = i.ContinueWith(lazy => lazy.Value * 2);

Notice that the construction of Lazy<T> assigned to d is hidden by the cobind function. This is the declarative style of programming, and it appears the same in comonads as in monads.

Using i and d as defined above, what does the following console program print?

Console.WriteLine(i.IsValueCreated);
Console.WriteLine(d.IsValueCreated);
Console.WriteLine(d.Value);

Console.WriteLine(i.IsValueCreated);
Console.WriteLine(d.IsValueCreated);
Console.WriteLine(i.Value);

Scroll down to see the output…

.

.

.

.

.

.

False
False
Initializing ‘i’
84
True
True
42

Notice that i.IsValueCreated returns False at first, even though d is defined in terms of i. Invoking the ContinueWith function doesn’t cause any side effects, just like how applying SelectMany doesn’t cause any side effects for monads. We can build up a computation with cobind and evaluate it later; i.e., lazy evaluation.

When the program reads d.Value, then i.Value is initialized. Afterwards, i.IsValueCreated returns True, even though the program hasn’t read i.Value directly yet. That’s because d reads i.Value in its continuation, so evaluating d requires i to be evaluated as well. Evaluating the outermost Lazy<T> propagates the act of evaluation up to the source.

Finally, when the program reads i.Value at the end, notice that it’s not reinitialized. Lazy<T> caches the result of its factory function.
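Earlier we built bind for Task<T> out of cobind plus a flattening step. As a sketch under the same reasoning, the trick appears to carry over to Lazy<T>. The SelectMany below and the LazyComonad and Demo names are hypothetical additions for this example; ContinueWith is the implementation from above.

```csharp
using System;

public static class LazyComonad
{
    // cobind for Lazy<T>, as derived above.
    public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
        => new Lazy<R>(() => selector(source));

    // Hypothetical bind built from cobind: extend to Lazy<Lazy<R>>, then flatten lazily.
    public static Lazy<R> SelectMany<T, R>(this Lazy<T> source, Func<T, Lazy<R>> selector)
    {
        var nested = source.ContinueWith(lazy => selector(lazy.Value));
        return new Lazy<R>(() => nested.Value.Value);
    }

    // Reports whether i was created before d was read, d's value,
    // and how many times i's factory ran.
    public static (bool before, int value, int calls) Demo()
    {
        var calls = 0;
        var i = new Lazy<int>(() => { calls++; return 42; });
        var d = i.ContinueWith(lazy => lazy.Value * 2)
                 .SelectMany(x => new Lazy<int>(() => x + 1));

        var before = i.IsValueCreated; // nothing has been evaluated yet
        var value = d.Value;           // forces the whole chain
        _ = d.Value;                   // cached; the factory does not run again
        return (before, value, calls);
    }
}
```

Demo() returns (false, 85, 1): building the chain evaluates nothing, reading d.Value pulls the evaluation up to i, and the factory for i runs exactly once.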

We’re Off To See The Wizard

Where does laziness come from? Is it something in the water?

No, it’s something in the function.

A function provides an identifiable name for a sequence of statements in the imperative style. Imperative style programming languages are eager, in that when a program encounters a statement, it’s evaluated immediately. That’s what imperative means. Therefore, laziness is achieved simply by wrapping statements in a function. A function may be evaluated at any time by invoking it, by name or delegate.

A statement evaluates eagerly.

A statement in a function evaluates lazily.
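To make the contrast concrete, here is a minimal sketch. The class name EagerVersusDeferred and its Run method are made up for this illustration; the log simply records the order in which things happen.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only.
static class EagerVersusDeferred
{
    // Returns the order in which the two statements were evaluated.
    public static List<string> Run()
    {
        var log = new List<string>();

        // A statement: evaluated as soon as control reaches it.
        log.Add("eager statement");

        // The same kind of statement wrapped in a function: nothing happens yet.
        Action deferred = () => log.Add("deferred statement");

        log.Add("before invoke");

        // Evaluation happens here, at the site of invocation.
        deferred();

        return log;
    }
}
```

Calling EagerVersusDeferred.Run() yields the entries in the order "eager statement", "before invoke", "deferred statement".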

A lambda expression represents an anonymous function whose body is either a single expression or a block of statements. We bind a lambda expression to a named variable or parameter, and that becomes its name for the purpose of invoking it. A lambda expression isn’t evaluated at the site at which it’s defined, but at the site at which it’s invoked. Therefore, lambda expressions are lazily-evaluated. They are functions.

In a pure functional language like Haskell, a function consists of expressions, and expressions are interchangeable with values.[6] Replacing an expression with its value cannot be done safely in imperative-style languages because an expression may have side effects, and you’d expect those side effects to occur each time that the expression is evaluated. Evaluating a Haskell program is similar to reducing a mathematical expression into its simplest form, yet expressions in Haskell are, perhaps surprisingly, lazily evaluated. Evaluation is deferred until an expression’s value is actually needed by the program. And once an expression has been evaluated, its value may replace every instance of that expression in the program. Perhaps expressions in Haskell are similar to Lazy<T> in C#?

So how is Lazy<T> related to a function?

Well, the Lazy<T> constructor turns a parameterless function into a type

public Lazy(Func<T> valueFactory)

that has a property

public T Value { get; }

which is technically a parameterless function

public T get_Value()

that has the same exact signature as valueFactory, the Func<T> that was used to construct the Lazy<T> instance in the first place.
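As a rough illustration of that symmetry, the Value getter can be captured as a Func<T> again. The class name LazyAsFunction and the counter are invented for this sketch.

```csharp
using System;

// Hypothetical helper for illustration only.
static class LazyAsFunction
{
    // Returns { first read, second read, number of times the factory ran }.
    public static int[] Run()
    {
        int calls = 0;
        Func<int> valueFactory = () => { calls++; return 42; };

        var lazy = new Lazy<int>(valueFactory);

        // The Value getter has the shape of a Func<int>: no parameters, returns int.
        Func<int> getValue = () => lazy.Value;

        int first = getValue();
        int second = getValue();

        // The factory ran once even though getValue was invoked twice.
        return new[] { first, second, calls };
    }
}
```

The two functions have the same shape, but not the same behavior: Run() returns { 42, 42, 1 }, because Lazy<T> remembers the value that valueFactory produced.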

So can we shake off the Lazy<T> type and implement Func<T> as a comonad?

Let’s try it. We’ll start with the signature of ContinueWith by simply replacing all Lazy<T> with Func<T>.

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)

That’s wonderfully weird and magical looking. And for our next trick, the types will guide us…

We have a Func<T>, and we have a function that takes a Func<T> as its argument.

Note: A function with a function parameter is known as a higher order function. In this case, we’ve got two higher order functions: ContinueWith and selector.

Functions are given data as arguments.

Higher order functions are given functions as data.

Let’s pass our source function to selector.

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)
=> selector(source);

Just like ContinueWith for Lazy<T>, we notice that our return types don’t match. The selector function returns R, but we need a Func<R>.

We must wrap the entire expression in another function!

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)
=> () => selector(source);

Well, that was easy. It looks the same as the implementation for Lazy<T>, but without a Lazy<R> wrapper. The outer function provides the laziness.

For comparison, here’s the Lazy<T> definition from above:

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> new Lazy<R>(() => selector(source));

Is the Func<T> comonad the same as the Lazy<T> comonad?

No, because Func<T> will always compute a value when you invoke it, whereas Lazy<T> only computes its value once. Furthermore, Func<T> may return a different value each time that it’s invoked; in a side-effecting language like C#, this is common.

Therefore, Func<T> is lazy, but it may also cause side effects each time that you invoke it, whereas Lazy<T> may only cause side effects once.
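Here’s a small sketch of that difference. The two ContinueWith definitions are the ones given in this post; the class names ComonadSketch and FuncVersusLazy, and the counters, are assumptions made up for the example.

```csharp
using System;

static class ComonadSketch
{
    // The Func<T> cobind from this post.
    public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)
        => () => selector(source);

    // The Lazy<T> cobind from this post.
    public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
        => new Lazy<R>(() => selector(source));
}

// Hypothetical helper for illustration only.
static class FuncVersusLazy
{
    // Returns { funcRuns, lazyRuns, sum of two Func reads, sum of two Lazy reads }.
    public static int[] Run()
    {
        int funcRuns = 0;
        Func<int> f = () => { funcRuns++; return 42; };
        Func<int> doubledFunc = f.ContinueWith(source => source() * 2);

        int lazyRuns = 0;
        var l = new Lazy<int>(() => { lazyRuns++; return 42; });
        Lazy<int> doubledLazy = l.ContinueWith(source => source.Value * 2);

        // The Func<T> source runs on every invocation.
        int a = doubledFunc() + doubledFunc();

        // The Lazy<T> source runs once; the second read is served from the cache.
        int b = doubledLazy.Value + doubledLazy.Value;

        return new[] { funcRuns, lazyRuns, a, b };
    }
}
```

Both compositions compute the same values, so Run() returns { 2, 1, 168, 168 }: the side effect in the Func<T> source is repeated, while the side effect in the Lazy<T> source occurs once.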

Should we use the Func<T> comonad at all?

Probably not. It offers no real advantage compared to composing lambda expressions, although it does hide the closure (for better or worse). Perhaps Func<T> is the closure comonad? A blog post for another day.

Meet The Lazy Wizard

The laziness of Lazy<T> is related to the idea that applying co/bind forms a lazily-evaluated expression.

We can declare computations over IEnumerable<T> and IObservable<T> that are lazy; i.e., there are no computational side effects until we iterate or subscribe, respectively. And likewise for Task<T>, we can compose together many ContinueWith calls without causing any side effects; that is, until the Task<T> completes. There’s obviously a race condition here, but the point is that ContinueWith doesn’t block. ContinueWith forms a lazily-evaluated computation, although it’s possible that Task<T>.Result was already computed before ContinueWith was applied, which may result in the continuation executing synchronously.[7]

And so it goes for Lazy<T>. The purpose of Lazy<T> is simply lazy evaluation. That’s what ContinueWith needs to preserve across compositions; although, it’s possible that the lazy value may have already been computed by the time the program attempts to extract it. Just like Task<T>.Result, once it’s computed, we can keep reading Lazy<T>.Value over and over again without recomputing it. We can even build new computations on top. In the case of Lazy<T>, a new computation over a pre-computed value would simply reuse the pre-computed value as the source for the new computation, without having to reevaluate the original source; i.e., memoization of a parameterless function. This is illustrated in the code example above with the explicit call to i.Value at the end of the program: the side effect of writing to the console upon initializing i.Value had already occurred earlier in the program, and it’s not repeated again at the end.

All of these co/monadic types are lazy, but they also differ in behavior when it comes to laziness. The laziness of Lazy<T> and Task<T> isn’t permanent; it’s one-time use. The laziness of IObservable<T> and IEnumerable<T> is potentially repeatable.

To put it another way: Lazy<T> and Task<T> cache their value to avoid duplicating side effects, whereas IObservable<T> and IEnumerable<T> may cause side effects each time that they are “invoked”. I smell a pattern.

Pulling Back The Curtain

Lurking in the shadows, as usual, are side effects. Being able to control side effects is a major benefit of having these co/monadic constructs in C#.

The point of Lazy<T> is to ensure that any side effects only occur once, when computing a single value of T. That’s why Lazy<T> is often used to represent a static singleton in C#. If computing the value causes side effects, then we don’t want to compute the value until it’s actually needed by the program. And once it’s been computed, we don’t want to repeat the computation. Of course, there are other reasons to have singletons, but using a static readonly field without Lazy<T> is generally good enough for most singletons. It’s only when we want to defer side effects that we use Lazy<T> instead.

A synchronous sequence of values is similar to Lazy<T> but with a larger cardinality: Lazy<T> only has a single Value, whereas IEnumerable<T> has many. But there’s another significant difference: IEnumerable<T> doesn’t necessarily cache its results. Each time that you iterate, you may get different values. Furthermore, an enumerable query is a computation that may have its own side effects regardless of whether the source IEnumerable<T> caches its results or not, but those side effects aren’t realized until you iterate it.
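A minimal sketch of that repeatability, using an iterator block with a counter as its side effect. The class name ColdEnumerable and the counter are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
static class ColdEnumerable
{
    static int runs;

    // An iterator block: its body doesn't start until the sequence is iterated.
    static IEnumerable<int> Numbers()
    {
        runs++;             // side effect, repeated on every iteration
        yield return 1;
        yield return 2;
    }

    // Returns { runs after calling Numbers, runs after two iterations, first sum, second sum }.
    public static int[] Run()
    {
        runs = 0;

        IEnumerable<int> numbers = Numbers();
        int afterCall = runs;       // calling Numbers() ran none of its body

        int sum1 = numbers.Sum();
        int sum2 = numbers.Sum();   // iterating again re-runs the body

        return new[] { afterCall, runs, sum1, sum2 };
    }
}
```

Run() returns { 0, 2, 3, 3 }: nothing happens when the sequence is created, and the side effect occurs once per iteration rather than once overall.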

Task<T> represents an asynchronous function that eventually computes a Result. We use Task<T> to avoid duplicating the side effects of an asynchronous computation. Once it completes, subsequent reads of the Result property will always return the same value, thus side effects won’t occur again. If we actually want to repeat the side effects, then we must invoke the async function again, which will return a new instance of Task<T>.
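As a sketch of the same idea for Task<T>, assuming a console program with no synchronization context (so that blocking on Result is safe). The class name TaskCachesResult and the computeAsync delegate are made up for the example.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
static class TaskCachesResult
{
    // Returns { runs after two reads of one task, runs after invoking again, sum of all reads }.
    public static int[] Run()
    {
        int runs = 0;

        // Stands in for any function that returns a started Task<int>.
        Func<Task<int>> computeAsync = () => Task.Run(() => { runs++; return 42; });

        Task<int> task = computeAsync();
        int first = task.Result;    // blocks until the computation completes
        int second = task.Result;   // served from the completed task; nothing re-runs
        int runsAfterTwoReads = runs;

        // To repeat the side effect, invoke the function again to get a new task.
        int third = computeAsync().Result;

        return new[] { runsAfterTwoReads, runs, first + second + third };
    }
}
```

Run() returns { 1, 2, 126 }: reading Result twice doesn't repeat the computation, but invoking the function a second time does.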

An asynchronous sequence of values is similar to Task<T> but with a larger cardinality: Task<T> only has a single Result, whereas IObservable<T> has many. But there’s another significant difference: IObservable<T> doesn’t necessarily cache its results. Each time that you Subscribe, you may get different notifications. Furthermore, an observable query is a computation that may have its own side effects regardless of whether the source IObservable<T> caches its notifications or not, but those side effects aren’t realized until you Subscribe.

Side effects are related to temperature:

Cold means that side effects may occur.

Hot means that side effects do not occur.

So how does the laziness of co/monads relate to temperature?

Let’s consider the properties of the types of the co/monads separately from their co/bind implementations.

The Func<T> type may return a different value each time that it’s invoked, and that’s the cold programming model. Therefore, functions are cold.

The Func<T> comonad described above defers side effects until the outer-most function is invoked. That’s also the cold programming model, and essentially its purpose. The Func<T> comonad composes functions lazily. But it’s also pointless, given that Func<T> is already cold. The Func<T> comonad simply adds an unnecessary layer of cold indirection.

The Lazy<T> type is initially cold, because it defers side effects until its Value property is read, yet it becomes hot for subsequent reads of the Value property. In other words, the first time that Value is read it may cause side effects, but subsequent reads definitely will not. (There’s also a race condition if Value may be read by multiple threads simultaneously, but that's similar to Task<T>.Result, so let’s address the race condition in our analysis of Task<T> that follows.)

The Lazy<T> comonad is initially cold, because cobind defers side effects until the outer-most Value property is read, yet it becomes hot for subsequent reads of the outer-most Value property.

The Task<T> type is initially cold, because it can be constructed by passing in a function and it defers side effects until its Start method is called; however, it can only be executed once. Calling Start again throws an exception. Furthermore, the Task<T>.Result property is always hot; therefore, Task<T> transitions to hot when Start is called.
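The cold side of Task<T> can be sketched with its public constructor. The class name ColdTask and the counter are assumptions made for this example; it also assumes a console program, where blocking on Result is safe.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
static class ColdTask
{
    // Returns "status before Start, runs before Start, result, runs after, second Start threw".
    public static string Run()
    {
        int runs = 0;

        // Constructing the task doesn't run the function.
        var task = new Task<int>(() => { runs++; return 42; });
        string statusBefore = task.Status.ToString();
        int runsBeforeStart = runs;

        task.Start();
        int result = task.Result;   // blocks until the computation completes

        // A task can only be started once.
        bool secondStartThrew = false;
        try
        {
            task.Start();
        }
        catch (InvalidOperationException)
        {
            secondStartThrew = true;
        }

        return $"{statusBefore},{runsBeforeStart},{result},{runs},{secondStartThrew}";
    }
}
```

Run() returns "Created,0,42,1,True": the function doesn't run until Start is called, it runs exactly once, and a second call to Start is rejected.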

However, it’s rare to acquire a Task<T> that hasn’t already been started. Typically we use Task.Run, Task.Factory.StartNew, or invoke some BCL or user-defined function that returns a Task<T>. In these cases, Start has already been invoked, thus the computation may have already begun even before the caller acquires a reference to the Task<T>. The caller can’t invoke Start again, and reading the Result property never causes side effects, thus our typical usage of Task<T> is hot, yet the Task<T> type itself is technically cold. In other words, notice that we typically acquire an instance of Task<T> by invoking a function that returns it. Functions are cold, as stated previously. Therefore, each time that we invoke a function that performs an asynchronous computation, it may return a new instance of Task<T> to represent its Result, which is always hot. Invoking the function is essentially like invoking the Start method yourself. In that case, Task<T> is hot because it has already been started by the function that returned it, yet the Task<T> type itself is technically cold because it must be started at some point.

Alternatively, there’s another way to get an instance of Task<T> (a.k.a., “future”): use a TaskCompletionSource<T> (a.k.a., “promise”). TaskCompletionSource<T> replaces the need to call Start by essentially making the Result property assignable. The Task<T> that it returns is hot because it’s constructed without a function (via an internal constructor), thus it has no computation to be deferred. A consumer of the Task<T> is unable to call Start, thus it’s unable to cause side effects, and so this usage of Task<T> is hot. This doesn’t change the fact that the Task<T> type itself is cold, at least as far as its public API is concerned. The promise mechanism is useful because it decouples the two features of a future: It separates its ability to defer a computation from its ability to compute asynchronously; i.e., TaskCompletionSource<T> makes Task<T> hot without having to begin a computation by calling Start. Instead, the computation occurs externally and terminates when Result is assigned (or the task faults, or it’s canceled – but we’ll ignore these states for the purposes of this analysis).
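A short sketch of the promise/future split. The class name PromiseAndFuture is invented for the example; TaskCompletionSource<T>, its Task property and SetResult are the real TPL members.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
static class PromiseAndFuture
{
    // Returns "status before completion, Start threw, result".
    public static string Run()
    {
        var promise = new TaskCompletionSource<int>();
        Task<int> future = promise.Task;

        // There's no function behind this task, so there's nothing for a consumer to start.
        string statusBefore = future.Status.ToString();

        bool startThrew = false;
        try
        {
            future.Start();
        }
        catch (InvalidOperationException)
        {
            startThrew = true;
        }

        // The computation happens elsewhere; the producer just assigns the result.
        promise.SetResult(42);

        return $"{statusBefore},{startThrew},{future.Result}";
    }
}
```

Run() returns "WaitingForActivation,True,42". The consumer holding the future can read Result or attach continuations, but only the holder of the promise decides when the value arrives.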

The Task<T> comonad is initially cold, because cobind defers side effects until the Task<T> completes. It’s cold because the continuation is a function, and functions are cold. However, it becomes hot once the Task<T> completes. If you apply ContinueWith after the Task<T> has completed, then it may execute the continuation synchronously. There’s a race condition.

Therefore, the Lazy<T> and Task<T> types have the same temperature and similar behavior; i.e., they are both initially cold and then transition to hot by caching their value, which avoids duplicating side effects. Lazy<T>.Value blocks until its computation has completed; likewise, Task<T>.Result blocks until its computation has completed. The difference between them is that Lazy<T> computes its Value synchronously, and Task<T> computes its Result asynchronously. This means that to extract the Value from Lazy<T>, a program must block. To extract the Result from Task<T>, a program can do other things while Result is being computed. Since the only purpose of Lazy<T> is to defer its computation of Value, it doesn’t make sense to have a method like Run; it would simply block until Value has been computed, thus defeating the purpose of Lazy<T>. Since the purpose of Task<T> is asynchrony, in addition to deferring its computation of Result, it’s actually useful to hide its coldness behind a function that returns a hot Task<T>.

As comonads, both Lazy<T> and Task<T> are initially cold, yet they become hot once their value has been computed. In the case of Lazy<T>, cobind is synchronous (it blocks until Value is computed), whereas for Task<T>, cobind is asynchronous (it doesn’t block, although it may execute the continuation synchronously when Result has already been computed and the ExecuteSynchronously flag was set). In either case, the transition from cold to hot means that there’s a race condition.

Therefore, comonads and their comonadic types are cold, which is the definition of deferred execution. In addition, comonads that transition from cold to hot have a race condition.

The IEnumerable<T> type is either cold, as in C# Iterator Blocks, or it’s hot, as in pre-computed lists and dictionaries.

The IEnumerable<T> monad is cold, because bind defers side effects until the enumerable that it returns is iterated. This is true even if the IEnumerable<T> that is the source of the computation is hot. Although no side effects will occur when iterating a hot enumerable such as a List<T>, the computation itself (a.k.a., the query) may cause side effects when it’s iterated, and it can only cause side effects when it’s iterated.
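For instance, here’s a sketch of a cold query over a hot List<T>. The class name ColdQueryOverHotSource and the counter are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
static class ColdQueryOverHotSource
{
    // Returns { side effects after bind, after first iteration, after second iteration, element count }.
    public static int[] Run()
    {
        // A pre-computed list: iterating it causes no side effects by itself.
        var source = new List<int> { 1, 2, 3 };

        int sideEffects = 0;

        // Applying bind only builds the query; the selector hasn't run yet.
        IEnumerable<int> query = source.SelectMany(x =>
        {
            sideEffects++;
            return new[] { x, x * 10 };
        });
        int afterBind = sideEffects;

        int count = query.Count();      // iterating runs the selector once per source element
        int afterFirst = sideEffects;

        query.Count();                  // iterating again repeats the side effects

        return new[] { afterBind, afterFirst, sideEffects, count };
    }
}
```

Run() returns { 0, 3, 6, 6 }: the source is hot, yet the query built on top of it is cold and repeatable.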

The IObservable<T> type is either cold, as in Rx generators like Return, Range, Generate, Timer, Interval and Create, or it’s hot, as in Subject<T>, events and hot Task<T> conversions.

The IObservable<T> monad is cold, because bind defers side effects until an observer subscribes to the observable that it returns. This is true even if the IObservable<T> that is the source of the computation is hot. Although no side effects will occur when subscribing to a hot observable such as a Subject<T>, the computation itself (a.k.a., the query) may cause side effects when an observer subscribes, and it can only cause side effects when an observer subscribes. Furthermore, due to the asynchrony of IObservable<T>, some interesting and useful combinations of cold and hot temperatures are also possible. (See Hot and Cold Observables for details.)

Therefore, the IEnumerable<T> and IObservable<T> types can either be cold or hot, and they have similar behaviors; i.e., they both represent sequences, which may or may not cause side effects when computing their elements. The difference between them is asynchrony. IEnumerable<T> computes its elements synchronously. IObservable<T> computes its elements asynchronously. This means that to pull the next value from an IEnumerable<T>, a program must block. To be pushed values from an IObservable<T>, a program calls Subscribe and can immediately do other things. In either case, iterating an IEnumerable<T> or subscribing to an IObservable<T> doesn’t necessarily cause side effects. It depends on the implementation; e.g., List<T> does not cause side effects, so it’s hot. An Iterator Block may cause side effects, so it’s cold. Observable.FromEventPattern returns an observable that wraps an event, which is hot. Observable.Range generates notifications each time that you subscribe, so it’s cold.

As monads, both IEnumerable<T> and IObservable<T> are initially cold. Applying bind on an IEnumerable<T> or IObservable<T> won’t cause any side effects until it’s iterated or subscribed, respectively. However, depending upon the implementation, they may or may not transition to hot.

Therefore, monads are cold regardless of the temperature of their types, which is the definition of deferred execution. In addition, monads that transition from cold to hot have a race condition.

Monads and comonads have identical temperatures with regard to composition – they are both cold. Therefore, consecutively applying co/monadic operators is free of side effects. Co/bind implements deferred execution, such that side effects won’t occur until you leave the co/monad.

There’s No Place Like Home

We use monads and comonads to control side effects. It’s the purpose of the property of laziness in co/bind. Co/monads allow us to write queries declaratively, without causing side effects. We control when side effects may occur by reading/invoking/iterating/subscribing at a later time.

Lazy<T> is a comonad with the purpose of causing side effects only once, attached to a blocking computation. The Func<T> comonad is even closer to the metal in that it represents repeatable side effects; although, perhaps it’s not very useful as a comonad.

Task<T> is a comonad with the purpose of causing side effects only once, attached to an asynchronous computation. However, the laziness of Task<T> is subtle. Due to a race condition that is common in the Task<T> implementation of cobind, we must either live with side effects possibly occurring immediately (although asynchronously in general), or we can build our computation on a promise (TaskCompletionSource<T>) and glue it to the future (Task<T>) when the program is ready for side effects. One really nice way of doing that is to define a coroutine; a.k.a., an async method in C#. An async method lets you await a Task within the imperative style of programming, meaning that typical control flow constructs are allowed without having to resort to implementing a complex state machine over CPS (because the compiler does it for you). Upon the first await, if the Task didn’t complete synchronously, then a promise is returned to the caller of the async method and the remainder of the computation from that point is referenced in a continuation that is attached to the Task via cobind. Side effects of the downstream computation can be controlled easily by simply holding the Task rather than applying await. The program can apply await at any time in the future to continue the computation with its Result. The race condition, in this case, is irrelevant.
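Here’s a rough sketch of holding a Task and awaiting it later. The class name HoldThenAwait, the ComputeAsync method and the log are all invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
static class HoldThenAwait
{
    static async Task<int> ComputeAsync(List<string> log)
    {
        // Runs synchronously, up to the first await, when the method is invoked.
        log.Add("computation started");
        await Task.Delay(TimeSpan.FromMilliseconds(10));
        return 42;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Invoking the async method starts it; here the Task is held, not awaited.
        Task<int> pending = ComputeAsync(log);
        log.Add("holding the task");

        // Everything after this await is the downstream continuation.
        // It runs only once the program asks for the Result.
        int result = await pending;
        log.Add("continued with " + result);

        return log;
    }
}
```

The log ends up as "computation started", "holding the task", "continued with 42". The upstream computation begins right away, but the downstream side effects wait until the program applies await.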

IEnumerable<T> is a monad with the purpose of causing repeatable side effects, attached to a blocking computation.

IObservable<T> is a monad with the purpose of causing repeatable side effects, attached to an asynchronous computation.

Even though a sequence may be hot or cold, applying the bind operator (or any operator) itself doesn’t cause any side effects, until the sequence is iterated/subscribed. Furthermore, an operator may cause side effects each time that its return value is iterated/subscribed. The monadic bind function is about repeatable side effects, regardless of the temperature of its monadic type.

In conclusion:

Comonads represent lazy computations of single values, are initially cold, and transition to hot when the computation completes.

Monads represent lazy computations over zero or more values, depending on the type. They are initially cold, and may transition from cold to hot, depending on the type.

And why is any of this important?

Because a language like C# is full of side effects. Being able to introduce side effects so easily is often what makes C# really useful, but it’s also what makes many C# programs really confusing and buggy.

Controlling side effects in a declarative manner makes programs much easier to reason about, and to prove correctness. Although, some parts of a program are just much easier to write using imperative side effects.

Therefore, the careful combination of declarative and imperative styles within the same program is often the best choice. Functions generally help with the former, as in the procedural programming style. Monads also help with the former, computing over sequences in a declarative style. Comonads (via coroutines) generally help with the latter, when single-valued asynchronous computations are required.

These techniques are becoming even more important as modern programs increasingly depend on concurrency.

References

Notes

  1. The name of SelectMany is similar to the name of the Select operator, which is the same as the SELECT operator in SQL. Likewise, the Where operator in LINQ is named the same as the WHERE operator in SQL, and so on. When LINQ was invented, apparently the designers had chosen SQL-like names for the operators to make developers more comfortable. Most developers were used to SQL, but didn't have a clue about monads. And this naming convention works of course because SQL is a monadic comprehension syntax!
  2. The C# compiler actually looks for a special signature of a method named SelectMany rather than the one shown above. It must have an intermediate selector parameter in addition to the result selector. It's just a performance optimization and so I won’t elaborate on it here.
  3. Rx actually provides several generators, including Range, Generate, Timer, Interval and Create. There are many ways to enter the continuation monad. Likewise, there are many ways to enter the list monad, IEnumerable<T>, because there are many types that implement IEnumerable<T>. And C# offers another option: Iterator Blocks.
  4. The FCL implementation of ContinueWith is defined as an instance method. Here, I’ve rewritten it as an extension method and renamed its parameters, for parity with the SelectMany method. Other than those insignificant changes, it remains identical to the definition in the FCL.
  5. What Would Erik Meijer Do?   https://twitter.com/headinthebox/status/543919077752729600
  6. https://wiki.haskell.org/Referential_transparency
  7. The TPL doesn’t invoke continuations synchronously unless you opt-in, and even then it’s not guaranteed. For the most part, the TPL is designed to introduce concurrency. This is in contrast to the design of Rx, which is free-threaded and rarely introduces concurrency. Operators introduce concurrency when you explicitly specify a concurrency-introducing scheduler, although operators that involve some kind of timing may default to a concurrency-introducing scheduler if you don’t pass in one; e.g., Throttle, Sample, Buffer, etc. This is because Rx is primarily about taming concurrency, not introducing it.

Async | C# | Patterns | Rx

March 01, 2012

Async Iterators

You’ve already heard about async in C# 5.  This new feature is going to improve UI responsiveness and developer productivity simultaneously, to say the least.  I’ll assume that you’re also familiar with Task<T>, the generic type in the .NET Framework that provides a common model for asynchronous functions, and one of the types on which the async feature depends.

Task<T> has a limitation: it only represents a scalar value T.  This is fine for async methods that only compute a single value, but what if we need to write an async method that lazily computes a sequence of T?  In other words, the entire operation is asynchronous just like Task<T>, but instead of computing a single value we want to potentially compute many values, sequentially, while also being able to await other asynchronous operations.

The best we can do with Task<T> is to return Task<IList<T>>, gathering each T into a list.  There’s no way to yield each individual T as they are computed when using Task<IList<T>>.

The following C# console application illustrates this behavior.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class TaskExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            var list = await AsyncScalar();

            Output.WriteLine("Async Completed");

            foreach (var value in list)
            {
                Output.WriteLine(value);
            }

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static async Task<IList<int>> AsyncScalar()
        {
            Output.WriteLine("AsyncScalar Called");

            var list = new List<int>();

            for (int i = 0; i < 3; i++)
            {
                Output.WriteLine("Computing " + i);

                await Task.Delay(TimeSpan.FromSeconds(1));

                list.Add(i);
            }

            return list;
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): AsyncScalar Called
(Thread 1): Computing 0
(Thread 4): Computing 1
(Thread 4): Computing 2
(Thread 4): Async Completed
(Thread 4): Value = 0
(Thread 4): Value = 1
(Thread 4): Value = 2
(Thread 4): Done

Note that the Output class is also used by subsequent code snippets in this post, but I won’t be repeating it again.  Here’s the definition:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    static class Output
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine("(Thread {0}): {1}", Thread.CurrentThread.ManagedThreadId, message);
        }

        public static void WriteLine(int data)
        {
            WriteLine("Value = " + data);
        }
    }
}

Notice that the AsyncScalar method in the previous example computes values asynchronously, but they’re gathered into a list that is eventually returned when the computation completes.  The caller awaiting the task gets back a pre-computed list of data (referred to as a hot sequence), which can only be iterated synchronously.

Ultimately, our goal is to somehow replace list.Add with something like yield.  We want to be able to intermix awaiting and yielding in an imperative-style method that supports control flow statements, such as for loops.  And the result must be an asynchronous sequence, which doesn’t require synchronous iteration.

async IEnumerable<T> = Error

C# doesn’t allow us to apply the async keyword to iterator blocks.  To use the async keyword on a method it must return void, Task or Task<T>.  To define an iterator block, we must return IEnumerable<T>.  Clearly these features aren’t compatible based on return type alone.

But why not support IEnumerable<T> as another possible return type for async methods?

The answer (if I may assume what the C# team was thinking) is that Task<T> represents an asynchronous function, which means that it provides the necessary model for awaiting a function’s return value.   IEnumerable<T> only represents an interactive sequence, which means that it doesn’t provide a model for awaiting an iterator block’s return value(s).  Instead, values must be pulled synchronously from the sequence.  As a result, async iterator blocks would always be forced to execute synchronously, defeating the purpose of async.

Ok, so how about Task<IEnumerable<T>>?

Nope.  It’s similar to my first code example above, which uses Task<IList<T>>.  It models an asynchronous function that eventually returns a sequence, but that sequence remains synchronous.  Recall that Task<T> represents a scalar-valued asynchronous function, which means that the C# compiler can’t do any tricks with an iterator block to convert Task<T> into something that represents an asynchronous sequence.  The best it can do is allow us to asynchronously invoke an iterator block, but the generated sequence must be enumerated synchronously, as shown in the previous example.  That’s not our goal at all.

Then how about IEnumerable<Task<T>>?

This might seem correct at first, but actually it doesn’t meet our requirements either.  It represents a synchronous sequence of asynchronous functions.  Interesting, but not what we want.  Let’s assume for a moment that the C# compiler transforms the code between yield return statements into Task<T> objects.  We could then write something like this:

// Doesn't compile
static async IEnumerable<Task<int>> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

But consider the behavior when consuming it.  We’d probably use a foreach loop, which calls GetEnumerator to get an IEnumerator<Task<int>>.  Then we’d call MoveNext, which would synchronously enter the for loop and hit the first await.  At this point, we’d expect MoveNext to return true and set Current to a Task<int>.  When the Task<int> completes it should execute the current continuation, which is the code following the await.  So then what happens if we ignore the Task<int> and call MoveNext again?  Perhaps it could block until the current Task<int> completes, but that behavior doesn’t meet our requirements.  We don’t want to block while iterating the sequence.  Furthermore, what happens if we don’t call MoveNext right away when the Task<int> completes?  The iterator could begin computing the next Task<int> asynchronously, which means that it would have to buffer the sequence until our synchronous iteration catches up.  This is like the producer/consumer pattern with an implicit queue.  Again, it’s interesting but not what we want.  When a value is computed, we want our continuation to execute right away, without buffering.

So what would happen if we reversed it?  MoveNext returns Task<bool> and Current returns T.

Well that can’t be modeled with any combination of IEnumerable<T> and Task<T>, but if it existed it would allow us to await between yields, keeping the entire computation asynchronous and its iteration asynchronous as well.
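To picture the reversed shape, here’s a hand-written sketch.  The interface IAsyncEnumeratorSketch<T> and the classes below are hypothetical, made up purely for illustration; they aren’t types from the .NET Framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface for illustration only.
interface IAsyncEnumeratorSketch<T>
{
    Task<bool> MoveNext();
    T Current { get; }
}

// A hand-written producer that awaits between values.
class CountingEnumerator : IAsyncEnumeratorSketch<int>
{
    private readonly int count;
    private int next;

    public CountingEnumerator(int count)
    {
        this.count = count;
    }

    public int Current { get; private set; }

    public async Task<bool> MoveNext()
    {
        if (next >= count)
        {
            return false;
        }

        // Asynchronous work between values, without blocking a thread.
        await Task.Delay(TimeSpan.FromMilliseconds(10));

        Current = next++;
        return true;
    }
}

static class AsyncIterationSketch
{
    // The consumer awaits each MoveNext instead of blocking on it.
    public static async Task<List<int>> ConsumeAsync()
    {
        var values = new List<int>();
        var enumerator = new CountingEnumerator(3);

        while (await enumerator.MoveNext())
        {
            values.Add(enumerator.Current);
        }

        return values;
    }
}
```

Awaiting ConsumeAsync() produces the values 0, 1 and 2.  Note that the producer here had to be written by hand as a class; it isn’t an iterator block, which is the part we still want.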

IAsyncEnumerable<T>

In the Ix Experimental release (Ix_Experimental-Async on NuGet), there’s a type named IAsyncEnumerable<T>.  It has a single method named GetEnumerator that returns an IAsyncEnumerator<T>, which returns a Task<bool> from MoveNext and a T from Current.  Can we use it to write an asynchronous iterator block?  Let’s take a look at an example.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class AsyncEnumerableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            IAsyncEnumerable<int> sequence = DelayedSequence().ToAsyncEnumerable();

            Output.WriteLine("Awaiting");

            await sequence.ForEachAsync(Output.WriteLine);

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static IEnumerable<int> DelayedSequence()
        {
            Output.WriteLine("DelayedSequence Called");

            for (int i = 0; i < 3; i++)
            {
                Task.Delay(TimeSpan.FromSeconds(1)).Wait();

                Output.WriteLine("Yielding " + i);

                yield return i;
            }
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Awaiting
(Thread 4): DelayedSequence Called
(Thread 4): Yielding 0
(Thread 4): Value = 0
(Thread 4): Yielding 1
(Thread 4): Value = 1
(Thread 4): Yielding 2
(Thread 4): Value = 2
(Thread 4): Done

At first glance it looks like we’re closer to our goal.  At least we can iterate the sequence asynchronously and values are computed asynchronously.  The ForEachAsync extension is provided as a convenience because the foreach statement can’t handle a MoveNext method that returns Task<bool>, but you could instead call GetEnumerator and loop over a call to await MoveNext manually, if you prefer.

So we’re fine on the consumer side, but what about the producer?  Unfortunately, in order to introduce a delay we must block the current thread (note the call to Wait).  That’s because the iterator block returns IEnumerable<T>, which as we already know doesn’t model an asynchronous sequence.  It seems that IAsyncEnumerable<T> creates a Task<T> for each call to MoveNext to make our sequence consumable in an asynchronous fashion.  This means that our iterator block can’t control its own asynchrony.  Concurrency is injected for us automatically when the consumer decides that it’s ready to await the next value.  So essentially it’s still a pull model even though it’s asynchronous.
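To make the last two paragraphs concrete, here’s a minimal sketch of the manual loop, together with a guess at what such an adapter does for each call to MoveNext.  It doesn’t reference the Ix package at all: IAsyncEnumeratorSketch<T>, TaskPerMoveNext<T> and ManualLoopSketch are hypothetical names invented for this illustration, and the real Ix types may differ (for example, MoveNext may accept a cancellation token).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the enumerator shape described above:
// MoveNext returns Task<bool> and Current returns T.
public interface IAsyncEnumeratorSketch<T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Assumed adapter behavior: every call to MoveNext wraps the synchronous
// MoveNext of an ordinary iterator in its own Task.
public sealed class TaskPerMoveNext<T> : IAsyncEnumeratorSketch<T>
{
    private readonly IEnumerator<T> inner;

    public TaskPerMoveNext(IEnumerable<T> source)
    {
        inner = source.GetEnumerator();
    }

    public T Current
    {
        get { return inner.Current; }
    }

    public Task<bool> MoveNext()
    {
        // The producer may still block, just not on the consumer's thread.
        return Task.Run(() => inner.MoveNext());
    }

    public void Dispose()
    {
        inner.Dispose();
    }
}

public static class ManualLoopSketch
{
    // The hand-written equivalent of ForEachAsync: await MoveNext, then read Current.
    public static async Task<List<T>> CollectAsync<T>(IEnumerable<T> source)
    {
        var results = new List<T>();

        using (var enumerator = new TaskPerMoveNext<T>(source))
        {
            while (await enumerator.MoveNext())
            {
                results.Add(enumerator.Current);
            }
        }

        return results;
    }
}
```

Notice that the consumer decides when the next Task<bool> is created, which is the pull model again.  The producer never gets a chance to await anything itself.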

And of course we can’t simply add the async keyword to our iterator block, as discussed previously.  We’re now back where we started.

We need a better model for our asynchronous sequence: a type that represents an asynchronous function like Task<T> as well as a sequence of values like IEnumerable<T>, but in a push-based fashion.  Then we can define our iterator block in terms of this new type by yielding values as they are computed asynchronously.

IObservable<T>

IObservable<T> and IEnumerable<T> overlap in that they both provide a lazily-computed sequence of values; however, they differ in that IObservable<T> represents an asynchronous (push) sequence while IEnumerable<T> represents a synchronous (pull) sequence.  A sequence that pushes values enables reactive computations, while a sequence from which values are pulled enables interactive computations.  LINQ provides a declarative abstraction over any computation.  Reactive Extensions for .NET (Rx) provides LINQ operators for IObservable<T> similar to the LINQ to Objects operators that require IEnumerable<T>.  If you’re not familiar with Rx yet, then you may want to get started with this blog post.
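The push/pull distinction is easier to see side by side.  The following sketch uses only the IObservable<T> and IObserver<T> interfaces from the System namespace, without any Rx operators.  PushRange, Recorder and PushVersusPull are hypothetical names made up for this illustration, and the values are pushed synchronously only to keep it short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes 0, 1, 2 to whoever subscribes.
sealed class PushRange : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Pushed synchronously here for brevity; a real source may push
        // later, and from any thread.
        for (int i = 0; i < 3; i++)
        {
            observer.OnNext(i);
        }

        observer.OnCompleted();

        return new NoOp();
    }

    sealed class NoOp : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records whatever it is given.
sealed class Recorder : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("pushed " + value); }
    public void OnError(Exception error) { Log.Add("error"); }
    public void OnCompleted() { Log.Add("completed"); }
}

static class PushVersusPull
{
    // Pull: the consumer drives the loop and asks for each value.
    public static List<string> Pull()
    {
        var log = new List<string>();
        IEnumerable<int> xs = new[] { 0, 1, 2 };

        foreach (int x in xs)
        {
            log.Add("pulled " + x);
        }

        return log;
    }

    // Push: the consumer hands over callbacks and the source drives the loop.
    public static List<string> Push()
    {
        var recorder = new Recorder();

        using (new PushRange().Subscribe(recorder))
        {
        }

        return recorder.Log;
    }
}
```

In Pull the for-each loop belongs to the consumer, whereas in Push the loop lives inside the source and the consumer only supplies OnNext, OnError and OnCompleted.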

Now back to our problem at hand.  How can we write an iterator block with IObservable<T>?  Does the following work?

// Doesn't compile
static async IObservable<int> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

Unfortunately, no.  But wouldn’t it be nice?  Conceptually, it meets our requirements.  We can subscribe to the observable to receive values asynchronously.  The iterator block can define the observable by awaiting other asynchronous operations and yielding values as they are computed, sequentially yet asynchronously.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described in the remainder of this blog post.  It enables you to write code that looks very similar to the previous hypothetical example.  For more information, read this forum thread.]

Well the Rx team has already thought about this limitation in C# and provided a solution.  The Rx Experimental release (Rx_Experimental-Main on NuGet) provides several overloads of the Observable.Create method, one of which has the following signature:

public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

This method allows us to generate an observable using a normal iterator block.  But since we can’t mark the iterator block as async, we can’t await other asynchronous operations.  Instead, we’ll yield observables to await them.  But if we’re yielding observables, then how do we also yield computed values?  That’s where the IObserver<T> object comes in handy.  Here’s an example:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class ObservableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start();

            Console.ReadKey();
        }

        static void Start()
        {
            Output.WriteLine("Starting");

            var xs = Observable.Create<int>(observer => AsyncSequence(observer));

            Output.WriteLine("Subscribing");

            xs.Subscribe(Output.WriteLine, () => Output.WriteLine("Done"));
        }

        static IEnumerable<IObservable<object>> AsyncSequence(IObserver<int> observer)
        {
            Output.WriteLine("AsyncSequence Called");

            for (int i = 0; i < 3; i++)
            {
                yield return Task.Delay(TimeSpan.FromSeconds(1)).ToObservable().UpCast();

                Output.WriteLine("Pushing " + i);

                observer.OnNext(i);
            }
        }
    }

    static class ObservableExtensions
    {
        public static IObservable<object> UpCast<T>(this IObservable<T> source)
        {
            return source.Select(value => (object) value);
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Subscribing
(Thread 1): AsyncSequence Called
(Thread 5): Pushing 0
(Thread 5): Value = 0
(Thread 4): Pushing 1
(Thread 4): Value = 1
(Thread 6): Pushing 2
(Thread 6): Value = 2
(Thread 6): Done

This behavior is exactly what we wanted: asynchronous subscribe, observations of values as they are generated, the ability to await other asynchronous operations and the use of normal control flow statements, such as for.

However, the implementation is a bit strange.  We’ve replaced list.Add from the first example with observer.OnNext rather than with yield, which isn’t so bad, but then we also had to replace await from the first example with yield.  Not very intuitive, perhaps.

Maybe in the future we’ll get first-class compiler support for writing observable iterators.  Until then, this should do fine.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described above.  It enables you to write code that looks very similar to what first-class compiler support might look like, although it requires the use of a callback.  For more information, read this forum thread.]
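For the curious, here’s a sketch of what that callback style might look like.  It assumes that the alternative in question is the Observable.Create overload in Rx 2.0 that accepts an asynchronous callback taking an observer and a cancellation token; the forum thread may describe a different approach.  AsyncCreateSketch and its delay parameter are hypothetical names used only for this illustration, and the code requires a reference to Rx 2.0 or later.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class AsyncCreateSketch
{
    public static IObservable<int> AsyncSequence(TimeSpan delay)
    {
        // Assumption: Rx 2.0's Create overload that takes an async callback.
        // The callback runs once per subscription.
        return Observable.Create<int>(async (observer, cancel) =>
        {
            for (int i = 0; i < 3; i++)
            {
                // await is allowed here because the lambda is async.
                await Task.Delay(delay, cancel);

                // Values are still pushed with OnNext rather than yield.
                observer.OnNext(i);
            }

            // No explicit OnCompleted: Rx signals completion when the
            // returned Task completes.
        });
    }
}
```

Subscribing works just like before; e.g., AsyncCreateSketch.AsyncSequence(TimeSpan.FromSeconds(1)).Subscribe(Output.WriteLine, () => Output.WriteLine("Done")).  The await reads naturally now, though we’re still calling observer.OnNext where we’d like to write yield.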

Side note:

C# doesn’t support anonymous iterator blocks.  Here’s something else interesting: VB 11 does!

Do you agree that anonymous observable iterators are an apt use case for such a feature in C#?

Tags:

Async | C# 5 | Rx