In this post i will show how to use the Rx (Reactive Extension) for .NET as a BackgroundWorker. This example will be a simple file enumerator in WPF. The file enumeration will run in a different thread making the application fully usable while the process runs. I will show in a step-by-step showing inclusive the commons errors when using Reactive.
The first step is to make the window. A simple window works:


The first solution is the easiest one. Just open the folder selection dialog, enumerate all the files to a list and then insert them in the ListBox.

Although this works, it freezes the window´s thread. Select a really long folder and the program stops responding to user interaction. The second problem with this solution is that all the filenames must exists at the same time in the memory. Putting a breakpoint in the enumeration line, one can easily see the memory for the process grow.
Secondary problems are that the source cannot have infinite length nor can we start to process the first item in the right moment it is ready to be processed.
Our first step will be to use the Reactive Framework (http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx ). I am using the version: 1.0.2838.0.

In the .NET 4.0, the EnumerateFiles now do not return a materialized array as in the .NET 2.0. Now it returns an IEnumerable that only enumerates as you ask it to do it. This gives us the opportunity to treat it like a lazy function.
The first step in Reactive is to create an IObservable. In this case we will use the ToObservable method.

Now we are in the Observable monad and have all the power of Reactive Extensions. Debugging now, one can see that nothing happen. The ToObservable does not enumerate the list. To enumerate the list we have to use the Subscribe method:

Now we are not storing the file name in the memory, so we can have an infinite length data source, and we can start processing the first element right after it is ready. So let remove the WriteLine and put the item in the ListBox.

Running this code and you see that… the window blocks!! Hum… well, we are still running on the window thread. The observable does not necessarily runs on another thread. But now we have the power of reactive and we can easily change the thread of where the enumeration ( the observation ) will happen.

Now the observation will happen in any free thread of the pool; I do not care. Running this code we have… an exception.

As you already know, only the thread that created the control has access to it. If we take a look in one of the overrides of the Subscribe method we see that it accepts an IObserver<T>. An IObserver is a class that treats all the signals of the IObservable. In the last example we have treated the OnNext signal passing an Action to the Subscribe method.


This observer is receiving the Dispatcher that will run the action, and the action that will ran for each item. We are not interested in the OnCompleted and OnError now.
The OnNext just call the action with a low priority message. If the message is invoked with a high priority the window will not be blocked but will not be very responsive.

To help the use of this Observer, let us create an extension method for it.

Now we are ready to glue everything together.

The unique difference is the Dispatcher as the first argument. We are using our extension method here. Running now and… only part of the files appears. In my case only the first one appeared. Hum… I suspect this problem have something to do with the using statement. Let us see the type of the "files" variable.

As suspected, the files is not the observer that we passed, in fact, the observer does not have to implement the IDisposable interface; let us removed it. If we debug again we will see that it leaves the using statement before the next OnNext signal:

That is what is happening. Between the first and the second OnNext the files is disposable. Using the reflector on the Subscribe method wee se the following code:

The flag variable is the return value of the Subscribe method. It only calls OnNext on no disosabled objects. Well, let us remove the using statement to see what happens. It must list all files now.
And it is what happens! And without blocking the window thread.
If you are estranging that we do not dispose the observation is because it is not needed. The OnCompleted signal sent by the ToObservable does it for us.

Rx will try to move to the next item. If there is no next item, flag1 will be false. Since flag1 will be false and there will be no error, Rx will call the Dispose of the observation.
But there are some interesting things we can do with this disposable object. Let us make the following change in the code.

And put a Cancel button on the XAML.


If you understood everything until now, you are thinking that this will cancel the observation in the middle. And if you run and click in the "Cancel" button you will see that you are right.
I do not know you, but I do not like the boiler code that it was needed to cancel the operation. It is separated from the code that started the operation. The following code uses the Rx to put both codes near each other.
The first step is to remove the Cancel.Click handler. Then remove the Observation definition on the class. And change to the following code:

Definitely we are starting to see the power of Rx. But the world is not easy. Put a breakpoint in the cancellation Subscribe:

Run the program. Enumerate the files and hit the Cancel two times. Yes, it steps twice in the breakpoint. Why? Remember that we have not disposed the second observation. It is still alive. In this observation we are not using the ToObservable, so there is nobody to send the OnComplete signal to us.
The problem is that we only want the first signal. In classical LINQ , if we only want the first item of a collection, but return a collection ( we do not want the First() ), we want the Take(1). Rx also has the Take method, so we can use it:

Run again with the breakpoint set and you will see that it only enters in the first click of the cancel button. After this the Click event unsubscribe happen. All of this because Take calls the OnComplete signal.
Now that everything is working as expected, let us improve the code. To be honest, we do not need the ObserverOn, we can use the ToObservable directly:

In reality we do not even need our extension method. Why? We can control in which thread the observation occurs:

The enumeration will happen in another thread and will no block the window. After this it will send the file name to the control thread and the insert will happen in the right thread. Running this and you see that your window… freezes again!!
How can we be sure that there are two thread running? The Do method allows us to have side effects in the observation. We will only write the current thread to the output.

We can see that all "OT" (Observation Thread) are different from the Control Thread. And all "ST" (Subscribe Thread) are equal to the control Thread. So everything is working fine. So, why the window is blocking?
The problem is that we are filling the windows message queue with high priority messages. The ListBox modification has to do its work in low priority. Unfortunately the Rx has no way to control the priority, so we have to make our own.
One solution is using a custom observer as we did. Another solution is to create a new IScheduler. The interface is simple. We can use the reflector to copy and modify one of the schedulers. Unfortunately none of them have virtual methods.
So the first step is to implement the IScheduler interface.

The next step is to create the scheduling methods:


With this we can create an extension method to help us:

The use no, became trivial:

But there is still an improvement we can make. We can use the method TakeUntil. TakeUntil listen to an IObservable. In the first signal of other observation it stops its observation. With this we can compose the cancellation like this.

And that is all the code needed. We do not need to subscribe to events, do not have to have fields; everything is composed right here.