I have a collection of observables that generate state changes for a so-called Channel
. And I have a ChannelSet
that should monitor those channels.
I would like to write something like this: if one channel is operational, the channel set is up, else, the channel set is down.
IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
channelSet.ChannelSetState = ChannelSetState.Up;
else
channelSet.ChannelSetState = ChannelSetState.Down;
But where do I get my IEnumerable<ChannelState>
? If I have 1 channel, I can simply subscribe to its state changes and modify the state of the channel set accordingly. For two channels, I could use CombineLatest
:
Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
{
if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
return ChannelSetState.Up;
else
return ChannelSetState.Down;
});
But I have an IEnumerable<Channel>
and a corresponding IEnumerable<IObservable<ChannelState>>
. I'm looking for something like CombineLatest
that is not 开发者_JAVA技巧limited to a fixed number of observables.
To complicate matters, the collection of channels can be added to and removed from. So once in a while, a channel will be added for example. The new channel also generates state changes that need to be incorporated.
So what I'm actually looking for is a function:
IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
that keeps up-to-date when the input changes. There should be some way to accomplish this using Rx but I can't really figure out how.
There's a fairly straight forward way to do what you want with Rx, but you need to think in terms of observables only and not mix in enumerables.
The function signature that you really need to think in terms of is:
IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
Here's the function:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
It is important that each IObservable<ChannelState>
in the IObservable<IObservable<ChannelState>>
behaves properly for this to work.
I've assumed that the ChannelState
enum has an Idle
state and that each IObservable<ChannelState>
will produce zero or more pairs of Operational
/Idle
values (Operational
followed by Idle
) before completing.
Also you said "the collection of channels can be added to and removed from" - thinking in terms of IEnumerable<IObservable<ChannelState>>
this sounds reasonable - but in Rx you don't have to worry about removes because each observable can signal its own completion. Once it signals completion then it is as if it has been removed from the collection because it can not produce any further values. So you only need to worry about adding to the collection - this is easy using subjects.
So now the function can be used like so:
var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);
channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc
I ran this using some test code, that used three random ChannelState
observables, with a Do
call in the f
function for debugging, and got the following sequence:
1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down
I think that's what you're after. Let me know if I've missed anything.
As per the comments below, the ChannelState
enum has multiple states, but only Operational
means that the connection is up. So it's very easy to add a DistinctUntilChanged
operator to hide multiple "down" states. Here's the code now:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
Added code to ensure that the first select query always starts with a 1
. Here's the code now:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.StartWith(1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
Perhaps start with an IObservable<Channel>
rather than starting w/ IEnumerable<Channel>
. A way to do this would be to use a Subject<Channel>
, and when a new one is created, OnNext()
it.
If you need a list,
xsChannels.Subscribe(item => { lock(list) { list.add(item); } });
I promised to add the solution I came up with myself, so here it is. As long as I haven't found anything better I'll use this, although I still think there has to be a better way :)
I use a class that uses a ConcurrentDictionary
to keep the latest value from each registered observable. When an observable is unregistered, its latest value is removed again, as well as the subscription associated with it.
When any registered observable generates a value, all latest values are collected and sent to a Subject
.
public class DynamicCombineLatest<T>
{
private readonly IDictionary<IObservable<T>, T> _latestValues =
new ConcurrentDictionary<IObservable<T>, T>();
private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
new ConcurrentDictionary<IObservable<T>, IDisposable>();
private readonly ISubject<IEnumerable<T>> _result =
new Subject<IEnumerable<T>>();
public void AddObservable(IObservable<T> observable)
{
var subscription =
observable.Subscribe(t =>
{
_latestValues[observable] = t;
_result.OnNext(_latestValues.Values);
});
_subscriptions[observable] = subscription;
}
public void RemoveObservable(IObservable<T> observable)
{
_subscriptions[observable].Dispose();
_latestValues.Remove(observable);
_subscriptions.Remove(observable);
}
public IObservable<IEnumerable<T>> Result
{
get { return _result; }
}
}
精彩评论