Multiple SelectMany in Rx

Developer https://www.devze.com 2023-04-08 08:45 Source: Internet

I have an interface like this:

interface IProcessor{
    IObservable<Item> Process(Item item);
}

I have an array of workers:

IProcessor[] _workers = ....

I want to pass an item through all the workers:

var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
    int index = i;
    ret = ret
        .SelectMany(r => _workers[index].Process(r))
    ;
}
return ret;

I'm not too happy with how this looks -- is there a cleaner way?


This works for me:

IObservable<Item> ret = _workers.Aggregate(
    Observable.Return(item),
    (rs, w) =>
        from r in rs
        from p in w.Process(r)
        select p);
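
For comparison, the query expression above produces the same result as a single `SelectMany` call per worker in method syntax. The following is a minimal sketch, not the answerer's code: `Item` with a `Trace` string, `TagProcessor`, and `RxPipeline` are hypothetical names introduced for illustration, and the example assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

// Hypothetical item type: records which workers have touched it.
public sealed class Item
{
    public Item(string trace) { Trace = trace; }
    public string Trace { get; }
}

public interface IProcessor
{
    IObservable<Item> Process(Item item);
}

// Illustrative worker: appends its tag and emits a single result.
public sealed class TagProcessor : IProcessor
{
    private readonly string _tag;
    public TagProcessor(string tag) { _tag = tag; }

    public IObservable<Item> Process(Item item) =>
        Observable.Return(new Item(item.Trace + _tag));
}

public static class RxPipeline
{
    // Folds the workers into one observable chain:
    // each worker's output feeds the next worker's Process call.
    public static IObservable<Item> Chain(IProcessor[] workers, Item item) =>
        workers.Aggregate(
            Observable.Return(item),
            (rs, w) => rs.SelectMany(r => w.Process(r)));

    public static void Main()
    {
        var workers = new IProcessor[]
        {
            new TagProcessor("a"), new TagProcessor("b"), new TagProcessor("c")
        };

        // Wait() blocks until the sequence completes and returns its last element.
        Console.WriteLine(Chain(workers, new Item("")).Wait().Trace); // prints "abc"
    }
}
```

The chain is still one nested `SelectMany` per worker, so the caveat about very long worker arrays applies to this form as well.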

Please keep in mind that this kind of aggregation of observables, both in your question and in my answer, nests one operator per worker and can quickly cause a stack overflow as the number of workers grows. In my tests I could get 400 workers working, but 500 caused a crash.

You're better off changing your IProcessor to not use observables and implement your observable like this:

interface IProcessor{
    Item Process(Item item);
}

var f =
    _workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));

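// Note: Scheduler.ThreadPool is marked obsolete in Rx 2.0 and later;
// Scheduler.Default or ThreadPoolScheduler.Instance are the usual replacements there.
var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);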

With this approach I can get over 20,000 nested workers before a stack overflow and the results are almost instantaneous up to that level.
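
To make the fold concrete, here is a small self-contained sketch of the synchronous composition, alongside a plain loop that gives the same result without nesting delegates. The `Item.Trace` property, `TagProcessor`, and `Pipeline` are hypothetical names chosen for illustration; only the `Aggregate` call mirrors the answer.

```csharp
using System;
using System.Linq;

// Hypothetical item type: records which workers have touched it.
public sealed class Item
{
    public Item(string trace) { Trace = trace; }
    public string Trace { get; }
}

public interface IProcessor
{
    Item Process(Item item);
}

// Illustrative worker: appends its tag to the item's trace.
public sealed class TagProcessor : IProcessor
{
    private readonly string _tag;
    public TagProcessor(string tag) { _tag = tag; }
    public Item Process(Item item) => new Item(item.Trace + _tag);
}

public static class Pipeline
{
    // Start from the identity function, then wrap each worker
    // around the function built so far.
    public static Func<Item, Item> Compose(IProcessor[] workers) =>
        workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));

    // Loop-based alternative: same order of application,
    // but the call depth does not grow with the number of workers.
    public static Item RunSequentially(IProcessor[] workers, Item item)
    {
        foreach (var worker in workers)
            item = worker.Process(item);
        return item;
    }

    public static void Main()
    {
        var workers = new IProcessor[]
        {
            new TagProcessor("a"), new TagProcessor("b"), new TagProcessor("c")
        };

        Console.WriteLine(Compose(workers)(new Item("")).Trace);         // prints "abc"
        Console.WriteLine(RunSequentially(workers, new Item("")).Trace); // prints "abc"
    }
}
```

Either form can be passed to `Observable.Start` in the same way as `f` above; the loop is an option when the worker count could approach the nesting limit mentioned in the answer.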


Maybe something like this?

var item = new Item();
_workers
  .ToObservable()
  .SelectMany(worker => worker.Process(item))
  .Subscribe(result => ...);  // renamed so the lambda parameter does not clash with the local `item`

I made an assumption that the workers can process the item in parallel.

P.S. If you'd like sequential processing, it would be:

var item = new Item();
_workers
  .ToObservable()
  .Select(worker => worker.Process(item))
  .Concat()
  .Subscribe(result => ...);  // renamed so the lambda parameter does not clash with the local `item`