Like the American newspaper boys of the late 1800s used to shout, in this blog post we'll be taking a look at something "extra" in Rx.
Nope, it's not a typo.
It’s like acceleration to speed, or C# to IL, or higher-order functions to functions.
Sometimes fast isn't fast enough. Sometimes abstract isn't abstract enough. Sometimes functions aren't... functiony enough?
Sometimes Rx operators aren’t reactive enough.
Many operators in Rx force us to supply static arguments, which makes the operator's behavior static as well. By static I mean values that are supplied upfront and never vary, typically scalars such as Int32, TimeSpan and DateTimeOffset. Furthermore, stateful operators with static parameters prevent us from varying their behavior without a potential loss of data.
Here's a simple example of a stateful operator with a scalar parameter. Let's say that we have an observable and we want to skip the first 5 notifications:
var xs = observable.Skip(5);
That's great, but what if we don't know how many notifications we want to skip yet? Or what if we want to change that value later while the observable is active?
Defer doesn't work if we want to subscribe now.
We could attempt to change the value, without causing side effects, by writing some complicated dance within the Publish operator involving subscriptions, disposing, recreating the query and resubscribing; however, we'll lose the state within the operator that tells us how many values have already been skipped. We could keep track of the number that have already been skipped ourselves, but then what's the point of the Skip operator?
Ok, easy solution, we define a new overload:
public static IObservable<T> Skip<T>(this IObservable<T> source, IObservable<int> count);
Now we can change the count at any time without resubscribing and without losing any state. We'll simply push in a new value through count whenever we want to change it.
Subscribe now, decide later.
Maybe we'll never push in a value and so everything will be skipped. Maybe we'll push in a value of 10 before the first 10 notifications have been skipped, thus only the first 10 notifications will be skipped. Maybe we change our minds and push in a value of 5, but it's too late – we've already skipped the first 7 notifications, thus only the first 7 notifications are skipped. At this point, the operator unsubscribes from count. [1]
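To make those semantics concrete, here's a minimal sketch of how such an overload might be written on top of Observable.Create. It assumes the System.Reactive NuGet package; the class names ReactiveSkipSketch and SkipDemo are made up for illustration, and Rx itself doesn't ship this overload. The sketch also assumes that errors from count should be forwarded and that completion of count can be ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReactiveSkipSketch
{
    // Hypothetical overload from the text; not part of Rx itself.
    public static IObservable<T> Skip<T>(this IObservable<T> source, IObservable<int> count)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var countSubscription = new SingleAssignmentDisposable();
            int skipped = 0;      // operator state that survives every change of count
            int? target = null;   // null means "no count yet", so keep skipping
            bool open = false;    // true once skipping has stopped for good

            // Stop skipping as soon as the latest count has been reached or passed.
            void TryOpen()
            {
                if (!open && target.HasValue && skipped >= target.Value)
                {
                    open = true;
                    countSubscription.Dispose(); // count is no longer needed
                }
            }

            // Subscribe to count first so that a value replayed on subscription is seen in time.
            countSubscription.Disposable = count.Subscribe(
                n => { lock (gate) { target = n; TryOpen(); } },
                ex => { lock (gate) { observer.OnError(ex); } });

            var sourceSubscription = source.Subscribe(
                x =>
                {
                    lock (gate)
                    {
                        if (open) observer.OnNext(x);
                        else { skipped++; TryOpen(); }
                    }
                },
                ex => { lock (gate) { observer.OnError(ex); } },
                () => { lock (gate) { observer.OnCompleted(); } });

            return new CompositeDisposable(countSubscription, sourceSubscription);
        });
    }
}

public static class SkipDemo
{
    // Replays the "change our minds" scenario described above.
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var count = new Subject<int>();
        var seen = new List<int>();

        using (source.Skip(count).Subscribe(seen.Add))
        {
            for (int i = 1; i <= 3; i++) source.OnNext(i);   // skipped: no count yet
            count.OnNext(10);                                // plan to skip the first 10
            for (int i = 4; i <= 7; i++) source.OnNext(i);   // skipped: 7 so far
            count.OnNext(5);                                 // too late, 7 are already gone
            for (int i = 8; i <= 10; i++) source.OnNext(i);  // forwarded
        }

        return seen; // 8, 9, 10
    }
}
```

Note that the sketch stops skipping eagerly: the moment the number of skipped notifications reaches the latest count, it disposes its subscription to count, so a later, larger value can't reopen the gap. That's one possible reading of the behavior described above, not the only one.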
But wait, are the values in count even necessary anymore? It seems overly complicated. Can't we just generalize our solution in terms of time?
public static IObservable<T> SkipUntil<T, U>(this IObservable<T> source, IObservable<U> other);
In other words, instead of an observable count, we'll simply use an observable. Its element type, U, is ignored. When the observable signals or terminates, the operator stops skipping.
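Here's a small usage sketch of the existing SkipUntil operator with a signal whose values are never inspected. It assumes the System.Reactive NuGet package; the names SkipUntilDemo and ready are made up for illustration, and only the case where the other observable pushes a value is shown.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SkipUntilDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var ready = new Subject<string>();   // the element type doesn't matter to SkipUntil
        var seen = new List<int>();

        using (source.SkipUntil(ready).Subscribe(seen.Add))
        {
            source.OnNext(1);      // skipped
            source.OnNext(2);      // skipped
            ready.OnNext("go");    // any value at all stops the skipping
            source.OnNext(3);      // forwarded
            source.OnNext(4);      // forwarded
        }

        return seen; // 3, 4
    }
}
```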
This "reactive reactive" pattern is already in use by several operators in Rx: SkipUntil and TakeUntil as of Rx 1.0, and some overloads of Window, Buffer, Throttle, Sample, Delay and Timeout as of Rx 2.0. Upon further inspection, however, it seems that Rx is potentially missing a lot of similar opportunities.
Reactive Overloads for Static Parameters
Merge – A Case Study
In one such example, we can see how data loss is a critical problem. The Merge(maxConcurrent) operator is very powerful, but its behavior is static. As inner observables are pushed in, Merge subscribes to them until maximum concurrency is reached, at which point any further observables it receives are enqueued for later. Often these observables capture non-recoverable application state. Changing the maximum level of concurrency in response to the changing capabilities of the environment requires disposing of the subscription to Merge's observable, re-forming the query with a new static argument and resubscribing; however, in the process any observables enqueued by Merge are lost, and their captured application state is lost along with them.
Merge with Dynamic Maximum Concurrency
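As a rough illustration of the idea, here's a sketch of a Merge overload whose concurrency limit arrives as an observable. It assumes the System.Reactive NuGet package; the overload itself and the names DynamicMergeSketch and DynamicMergeDemo are hypothetical. The sketch also assumes that the limit is 0 until the first value arrives, that lowering the limit never cancels inner subscriptions that are already running, and that completion of the limit observable is ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DynamicMergeSketch
{
    // Hypothetical overload; Rx itself only ships Merge(int maxConcurrent).
    public static IObservable<T> Merge<T>(
        this IObservable<IObservable<T>> sources, IObservable<int> maxConcurrent)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var pending = new Queue<IObservable<T>>();   // survives every change of the limit
            var subscriptions = new CompositeDisposable();
            int limit = 0, active = 0;
            bool outerDone = false, finished = false;

            // Start queued inner observables while there is room, then check for completion.
            void Pump()
            {
                while (active < limit && pending.Count > 0)
                {
                    active++;
                    var next = pending.Dequeue();
                    var innerSubscription = new SingleAssignmentDisposable();
                    subscriptions.Add(innerSubscription);
                    innerSubscription.Disposable = next.Subscribe(
                        x => { lock (gate) observer.OnNext(x); },
                        ex => { lock (gate) observer.OnError(ex); },
                        () =>
                        {
                            lock (gate)
                            {
                                subscriptions.Remove(innerSubscription);
                                active--;
                                Pump();
                            }
                        });
                }

                if (outerDone && active == 0 && pending.Count == 0 && !finished)
                {
                    finished = true;
                    observer.OnCompleted();
                }
            }

            subscriptions.Add(maxConcurrent.Subscribe(
                n => { lock (gate) { limit = n; Pump(); } },
                ex => { lock (gate) observer.OnError(ex); }));

            subscriptions.Add(sources.Subscribe(
                inner => { lock (gate) { pending.Enqueue(inner); Pump(); } },
                ex => { lock (gate) observer.OnError(ex); },
                () => { lock (gate) { outerDone = true; Pump(); } }));

            return subscriptions;
        });
    }
}

public static class DynamicMergeDemo
{
    public static List<int> Run()
    {
        var sources = new Subject<IObservable<int>>();
        var limit = new Subject<int>();
        var a = new Subject<int>();
        var b = new Subject<int>();
        var c = new Subject<int>();
        var seen = new List<int>();

        using (sources.Merge(limit).Subscribe(seen.Add))
        {
            limit.OnNext(1);
            sources.OnNext(a);   // subscribed immediately
            sources.OnNext(b);   // enqueued
            sources.OnNext(c);   // enqueued
            a.OnNext(1);         // forwarded
            b.OnNext(100);       // lost: b is a hot subject that nobody listens to yet
            limit.OnNext(2);     // raise the limit; b starts without any resubscription
            b.OnNext(2);         // forwarded
            a.OnCompleted();     // frees a slot; c starts
            c.OnNext(3);         // forwarded
        }

        return seen; // 1, 2, 3
    }
}
```

The point of the demo is that b and c stay in the queue while the limit changes; nothing has to be disposed, rebuilt or resubscribed to pick up the new value.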
If Rx provided a full suite of "reactive reactive" overloads for its operators, developers could write queries that adjust the behavior of operators dynamically, based upon changing application state, user input and/or heuristics about the runtime environment. That would let them improve the performance of their applications without losing data and without the cost of repeatedly solving similar problems themselves.
[1] One could also imagine the next logical scenario in which we raise the value back to 10, thus perhaps the first 7 notifications and the 9th and 10th notifications are skipped, but the 8th notification makes it through. Although these semantics are interesting, they're not the semantics of the Skip operator. Skip is all about skipping consecutive notifications from the beginning of the sequence. I'm not saying that these semantics aren't useful; it just seems more like the behavior of the Sample operator to me, and surely a similar analysis applies to that operator.