开发者

How do I create an IObservable<T> that returns a value every -n- seconds without skipping any

开发者 https://www.devze.com 2023-01-19 03:51 出处:网络
This below example was my attempt at doing this: var source = Observable.Sample( Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));

This below example was my attempt at doing this:

var source
    = Observable.Sample(
          Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));

But when I .Subscribe() to that Observable and output it to the console, it shows a sequence like this, one line output every 2 seconds:

OnNext: 312969
OnNext: 584486
OnNext: 862009

Obviously the .Range() observable is running while the .Sample() observable is waiting 2 seconds between each output. I would like to know how to create an开发者_如何学编程 observable but that does not allow values to be skipped, so obviously that would look like this:

OnNext: 1
OnNext: 2
OnNext: 3

With one value from .Range() output every 2 seconds. How can I accomplish this in the Reactive Extensions for .NET?


Using Observable.GenerateWithTime:

var source = Observable.GenerateWithTime(1, _ => true, x => ++x, x => x, x => TimeSpan.FromSeconds(2));

Observable.Range uses Observable.Generate, so this is one approach. There could be many other ways.

For something more advanced, like dealing with events in the same manner (because this will obviously only help if you are generating the data yourself), see How to throttle event stream using RX? which deals with this problem and has been solved.


I approached this recently by creating an Observable that emits timed events every timeInterval. You can then use the Zip method to sychronize the events from your Observable with those of the timer Observable.

For instance:

    var timer = 
        Observable
            .Timer(
                TimeSpan.FromSeconds(0), 
                TimeSpan.FromSeconds(2)
            );
    var source = Observable.Range(1, int.MaxValue);
    var timedSource = source.Zip(timer,(s,t)=>s);
    timedSource.Subscribe(Console.WriteLine);


A well known example of a pacer:

public static IObservable<T> Pace<T>(this IObservable<T> source, Timespan interval) =>
source
  .Select(p =>
    Observable
     .Empty<T>()
     .Delay(interval)
     .StartWith(p)
  )
  .Concat();
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号