Can some one please help explain why when I "block and continue" observer's onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?
For example:
If I buffer a sequence of number via
var query = from number in Enumerable.Range(1,200)
select SnoozeNumberProduction(number);
var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
Where SnoozeNumberProduction delays the number generation by 250 ms
static int SnoozeNumberProduction(Int32 number)
{
Thread.Sleep(250);
return number;
}
Now later if i subscribe to the bufferedSequence with an "ObserveOn(Scheduler.NewThread)" such that I block on the fourth buffer with a Console.ReadKey
Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
count++;
if (count == 4)
{
Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
Console.ReadKey(); // Block and build up the queue
}
Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});
In this case if I hit any key after say 10 seconds or so, I see that the following next few buffers execute on the same ManagedThread even when Scheduler.NewThread is mentioned in the ObserveOn. Can someone please help explain this behavior?
Sample output:
(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 p开发者_开发百科roduced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**
ObserveOn
is itself a layer in your composed sequence that's only job is to swap to another scheduler. However, your sleeps are happening in a Select
occuring on an IEnumerable
. That sequence is then being converted to an IObservable
using ToObservable
, which defaults to Dispatcher.CurrentThread
.
It's only at this point that you are swapping to another thread for each item that comes in. If you change it to:
var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
select SnoozeNumberProduction(number);
var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
Now the enumeration occurs on a new thread and, as you're not doing anything to change that, it will stay there.
There's actually an Observable.Range
that starts as an IObservable
and takes an optional IDispatcher
. However, I assumed your source is not actually Enumerable.Range
. In case it is, here's the equivalent:
var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
select SnoozeNumberProduction(number);
var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
I cross posted this question to MSDN Rx forum http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 and learnt that this is for efficiency reasons
You're blocking the call to OnNext in Subscribe. The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread. The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available. Since you're blocking in Subscribe, multiple calls to OnNext will accumulate. After you unblock, the queued calls are executed on the same thread. I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.
精彩评论