
How to re-subscribe to a sequence at a particular point?

Developer https://www.devze.com 2023-04-12 04:11 Source: web

I'm trying to solve the following: a) A subscriber receives events from an IObservable for some time. Then it unsubscribes, does some work, and subscribes again. At that point it should start receiving events from exactly the point where it unsubscribed. b) This behavior should also work with multiple subscribers, e.g. when one has unsubscribed, the others should continue receiving events.

Are there any suggestions from the Rx side?

Thanks in advance!


Here's a reasonably simple Rx way to do what you want, copied from my answer to another question. I've created an extension method called Pausable that takes a source observable and a second observable of bool that pauses (true) or resumes (false) the source.

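using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// The class name is arbitrary; extension methods just need a static class.
public static class PausableExtensions
{
    public static IObservable<T> Pausable<T>(
        this IObservable<T> source,
        IObservable<bool> pauser)
    {
        return Observable.Create<T>(o =>
        {
            // Holds the subscription that feeds the buffer while paused.
            var paused = new SerialDisposable();
            var subscription = Observable.Publish(source, ps =>
            {
                // Start with an empty, already-completed buffer so that, before
                // the first pause, Concat falls straight through to the live stream.
                var values = new ReplaySubject<T>();
                values.OnCompleted();
                Func<bool, IObservable<T>> switcher = b =>
                {
                    if (b)
                    {
                        // Pause: start a fresh buffer and emit nothing downstream.
                        values.Dispose();
                        values = new ReplaySubject<T>();
                        paused.Disposable = ps.Subscribe(values);
                        return Observable.Empty<T>();
                    }
                    else
                    {
                        // Resume: replay what was buffered, then carry on with live values.
                        return values.Concat(ps);
                    }
                };

                // Begin unpaused, ignore repeated signals, and switch on each change.
                return pauser.StartWith(false).DistinctUntilChanged()
                    .Select(p => switcher(p))
                    .Switch();
            }).Subscribe(o);
            return new CompositeDisposable(subscription, paused);
        });
    }
}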

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

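Thread.Sleep(500);
bs.OnNext(true);   // pause: values are buffered from here on
Thread.Sleep(5000);
bs.OnNext(false);  // resume: buffered values are replayed, then live values continue
Thread.Sleep(500);
bs.OnNext(true);   // pause again
Thread.Sleep(5000);
bs.OnNext(false);  // resume again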

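The "resume from exactly the same point" behavior relies on buffering into a ReplaySubject while the subscriber is paused. Here is a minimal sketch of just that mechanism, not of Pausable itself. The Subject<int> source, the variable names and the hand-pushed values are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical hot source; values are pushed by hand to keep the demo deterministic.
var source = new Subject<int>();
var buffer = new ReplaySubject<int>();
var received = new List<int>();

// "Paused": nobody is consuming yet, but the buffer records everything.
using var buffering = source.Subscribe(buffer);
source.OnNext(1);
source.OnNext(2);

// "Resumed": the new subscriber first gets the recorded values, then live ones.
using var resumed = buffer.Subscribe(x => received.Add(x));
source.OnNext(3);

Console.WriteLine(string.Join(", ", received)); // prints "1, 2, 3"
```

Because the buffer stays subscribed to the source, nothing is lost between the replayed values and the live ones.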

It sounds like you need a "pausable" stream. Assuming that only one subscriber will handle the values at a time (while the other subscribers just wait), this solution is probably what you need.
