开发者

How to supress the very next event of stream A whenever stream B fires

开发者 https://www.devze.com 2023-03-03 11:01 出处:网络
I want to stop stream A for exactly one notification whenever stream B fires. Both streams will stay online and won\'t ever complete.

I want to stop stream A for exactly one notification whenever stream B fires. Both streams will stay online and won't ever complete.

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

or

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-开发者_如何转开发----o--o-----o--o  


Here's a version of my SkipWhen operator I did for a similar question (the difference is that, in the original, multiple "B's" would skip multiple "A's"):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

If the current implementation becomes a bottleneck, consider changing the lock implementation to use a ReaderWriterLockSlim.


This solution will work when the observable is hot (and without refCount):

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB): make stream A complete upon stream B producing a value.
  2. .skip(1): make stream A skip one value upon starting (or as a result of .repeat()).
  3. .repeat(): make stream A repeat (reconnect) indefinitely.
  4. .merge(streamA.take(1)): offset the effect of .skip(1) at the beginning of the stream.

Example of making A stream skip every 5 seconds:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

You can also use this sandbox http://jsbin.com/gijorid/4/edit?js,console to execute BACTION() in the console log at the time of running the code to manually push a value to streamB (which is helpful for analysing the code).

0

精彩评论

暂无评论...
验证码 换一张
取 消