How to implement an atomic switch from one IObserver to another?

Developer https://www.devze.com 2023-03-12 14:25 Source: Internet
I have an IObservable<byte[]> that I transform into an IObservable<XDocument> using some intermediate steps:

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

At some point in time, I'm interested in the observed XDocuments so I subscribe an IObserver<XDocument>. At a later point in time, I would like to subscribe another IObserver<XDocument> and dispose of the old one.

How can I do this in one atomic operation, without losing any observed XDocument? I could do something like:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

I'm worried, though, that between these two calls I could lose an XDocument. If I switch the two calls, I could receive the same XDocument twice.


I'd probably add a layer of indirection. Write a class called ExchangeableObserver, subscribe it to your observable, and keep it permanently subscribed. The job of ExchangeableObserver is to delegate everything to a given sub-observer. But the programmer is allowed to change the sub-observer being delegated to at any time. In my example I have an Exchange() method. Something like:

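using System;
using System.Threading;

// Stays subscribed permanently and forwards every notification
// to whichever inner observer is currently installed.
public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner = inner;
  }

  // Atomically installs newInner and returns the observer it replaced.
  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    // Volatile read so a stale copy of the field is never reused.
    Volatile.Read(ref inner).OnNext(value);
  }

  public void OnCompleted() {
    Volatile.Read(ref inner).OnCompleted();
  }

  public void OnError(Exception error) {
    Volatile.Read(ref inner).OnError(error);
  }
}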
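
To show how the exchange plays out, here is a minimal usage sketch. It repeats a compact copy of the wrapper so it compiles on its own. RecordingObserver and ExchangeDemo are hypothetical names made up for this illustration, and the demo pushes values into the wrapper by hand where real code would call observedXDocuments.Subscribe(proxy) once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Compact copy of the wrapper from the answer, so this sketch is self-contained.
public sealed class ExchangeableObserver<T> : IObserver<T>
{
    private IObserver<T> inner;

    public ExchangeableObserver(IObserver<T> inner) { this.inner = inner; }

    // Atomically swaps the delegate and returns the previous one.
    public IObserver<T> Exchange(IObserver<T> newInner) =>
        Interlocked.Exchange(ref inner, newInner);

    public void OnNext(T value) => Volatile.Read(ref inner).OnNext(value);
    public void OnCompleted() => Volatile.Read(ref inner).OnCompleted();
    public void OnError(Exception error) => Volatile.Read(ref inner).OnError(error);
}

// Hypothetical observer that records what it receives under a label.
public sealed class RecordingObserver : IObserver<string>
{
    private readonly string label;
    private readonly List<string> log;

    public RecordingObserver(string label, List<string> log)
    {
        this.label = label;
        this.log = log;
    }

    public void OnNext(string value) => log.Add(label + ":" + value);
    public void OnCompleted() => log.Add(label + ":completed");
    public void OnError(Exception error) => log.Add(label + ":error");
}

public static class ExchangeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var proxy = new ExchangeableObserver<string>(new RecordingObserver("old", log));

        // Assumption: in real code the source drives the proxy after a single
        // observedXDocuments.Subscribe(proxy); here we push values by hand.
        proxy.OnNext("doc1");
        proxy.Exchange(new RecordingObserver("new", log));
        proxy.OnNext("doc2");
        proxy.OnCompleted();

        return log; // "old:doc1", "new:doc2", "new:completed"
    }
}
```

Because the subscription itself never changes, each value goes to exactly one inner observer. One caveat: an OnNext that started on the old observer before the swap may still be running when Exchange returns, so the old observer should tolerate finishing that last call.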


You can use a semaphore-like lock, for example a ReaderWriterLockSlim, to make sure that no observer change takes place while the data from IObservable<byte[]> is being prepared for and delivered to IObservable<XDocument>.

Pseudocode showing how this could be done (not tested):

  System.Threading.ReaderWriterLockSlim criticalSection
       = new System.Threading.ReaderWriterLockSlim(...);

  // ... converting from IObservable<byte[]> to IObservable<XDocument>
  criticalSection.EnterReadLock();
  // Call IObservable<XDocument>
  criticalSection.ExitReadLock();

  // ... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  // Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();
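
One way to turn the pseudocode above into something compilable is to put the lock inside a forwarding observer. This is only a sketch under assumptions: LockedExchangeableObserver, CollectingObserver and LockedExchangeDemo are hypothetical names, and the lock is created with the parameterless ReaderWriterLockSlim constructor because the constructor arguments are not given above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper: notifications take the read lock, a swap takes the write lock.
public sealed class LockedExchangeableObserver<T> : IObserver<T>
{
    // Assumption: default (non-recursive) lock policy.
    private readonly ReaderWriterLockSlim gate = new ReaderWriterLockSlim();
    private IObserver<T> inner;

    public LockedExchangeableObserver(IObserver<T> inner) { this.inner = inner; }

    // Waits until no notification is in flight, then swaps the delegate.
    // Must not be called from inside a notification: with the default policy
    // that would throw LockRecursionException.
    public IObserver<T> Exchange(IObserver<T> newInner)
    {
        gate.EnterWriteLock();
        try
        {
            var old = inner;
            inner = newInner;
            return old;
        }
        finally { gate.ExitWriteLock(); }
    }

    public void OnNext(T value)
    {
        gate.EnterReadLock();
        try { inner.OnNext(value); }
        finally { gate.ExitReadLock(); }
    }

    public void OnCompleted()
    {
        gate.EnterReadLock();
        try { inner.OnCompleted(); }
        finally { gate.ExitReadLock(); }
    }

    public void OnError(Exception error)
    {
        gate.EnterReadLock();
        try { inner.OnError(error); }
        finally { gate.ExitReadLock(); }
    }
}

// Hypothetical observer that appends labelled values to a shared list.
public sealed class CollectingObserver : IObserver<int>
{
    private readonly string label;
    private readonly List<string> log;

    public CollectingObserver(string label, List<string> log)
    {
        this.label = label;
        this.log = log;
    }

    public void OnNext(int value) => log.Add(label + ":" + value);
    public void OnCompleted() => log.Add(label + ":completed");
    public void OnError(Exception error) => log.Add(label + ":error");
}

public static class LockedExchangeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var proxy = new LockedExchangeableObserver<int>(new CollectingObserver("old", log));

        proxy.OnNext(1);                                     // delivered to "old"
        proxy.Exchange(new CollectingObserver("new", log));  // blocks while a notification is running
        proxy.OnNext(2);                                     // delivered to "new"

        return log; // "old:1", "new:2"
    }
}
```

Compared with a plain atomic swap, the write lock gives a slightly stronger guarantee: once Exchange returns, the old observer is no longer in the middle of a notification. The price is that a slow observer delays the swap.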

Edit: regarding Call IObservable<XDocument>

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

I interpret your sentence

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

to mean that you have registered an event handler for IObservable<byte[]> that creates an XDocument from the byte[] and then calls something that triggers an event for IObservable<XDocument>.

Call IObservable<XDocument> means the code that triggers this follow-up event.
