I'm implementing the IObservable<T> interface on some classes. I used Reflector to figure out how this is typically done in Rx. Concerning how an observable keeps track of its subscribers and notifies them via their OnNext method, I stumbled upon code similar to this:
private List<IObserver<T>> observers;
// subscribe a new observer:
public IDisposable Subscribe(IObserver<T> observer)
{
observers.Add(observer);
...
}
// trigger all observers' OnNext method:
...
foreach (IObserver<T> observer in observers)
{
observer.OnNext(value);
}
Since all delegates are multi-cast, couldn't this be simplified to:
Action<T> observers;
// subscribe observer:
public IDisposable Subscribe(IObserver<T> observer)
{
observers += observer.OnNext;
...
}
// trigger observers' OnNext:
...
observers(value);
Or are there specific advantages to the first approach (performance, threading/concurrency issues, …)?
In general, calling the delegates individually gives you more control over the behavior:
- If one delegate raises an exception you can keep calling the others, for example, or remove the faulted delegate from your list.
- If you want to call the delegates in parallel, it's really easy.
- If you need to call them in a certain order, you can easily guarantee it (a multicast delegate invokes its targets in the order they were added, so any other order means rebuilding the delegate).
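To illustrate the first point, here is a minimal sketch of per-observer notification that survives a throwing observer. The class name `SafeNotifier<T>` and its members are hypothetical (they are not part of Rx), and dropping the faulted observer is just one possible policy:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx: notifies observers one by one so that
// a throwing observer does not prevent the remaining ones from being called.
public sealed class SafeNotifier<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public int Count => observers.Count;

    public void Add(IObserver<T> observer) => observers.Add(observer);

    public void Publish(T value)
    {
        // Iterate over a snapshot so the list can be modified while notifying.
        foreach (IObserver<T> observer in observers.ToArray())
        {
            try
            {
                observer.OnNext(value);
            }
            catch (Exception)
            {
                // Policy chosen for this sketch: drop the faulted observer and go on.
                observers.Remove(observer);
            }
        }
    }
}
```

With a single multicast Action<T>, by contrast, an exception thrown by one target propagates to the caller and the remaining targets are not invoked.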
Usually you don't implement IObservable<T> yourself; you return an IObservable<T> from a method using one of the generation methods (like Observable.Create).
However, if you are going to implement the interface yourself, you should wrap an internal Subject<T>, which takes care of the subscriber bookkeeping (including thread-safe subscribing and unsubscribing) for you:
public class CustomObservable<T> : IObservable<T>
{
private Subject<T> subject = new Subject<T>();
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
private void EmitValue(T value)
{
subject.OnNext(value);
}
}
NB: If you decide to stick with the delegate (for whatever reason), at least make sure you unsubscribe in the IDisposable you return:
observers += observer.OnNext;
return Disposable.Create(() => observers -= observer.OnNext);
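For completeness, a self-contained sketch of the delegate-based variant without any Rx dependency might look like the following. `DelegateObservable<T>`, `Emit` and the nested `Unsubscriber` are hypothetical names; `Unsubscriber` merely stands in for `Disposable.Create`, and the lock is one simple way (an assumption of this sketch) to make `+=` and `-=` safe against concurrent callers:

```csharp
using System;
using System.Threading;

// Hypothetical delegate-based observable; a sketch, not how Rx implements subjects.
public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly object gate = new object();
    private Action<T> observers; // null while nobody is subscribed

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));

        // Keep the exact delegate instance so the same one is removed later.
        Action<T> handler = observer.OnNext;
        lock (gate) { observers += handler; }
        return new Unsubscriber(() => { lock (gate) { observers -= handler; } });
    }

    public void Emit(T value)
    {
        Action<T> snapshot;
        lock (gate) { snapshot = observers; }

        // Invoking a null delegate throws, so guard the "no subscribers" case.
        snapshot?.Invoke(value);
    }

    // Stand-in for Disposable.Create: runs the unsubscribe action at most once.
    private sealed class Unsubscriber : IDisposable
    {
        private Action dispose;

        public Unsubscriber(Action dispose) { this.dispose = dispose; }

        public void Dispose() => Interlocked.Exchange(ref dispose, null)?.Invoke();
    }
}
```

Note that the bare `observers(value);` call from the question throws a NullReferenceException when there are no subscribers, which is why `Emit` uses a null-conditional invocation here.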