Sunday, June 27, 2010

Rx Part 6 – Scheduling and Threading

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

So far in this series of posts we have managed to avoid any explicit use of threading or concurrency. Some of the methods we have covered implicitly introduce some level of concurrency to perform their jobs (e.g. Buffer, Delay and Sample each require a separate thread to do their magic), but most of this has been kindly abstracted away from us. This post will look at the beauty of the Rx API and its ability to effectively remove the need for WaitHandles and any explicit use of Threads, the ThreadPool or the new shiny Task type.
A friend of mine once wisely stated that you should always understand at least one layer below the one you are coding at. At the time he was referring to networking protocols, but I think it is sage advice for all programming. On the project I am currently working on there are some very savvy developers who are very comfortable working in a multithreaded environment, and the project has both client- and server-side threading problems that we have had to tackle. I believe the whole team would agree that it has been amazing the amount of concurrency that Rx will handle for you in a declarative way. The code base is virtually free of WaitHandles, Monitor or lock usage, or any explicit creation of threads. It has evolved into this state over time as we have come to grips with the power of Rx, and the end result is far cleaner code. Having that experience on the team allowed us to work out the ways we should and shouldn't be using Rx, which would have been just too hard for me to do alone.
Getting back to my friend's comment about understanding the underlying subsystem, this is especially important when dealing with Rx and scheduling. Just because Rx abstracts some of this away, it does not mean that you can't still create problems for yourself if you are not careful. Before I scare you too much, let's look at some of the scheduling features of Rx.

Scheduling

In the Rx world, you can control the scheduling of two things:
  1. The invocation of the subscription
  2. The publishing of notifications
As you could probably guess, these are exposed via two extension methods on IObservable<T> called SubscribeOn and ObserveOn. Both methods have an overload that takes an IScheduler and returns an IObservable<T>, so you can chain methods together.
public static class Observable
{
  public static IObservable<TSource> ObserveOn<TSource>(
      this IObservable<TSource> source, IScheduler scheduler) 
  {...}

  public static IObservable<TSource> SubscribeOn<TSource>(
      this IObservable<TSource> source, IScheduler scheduler) 
  {...}
}

public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}
The IScheduler interface is of less interest to me than the types that implement it. Depending on your platform* (Silverlight 3, Silverlight 4, .NET 3.5, .NET 4.0) you will be exposed to the appropriate implementations via the static Scheduler class. These are the static properties on the Scheduler type that expose the different schedulers.
Scheduler.Dispatcher will ensure that the actions are performed on the Dispatcher, which is obviously useful for Silverlight and WPF applications. You can imagine that the implementation for this would just delegate any calls to Schedule(Action) straight to Dispatcher.BeginInvoke(Action).
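Purely as an illustration of that idea (a hypothetical sketch, not the actual Rx implementation), such a scheduler might look something like this:
//Hypothetical sketch only; the real dispatcher scheduler is more involved.
//Assumes System.Windows.Threading and Rx's Disposable helper are available.
public class DispatcherSchedulerSketch : IScheduler
{
    public DateTimeOffset Now
    {
        get { return DateTimeOffset.Now; }
    }

    public IDisposable Schedule(Action action)
    {
        //Marshal the action onto the WPF Dispatcher.
        Dispatcher.CurrentDispatcher.BeginInvoke(action);
        return Disposable.Empty;
    }

    public IDisposable Schedule(Action action, TimeSpan dueTime)
    {
        //A real implementation would use a DispatcherTimer to honour dueTime;
        //omitted here to keep the sketch short.
        throw new NotImplementedException();
    }
}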
Scheduler.NewThread will schedule all actions onto a new thread.
Scheduler.ThreadPool will schedule all actions onto the Thread Pool.
Scheduler.TaskPool (which is only available to Silverlight 4 and .NET 4.0) will schedule actions onto the TaskPool.
Scheduler.Immediate will ensure the action is not scheduled but is executed immediately.
Scheduler.CurrentThread just ensures that the actions are performed on the thread that made the original call. This is different to Immediate, as CurrentThread will queue the action to be performed. Note the difference in the output of the following code. One method passes Scheduler.Immediate, the other passes Scheduler.CurrentThread.
private static void ScheduleTasks(IScheduler scheduler)
{
    Action leafAction = () => Console.WriteLine("leafAction.");
    Action innerAction = () =>
                             {
                                 Console.WriteLine("innerAction start.");
                                 scheduler.Schedule(leafAction);
                                 Console.WriteLine("innerAction end.");
                             };
    Action outerAction = () =>
                             {
                                 Console.WriteLine("outer start.");
                                 scheduler.Schedule(innerAction);
                                 Console.WriteLine("outer end.");
                             };
    scheduler.Schedule(outerAction);
}

public void CurrentThreadExample()
{
    ScheduleTasks(Scheduler.CurrentThread);
    Console.ReadLine();
    /*Output:
     * outer start.
     * outer end.
     * innerAction start.
     * innerAction end.
     * leafAction.
     */
}

public void ImmediateExample()
{
    ScheduleTasks(Scheduler.Immediate);
    Console.ReadLine();
    /*Output:
     * outer start.
     * innerAction start.
     * leafAction.
     * innerAction end.
     * outer end.
     */
}
*Sorry Rx for JavaScript, I have not even opened the box on you and don’t know anything about scheduling in JavaScript.
Examples
So those are each of our schedulers; let's see some of them in use. The thing I want to point out here is that the first few times I used these overloads I was confused as to what they actually did. You should use the SubscribeOn method to describe how you want any warm-up and background processing code to be scheduled, while the ObserveOn method is used to describe where you want your notifications scheduled to. So, for example, if you had a WPF application that used Rx to populate an ObservableCollection<T>, you would almost certainly want to use SubscribeOn with one of the threaded schedulers (NewThread, ThreadPool or maybe TaskPool) and then use ObserveOn with the Dispatcher scheduler to update your collection.
public void LoadCustomers()
{
    _customerService.GetCustomers()
        .SubscribeOn(Scheduler.NewThread)
        .ObserveOn(Scheduler.Dispatcher)
        .Subscribe(Customers.Add);
}
So all of the schedulers just offer us a nice abstraction over the various ways we can write concurrent code. Besides saving me from having to write the tedious code to get work onto a new thread or the thread pool, it also makes threading with Rx easy. I didn't think that any of the schedulers except CurrentThread and Immediate warranted further explanation, but I do think it is worth pointing out some of the “fun” threading problems you can still face even though the scheduling has been abstracted away from you.

Deadlocks

When writing the current application my team is working on, we found out the hard way that Rx code can most certainly deadlock. When you consider that some calls (like .First()) are blocking, and that we can schedule work to be done in the future, it becomes obvious that deadlocks can occur. This example is the simplest deadlock I could think of. It is fairly silly, but it will get the ball rolling.
var stream = new Subject<int>();
Console.WriteLine("Next line should deadlock the system.");
var value = stream.First();
stream.OnNext(1);
Console.WriteLine("I can never execute....");
Hopefully we won't ever write code that silly, and if we did, our tests would give us fairly quick feedback that things were wrong. What lets deadlocks slip into the system is when they manifest themselves at integration points. This next example may be a little harder to find, but is only a small step away from the silly first example. Here we block in the constructor of a UI element, which will always be created on the dispatcher. The blocking call is waiting for an event that can only be raised from the dispatcher – deadlock.
//Members assumed for this snippet (not shown in the original post):
private readonly Subject<string> _subject = new Subject<string>();
private string _value;
public event PropertyChangedEventHandler PropertyChanged;

public Window1()
{
    InitializeComponent();
    DataContext = this;
    Value = "Default value";

    //Deadlock! We need the dispatcher to continue
    // to allow me to click the button to produce a value.
    Value = _subject.First(); 

    //This will give same result but will not be blocking(deadlocking).
    _subject.Take(1).Subscribe(value => Value = value);
}

private void MyButton_Click(object sender, RoutedEventArgs e)
{
    _subject.OnNext("New Value");
}

public string Value
{
    get { return _value; }
    set
    {
        _value = value;
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs("Value"));
    }
}
In this example we start seeing things that can become more sinister. This example has a Button whose click handler will try to get the first value from an observable exposed via an interface.
public partial class Window1 : INotifyPropertyChanged
{
    private readonly IMyService _service = new MyService(); //Imagine DI here.
    private int _value2;

    public Window1()
    {
        InitializeComponent();
        DataContext = this;
    }

    public int Value2
    {
        get { return _value2; }
        set
        {
            _value2 = value;
            var handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs("Value2"));
        }
    }

    #region INotifyPropertyChanged Members

    public event PropertyChangedEventHandler PropertyChanged;

    #endregion

    private void MyButton2_Click(object sender, RoutedEventArgs e)
    {
        Value2 = _service.GetTemperature().First();
    }
}
There is only a small problem here, in that we block on the dispatcher thread (.First() is a blocking call); however, this manifests itself as a deadlock if the service code is written incorrectly.
class MyService : IMyService
{
    public IObservable<int> GetTemperature()
    {
        return Observable.Create<int>(
            o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    return () => { };
                })
            .SubscribeOnDispatcher();
    }
}
This odd implementation with explicit scheduling causes the three OnNext calls to be scheduled to run only once the .First() call has finished – but .First() is itself blocked waiting for an OnNext to be called. Deadlock.
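A sketch of the fix is simply to remove the scheduling from the service and leave that decision to the final subscriber. Note I have also added the OnCompleted call that the original omitted:
class MyService : IMyService
{
    public IObservable<int> GetTemperature()
    {
        //No explicit scheduling here; the caller decides how to
        // SubscribeOn/ObserveOn. The values are now produced synchronously
        // during subscription, so the blocking .First() call can return.
        return Observable.Create<int>(
            o =>
                {
                    o.OnNext(27);
                    o.OnNext(26);
                    o.OnNext(24);
                    o.OnCompleted();
                    return () => { };
                });
    }
}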
So far this post has been a bit doom and gloom about scheduling and the problems you could face; that is not the intent. I just wanted to make it obvious that Rx is not going to solve the age-old concurrency problems, but it will make it easier to get things right if you follow these simple rules.
  1. Only the final subscriber should be setting the scheduling.
  2. Avoid using .First() – Ed: that is for you, Olivier. We will call this rule 1b (see the sketch below).
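To follow rule 1b, the earlier button handler could be rewritten without the blocking call. A rough sketch, using the same members as the example above:
private void MyButton2_Click(object sender, RoutedEventArgs e)
{
    //Non-blocking: take just the first value and assign it when it arrives.
    _service.GetTemperature()
        .Take(1)
        .ObserveOnDispatcher()
        .Subscribe(temperature => Value2 = temperature);
}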
Where that last deadlock example came unstuck is that the service was dictating the scheduling paradigm when really it had no business doing so. Before we had a clear idea of where we should be doing the scheduling on my current project, we had all sorts of layers adding “helpful” scheduling code. What it ended up creating was a threading nightmare. When we removed all of the scheduling code and located it in a single layer (at least in the Silverlight client), most of our concurrency problems went away. I recommend you do the same. At least in WPF/Silverlight applications, the pattern should be simple: “Subscribe on a background thread; Observe on the Dispatcher”.
So my challenge to the readers is to add to the comments:
  1. Any other scheduling rules (2 seems quite small, and I was only going to have 1)
  2. Post some nasty Rx race condition code
  3. What rules do you have for subscribing on a background thread? Which scheduler should I use and when (i.e. NewThread, ThreadPool or TaskPool)? – and so I come full circle to understanding one layer below the one you are working at.
Further reading/watching:
  1. This channel9 video has more interesting stuff including testing with schedulers http://channel9.msdn.com/shows/Going+Deep/Wes-Dyer-and-Jeffrey-Van-Gogh-Inside-Rx-Virtual-Time/
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 5 - Combining multiple IObservable streams
Forward to next post; Part 7 - Hot and Cold observables

Saturday, June 19, 2010

RX Part 5 – Combining multiple IObservable<T> streams

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

In the last post we covered some of the flow control features of Rx and how to conceptualise them with Marble diagrams. This post will continue to build on those concepts by looking at different ways of working with multiple streams.
The Concat extension method is probably the simplest extension method covered in this post. If you have read the previous post on flow control, then you have already seen error handling constructs that are more complex than this method. The method will simply publish values from the second stream once the first stream completes.
//Generate values 0,1,2
var stream1 = Observable.Generate(0, i => i < 3, i => i, i => i + 1);
//Generate values 100,101,102,103,104
var stream2 = Observable.Generate(100, i => i < 105, i => i, i => i + 1);

stream1
    .Concat(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * 
 * stream1 --0--0--0--|
 * stream2 -----------0--0--0--0--|
 * 
 * result  --0--0--0--0--0--0--0--|
 */
If either stream were to OnError, then the result stream would OnError too. This means that if stream1 produced an OnError, then stream2 would never be used. If you want stream2 to be used regardless of whether stream1 produces an OnError or not, then the OnErrorResumeNext extension method is your best option.
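As a rough sketch (reusing stream1 and stream2 from above, and deliberately making the first stream fail so the difference is visible), OnErrorResumeNext might be used like this:
//Make the first stream terminate with an OnError.
var faultyStream = stream1.Concat(Observable.Throw<int>(new Exception("fail")));

faultyStream
    .OnErrorResumeNext(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Output: 0,1,2 then 100,101,102,103,104.
 * With Concat the OnError would have terminated the result stream;
 * OnErrorResumeNext swallows it and moves on to stream2.
 */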
Quick video on Concat, Catch and OnErrorResumeNext on Channel9.
The Amb method was a new concept to me. I believe it comes from functional programming and is an abbreviation of Ambiguous. Effectively this extension method will produce values from whichever stream first produces a value, and will completely ignore the other stream. In the examples below I have two streams that both produce values. In the first example stream1 will win the race, so the result stream will be stream1’s values. In the second example I delay stream1 from producing values, so stream2 will win the race and the result stream will be the values from stream2.
//Generate values 0,1,2
var stream1 = Observable.Range(0,3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Range(100,5);

stream1
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 *  if stream 1 produces a value first...
 * stream1 --0--0--0--|
 * stream2 ---0--0--0--0--0--|
 * 
 * result  --0--0--0--|     //All from stream1
 */

stream1.Delay(TimeSpan.FromMilliseconds(100))
    .Amb(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ---0--0--0--|
 * stream2 --0--0--0--0--0--|
 * 
 * result  --0--0--0--0--0--|     //All from stream2
 */
The Merge extension method does a primitive combination of multiple streams that share the same type T. The result will also be an IObservable<T>, with values produced on the result stream as they occur in the source streams. The result stream will complete when all of the source streams have completed, or will terminate when an OnError is published by any of the source streams.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values 100,101,102,103,104
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => i + 100);
stream1
    .Merge(stream2)
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----0----0|
 * stream2 --0--0--0--0--0|
 * 
 * result  --0-00--00-0--00-|  
 * Output:
 * 100
 * 0
 * 101
 * 102
 * 1
 * 103
 * 104      //Note this is a race condition. 2 could be
 * 2        //  published before 104.
 */
Merge also provides other overloads that allow you to pass more than two source observables, via an IEnumerable or a params array. The overload that takes a params array is great for when we know at compile time how many streams we want to merge, and the IEnumerable overload is better for when we don't know at compile time how many streams we need to merge.
//Create a third stream
var stream3 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10).Select(i => i + 200);

//Number of streams known at compile time.
Observable.Merge(stream1, stream2, stream3)
    .Subscribe(Console.WriteLine);
Console.ReadLine();

//We can dynamically create a list at run time with this overload.
var streams = new List<IObservable<long>>();
streams.Add(stream1);
streams.Add(stream2);
streams.Add(stream3);
Observable.Merge(streams).Subscribe(Console.WriteLine);
Console.ReadLine();
A quick video on Merge on Channel9.
SelectMany, like its counterpart extension method on IEnumerable<T>, will create the Cartesian product of the two streams. So for every item in one stream, it will give you every item in the other stream. A primitive way to think of it is a nested for loop that creates a 2D array. If you want more info on SelectMany, I will leave it to you to do a Google search, as it is fairly well documented in the IEnumerable world.
//Generate values 0,1,2
var stream1 = Enumerable.Range(0, 3).ToObservable();
//Generate values 100,101,102,103,104
var stream2 = Enumerable.Range(100, 5).ToObservable();
stream1
    .SelectMany(i => stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Output.
 * { Left = 0, Right = 100 }
 * { Left = 0, Right = 101 }
 * { Left = 1, Right = 100 }
 * { Left = 0, Right = 102 }
 * { Left = 1, Right = 101 }
 * { Left = 2, Right = 100 }
 * { Left = 0, Right = 103 }
 * { Left = 1, Right = 102 }
 * { Left = 2, Right = 101 }
 * { Left = 0, Right = 104 }
 * { Left = 1, Right = 103 }
 * { Left = 2, Right = 102 }
 * { Left = 1, Right = 104 }
 * { Left = 2, Right = 103 }
 * { Left = 2, Right = 104 }
 */
A quick Video on SelectMany on channel9
Zip is another interesting merge feature. Just like a zipper on clothing or a bag, the Zip method will bring together two sets of values as pairs; two by two. Things to note about the Zip function: the result stream will complete when the first of the source streams completes, it will error if either of the streams errors, and it will only publish once it has a pair. So if one of the source streams publishes values faster than the other, the rate of publishing will be dictated by the slower of the two streams.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .Zip(stream2, (lhs, rhs) => new { Left = lhs, Right = rhs })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/* Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|   s2 values represented as chars
 * 
 * result  ----0----1----2|
 *             a    b    c
 */
Here are two short videos on Zip (first, second) on Channel9. Note the second video is actually incorrect, can you spot why?
CombineLatest is worth comparing to the Zip method. Both methods use a function that takes a value from each stream to produce the result value. The difference is that CombineLatest will cache the last value of each stream, and when either stream produces a new value, that new value and the last value from the other stream will be passed to the result function. This example uses the same inputs as the previous Zip example, but note that many more values are produced. This leaves CombineLatest somewhere between Zip and SelectMany :-)
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e,f
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(6).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .CombineLatest(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine, () => Console.WriteLine("Complete"));
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e--f|     stream 2 values represented as chars
 * 
 * result  ----00--01-1--22-2|     the result as pairs.
 *             ab  cc d  de f    
 */
Quick video on CombineLatest on Channel9
ForkJoin, like the last few extension methods, also requires a function to produce the result, but it will only combine the last values from each stream. Things to note with ForkJoin: like the previous methods, if either stream errors so will the result stream, but if either stream is empty (i.e. completes with no values) then the result stream will also be empty. This example uses the same values as the previous samples and will only produce a pair from the last values of each stream, once they have both completed.
//Generate values 0,1,2
var stream1 = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(3);
//Generate values a,b,c,d,e
var stream2 = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(5).Select(i => Char.ConvertFromUtf32((int)i + 97));
stream1
    .ForkJoin(stream2, (s1, s2) => new { Left = s1, Right = s2 })
    .Subscribe(Console.WriteLine);
Console.ReadLine();
/*
 * Returns:
 * stream1 ----0----1----2|        stream 1 values represented as ints
 * stream2 --a--b--c--d--e|   s2 values represented as chars
 * 
 * result  --------------2|   the result as pairs. 
 *                       e     
 */
One thing to note with most of the extension methods discussed is that they generally have a matching static method that takes a params array or an IEnumerable, as discussed in the Merge section above.
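For example, Concat has a static form that mirrors the Merge overloads shown earlier. A quick sketch, assuming stream1, stream2 and stream3 are the IObservable<long> streams from the Merge example:
//Static form taking a params array, analogous to Observable.Merge above.
Observable.Concat(stream1, stream2, stream3)
    .Subscribe(Console.WriteLine);
Console.ReadLine();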
So, having looked at these ways to bring multiple observables together, we have implicitly brought some concurrency and threading to our code. This leads us nicely into the next post in the series, which will be on scheduling and threading with Rx.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 4 - Flow control
Forward to next post; Part 6 - Scheduling and threading