I want to stop stream A for exactly one notification whenever stream B fires. Both streams will stay online and won't ever complete.
A: o--o--o--o--o--o--o--o--o
B: --o-----o--------o-------
R: o-----o-----o--o-----o--o
or
A: o--o--o--o--o--o--o--o--o
B: -oo----oo-------oo-------
R: o-----o-----o--o-----o--o
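To make the expected result concrete, here is a small sketch in plain JavaScript (no Rx involved) that replays the two diagrams above. The function name skipOneAfter is made up for this illustration, and it assumes that when A and B fire on the same tick, B is handled first; the diagrams never show that case.

```javascript
// Simulate the desired behaviour on marble strings: 'o' is a notification,
// '-' is an empty tick. Both strings are assumed to have the same length.
function skipOneAfter(a, b) {
  let skipNext = false; // set by B, cleared by the next A
  let result = '';
  for (let i = 0; i < a.length; i++) {
    // Assumption: if A and B fire on the same tick, B is handled first.
    if (b[i] === 'o') {
      skipNext = true; // several B's in a row still skip only one A
    }
    if (a[i] === 'o' && !skipNext) {
      result += 'o';
    } else {
      if (a[i] === 'o') skipNext = false; // this A is swallowed
      result += '-';
    }
  }
  return result;
}

const A = 'o--o--o--o--o--o--o--o--o';
console.log(skipOneAfter(A, '--o-----o--------o-------'));
console.log(skipOneAfter(A, '-oo----oo-------oo-------'));
// both print: o-----o-----o--o-----o--o
```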
Here's a version of my SkipWhen operator that I wrote for a similar question (the difference is that, in the original, multiple "B's" would skip multiple "A's"):
public static IObservable<TSource> SkipWhen<TSource, TOther>(
    this IObservable<TSource> source,
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        // Every value from "other" arms the flag; repeated values before the
        // next source value still skip only one item.
        otherSubscription.Disposable = other.Subscribe(
            x => { lock (lockObject) { shouldSkip = true; } });

        // Drop one source value while the flag is set, then reset it.
        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock (lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}
If the current implementation becomes a bottleneck, consider changing the lock implementation to use a ReaderWriterLockSlim.
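The same flag-based idea can be sketched in plain JavaScript without any Rx library. This is only an illustration, not the operator above: createEmitter is a hypothetical stand-in for an observable, and because JavaScript callbacks run on a single thread the lock is left out.

```javascript
// Hypothetical minimal event source: subscribe(fn) returns an unsubscribe function.
function createEmitter() {
  const listeners = new Set();
  return {
    subscribe(fn) { listeners.add(fn); return () => listeners.delete(fn); },
    emit(value) { listeners.forEach(fn => fn(value)); },
  };
}

// Flag-based skipWhen: each burst of `other` values suppresses one `source` value.
function skipWhen(source, other) {
  return {
    subscribe(fn) {
      let shouldSkip = false;
      const offOther = other.subscribe(() => { shouldSkip = true; });
      const offSource = source.subscribe(value => {
        if (shouldSkip) { shouldSkip = false; return; } // swallow one value
        fn(value);
      });
      // Tear down both inner subscriptions together.
      return () => { offSource(); offOther(); };
    },
  };
}

const a = createEmitter();
const b = createEmitter();
const seen = [];
const unsubscribe = skipWhen(a, b).subscribe(v => seen.push(v));

a.emit(1);
b.emit('x'); b.emit('y');   // two B's in a row...
a.emit(2);                  // ...still skip only this one A
a.emit(3);
unsubscribe();
a.emit(4);                  // ignored after unsubscribing
console.log(seen);          // [1, 3]
```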
This solution will work when the observable is hot (and without refCount):
streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
.takeUntil(streamB): make streamA complete upon streamB producing a value.
.skip(1): make streamA skip one value upon starting (or as a result of .repeat()).
.repeat(): make streamA repeat (reconnect) indefinitely.
.merge(streamA.take(1)): offset the effect of .skip(1) at the beginning of the stream.
Example of making A stream skip every 5 seconds:
var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
    }).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();
You can also use this sandbox, http://jsbin.com/gijorid/4/edit?js,console, and call BACTION() in the console while the code is running to manually push a value to streamB (which is helpful for analysing the code).