开发者

Delay and de-duplication using Reactive Extensions (Rx)

开发者 https://www.devze.com 2023-02-05 19:13 出处:网络
I want to use Reactive Extensions to transform some messages and relay them after a small delay. The messages look something like this:

I want to use Reactive Extensions to transform some messages and relay them after a small delay.

The messages look something like this:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

The output looks something like this:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

There are a couple of requirements:

  • The length of the delay is dependent on the content of the message.
  • Each message has a GroupId
  • If a newer message comes in with the same GroupId as a delayed message awaiting transmission then the first message should be dropped and only the second one transmitted after a new delay period.

Given an Observable<InMsg> and a Send function:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

I understand that I can use Select to perform the transformation.

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • How can I apply a message specify delay? (Note this might/should result in out of order delivery of messages.)
  • How can I de-dupe messages with the same GroupId?
  • Is Rx capable 开发者_如何学Goof solving this problem?
  • Is there another way of solving this?


You can use GroupBy to make an IGroupedObservable, Delay to delay the output, and Switch to make sure newer values replace previous values in their group:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

A note on the implementation: if a grouped value arrived after the message is sent (ie. after the delay), it will start a new delay


@Richard Szalay's answer almost works for me (using .NET Rx 3.1.1 on .NET Framework 4.6), but I have to add .Merge() to the end of the expression to combine the IObservable<IObservable<OutMsg>> results, like so:

For me (using .NET Rx 3.1.1 on .NET Framework 4.6) the fix was to add .Merge() to the end, like so:

var deduplicated = inputs
    .GroupBy(input => input)
    .Select(group =>
        group
        .Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
        .Switch())
    .Merge(); // <-- This is added to combine the partitioned results
0

精彩评论

暂无评论...
验证码 换一张
取 消