I have an interface like this:
interface IProcessor{
IObservable<Item> Process(Item item);
}
I have an array of workers:
IProcessor[] _workers = ....
I want to pass an item through all the workers:
var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
开发者_如何学C int index = i;
ret = ret
.SelectMany(r => _workers[index].Process(r))
;
}
return ret;
I'm not too happy with how this looks -- is there a cleaner way?
This works for me:
IObservable<Item> ret = _workers.Aggregate(
Observable.Return(item),
(rs, w) =>
from r in rs
from p in w.Process(r)
select p);
Please keep in mind that this kind of aggregation of observables - both in your question and in my answer - can cause memory issues (i.e. stack overflow) quickly. In my tests I could get 400 workers working, but 500 caused a crash.
You're better off changing your IProcessor
to not use observables and implement your observable like this:
interface IProcessor{
Item Process(Item item);
}
var f =
_workers.Aggregate<IProcessor, Func<Item, Item>>(
i => i,
(fs, p) => i => p.Process(fs(i)));
var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);
With this approach I can get over 20,000 nested workers before a stack overflow and the results are almost instantaneous up to that level.
Maybe something like this:?
var item = new Item();
_workers
.ToObservable()
.SelectMany(worker => worker.Process(item))
.Subscribe(item => ...);
I made an assumption that the workers can process the item in parallel.
P.S. If you'd like sequential processing, it would be
var item = new Item();
_workers
.ToObservable()
.Select(worker => worker.Process(item))
.Concat()
.Subscribe(item => ...);
精彩评论