开发者

ObserveOn with Scheduler.NewThread does not observe so, if observer's OnNext is blocked and continued

开发者 https://www.devze.com 2023-03-07 22:28 出处:网络
Can some one please help explain why when I \"block and continue\" observer\'s onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?

Can some one please help explain why when I "block and continue" observer's onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?

For example:

If I buffer a sequence of number via

var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

Where SnoozeNumberProduction delays the number generation by 250 ms

static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}

Now later if i subscribe to the bufferedSequence with an "ObserveOn(Scheduler.NewThread)" such that I block on the fourth buffer with a Console.ReadKey

Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count++;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});

In this case if I hit any key after say 10 seconds or so, I see that the following next few buffers execute on the same ManagedThread even when Scheduler.NewThread is mentioned in the ObserveOn. Can someone please help explain this behavior?

Sample output:

(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 p开发者_开发百科roduced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**


ObserveOn is itself a layer in your composed sequence that's only job is to swap to another scheduler. However, your sleeps are happening in a Select occuring on an IEnumerable. That sequence is then being converted to an IObservable using ToObservable, which defaults to Dispatcher.CurrentThread.

It's only at this point that you are swapping to another thread for each item that comes in. If you change it to:

var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));

Now the enumeration occurs on a new thread and, as you're not doing anything to change that, it will stay there.

There's actually an Observable.Range that starts as an IObservable and takes an optional IDispatcher. However, I assumed your source is not actually Enumerable.Range. In case it is, here's the equivalent:

var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));


I cross posted this question to MSDN Rx forum http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 and learnt that this is for efficiency reasons


You're blocking the call to OnNext in Subscribe. The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread. The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available. Since you're blocking in Subscribe, multiple calls to OnNext will accumulate. After you unblock, the queued calls are executed on the same thread. I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号