I want to use Reactive Extensions to transform some messages and relay them after a small delay.
The messages look something like this:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
The output looks something like this:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
There are a couple of requirements:
- The length of the delay is dependent on the content of the message.
- Each message has a GroupId
- If a newer message comes in with the same GroupId as a delayed message awaiting transmission then the first message should be dropped and only the second one transmitted after a new delay period.
Given an Observable<InMsg> and a Send function:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
I understand that I can use Select to perform the transformation.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- How can I apply a message specify delay? (Note this might/should result in out of order delivery of messages.)
- How can I de-dupe messages with the same GroupId?
- Is Rx capable 开发者_如何学Goof solving this problem?
- Is there another way of solving this?
You can use GroupBy
to make an IGroupedObservable
, Delay
to delay the output, and Switch
to make sure newer values replace previous values in their group:
IObservable<InMsg> inMessages;
inMessages
.GroupBy(msg => msg.GroupId)
.Select(group =>
{
return group.Select(groupMsg =>
{
TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here
return Observable.Return(outMsg).Delay(delay);
})
.Switch();
})
.Subscribe(outMsg => Console.Write("OutMsg received"));
A note on the implementation: if a grouped value arrived after the message is sent (ie. after the delay), it will start a new delay
@Richard Szalay's answer almost works for me (using .NET Rx 3.1.1 on .NET Framework 4.6), but I have to add .Merge()
to the end of the expression to combine the IObservable<IObservable<OutMsg>>
results, like so:
For me (using .NET Rx 3.1.1 on .NET Framework 4.6) the fix was to add .Merge()
to the end, like so:
var deduplicated = inputs
.GroupBy(input => input)
.Select(group =>
group
.Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
.Switch())
.Merge(); // <-- This is added to combine the partitioned results
精彩评论