In the following code, if I understand joins in Rx correctly, I should see the following alerts:
- West
- Test
- Test-West*
- Done
I get 3 of the 4 alerts I expect... why aren't I receiving "Test-West" as well?
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();

        var loginInitial = new LoginInitial();
        var loginCheckList = new LoginCheckList();

        var result1 = from x in loginInitial.Status
                      from y in loginCheckList.Status
                      where x == "Test" && y == "West"
                      select new { x, y };
        result1.Subscribe(x => MessageBox.Show(x.x + "-" + x.y));

        var result2 = from x in loginInitial.Status
                      where x == "Test"
                      select x;
        result2.Subscribe(x => MessageBox.Show(x));

        var result3 = from x in loginCheckList.Status
                      where x == "West"
                      select x;
        result3.Subscribe(x => MessageBox.Show(x));

        var task1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10000000; i++)
            {
                if (i == 9000000)
                    loginInitial.Status.Publish("9000000");
                if (i == 9000001)
                    loginInitial.Status.Publish("Test");
            }
        });

        var task2 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000000; i++)
            {
                if (i == 800000)
                    loginInitial.Status.Publish("800000");
                if (i == 800001)
                    loginCheckList.Status.Publish("West");
            }
        });

        Task.WaitAll(task1, task2);
        MessageBox.Show("Done");
    }
}
public class LoginInitial
{
    public PublishObservable<string> Status = new PublishObservable<string>();
}

public class LoginCheckList
{
    public PublishObservable<string> Status = new PublishObservable<string>();
}

public class PublishObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _observers = new List<IObserver<T>>();

    public void Publish(T value)
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnNext(value);
            }
        }
    }

    public void Complete()
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_observers)
        {
            _observers.Add(observer);
        }
        return null;
    }
}
When you use the from clause in Rx, you're saying that the rest of the clause should run for every occurrence of the observable. For nested from clauses, this means that you wait for the first occurrence of the first event and then start running the rest of the clause for that occurrence (and then do the same thing in parallel for all future occurrences). Nested from clauses are translated into calls to SelectMany, so the documentation on SelectMany covers this behavior in more detail.
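To make the SelectMany connection concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The subjects `first` and `second` are hypothetical stand-ins for the two Status observables, and the class name is made up for illustration. It shows a two-level from query next to the SelectMany call that this query shape translates to.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NestedFromDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        var first = new Subject<string>();   // hypothetical stand-in for loginInitial.Status
        var second = new Subject<string>();  // hypothetical stand-in for loginCheckList.Status

        // Query syntax with two nested from clauses.
        var query = from x in first
                    from y in second
                    select x + "-" + y;

        // The same pipeline written directly against SelectMany.
        var methods = first.SelectMany(x => second, (x, y) => x + "-" + y);

        query.Subscribe(v => output.Add("query: " + v));
        methods.Subscribe(v => output.Add("methods: " + v));

        first.OnNext("Test");   // only now does each pipeline subscribe to `second`
        second.OnNext("West");  // both pipelines pair this value with "Test"
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Both pipelines should report the same "Test-West" pair, since they are the same operator under two syntaxes.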
When you look at your example:
var result1 =
    from x in loginInitial.Status
    from y in loginCheckList.Status
    where x == "Test" && y == "West"
    select new { x, y };
...this means that the clause needs to wait for loginInitial.Status. When this produces a value, the query starts waiting for loginCheckList.Status. If I understand your code correctly, the Initial observable will produce a value after the CheckList observable, so by the time you start waiting for the second one, its value has already been generated and you will not get it again.
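The ordering effect can be reproduced without any threads. The following is a rough sketch, assuming System.Reactive and using plain Subject instances (hypothetical stand-ins, not the PublishObservable class from the question). It runs the same nested from query twice and only changes which value is pushed first.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OrderDemo
{
    // Pushes "Test" and "West" in the requested order and returns what the join emitted.
    public static List<string> Run(bool checkListFirst)
    {
        var output = new List<string>();
        var initial = new Subject<string>();    // stand-in for loginInitial.Status
        var checkList = new Subject<string>();  // stand-in for loginCheckList.Status

        var joined = from x in initial
                     from y in checkList
                     where x == "Test" && y == "West"
                     select x + "-" + y;
        joined.Subscribe(v => output.Add(v));

        if (checkListFirst)
        {
            checkList.OnNext("West"); // the query is not listening to checkList yet, so this is lost
            initial.OnNext("Test");   // the query subscribes to checkList only now
        }
        else
        {
            initial.OnNext("Test");   // the query subscribes to checkList
            checkList.OnNext("West"); // ...and sees this value
        }
        return output;
    }

    public static void Main()
    {
        Console.WriteLine("CheckList first: " + Run(true).Count + " result(s)");
        Console.WriteLine("Initial first:   " + Run(false).Count + " result(s)");
    }
}
```

When the CheckList value arrives first, the query produces nothing, which matches the missing "Test-West" alert in the question.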
I think that a more appropriate operation in your case would be Observable.Zip or CombineLatest.
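The two operators pair values differently, so which one fits depends on the behavior you want. As an illustrative sketch (assuming System.Reactive; the subjects and the class name are hypothetical), the snippet below feeds the same three values into both: Zip pairs values by position, while CombineLatest pairs each new value with the latest value from the other side.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipVsCombineLatestDemo
{
    public static void Run(List<string> zipped, List<string> combined)
    {
        var initial = new Subject<string>();    // stand-in for loginInitial
        var checkList = new Subject<string>();  // stand-in for loginCheckList

        // Zip: first value with first value, second with second, and so on.
        initial.Zip(checkList, (x, y) => x + "-" + y)
               .Subscribe(v => zipped.Add(v));

        // CombineLatest: every new value is paired with the other side's latest value.
        initial.CombineLatest(checkList, (x, y) => x + "-" + y)
               .Subscribe(v => combined.Add(v));

        checkList.OnNext("West");
        initial.OnNext("9000000");
        initial.OnNext("Test");
    }

    public static void Main()
    {
        var zipped = new List<string>();
        var combined = new List<string>();
        Run(zipped, combined);
        Console.WriteLine("Zip:           " + string.Join(", ", zipped));
        Console.WriteLine("CombineLatest: " + string.Join(", ", combined));
    }
}
```

With this input, Zip pairs "West" with "9000000" and then waits for a second CheckList value, so "Test-West" never appears. CombineLatest produces "9000000-West" and then "Test-West". If one source emits extra values before the one you care about, CombineLatest is likely the safer choice.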
Tomas Petricek pretty much explains why this is happening. I'll just add a solution as an example.
As well as adjusting result1 to use CombineLatest (which also needs extension method syntax rather than LINQ query syntax), I've changed the implementation to use Subject, which removes the need to create your own implementation of IObservable. I've also replaced the multiple subscriptions with a single subscription by merging the result observables through Observable.Merge.
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();

        var loginInitial = new Subject<String>();
        var loginCheckList = new Subject<String>();

        var result1 = loginInitial.CombineLatest(loginCheckList,
                (x, y) => new Tuple<string, string>(x, y))
            .Where(latest => latest.Item1 == "Test" && latest.Item2 == "West")
            .Select(latest => string.Format("{0} - {1}", latest.Item1, latest.Item2));

        var result2 = from x in loginInitial
                      where x == "Test"
                      select x;

        var result3 = from x in loginCheckList
                      where x == "West"
                      select x;

        Observable.Merge(result1, result2, result3)
            .Subscribe(Console.WriteLine);

        var task1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10000000; i++)
            {
                if (i == 9000000)
                    loginInitial.OnNext("9000000");
                if (i == 9000001)
                    loginInitial.OnNext("Test");
            }
        });

        var task2 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000000; i++)
            {
                if (i == 800000)
                    loginInitial.OnNext("800000");
                if (i == 800001)
                    loginCheckList.OnNext("West");
            }
        });

        Task.WaitAll(task1, task2);
        Console.WriteLine("Done");
    }
}
Note 1 - I've used CombineLatest here, but you could just as easily change it to use Zip, depending on the behavior you need. Check out the marble diagrams on the RxAs pages for Zip and CombineLatest for a better idea of how each behaves.
Note 2 - I would probably change result2 and result3 to use extension method syntax so that there isn't a mix of approaches in one method. Nothing wrong with it the way it is but I'd prefer the consistency of using one type of syntax where possible.
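For what it's worth, here is roughly what Note 2 would look like, as a sketch that assumes System.Reactive and reuses the variable names from the code above. The class name and the list used to capture output are only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MethodSyntaxDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        var loginInitial = new Subject<string>();
        var loginCheckList = new Subject<string>();

        // Same filters as the query-syntax versions of result2 and result3.
        var result2 = loginInitial.Where(x => x == "Test");
        var result3 = loginCheckList.Where(x => x == "West");

        Observable.Merge(result2, result3)
                  .Subscribe(v => output.Add(v));

        loginInitial.OnNext("9000000"); // filtered out
        loginInitial.OnNext("Test");
        loginCheckList.OnNext("West");
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

A query with a where clause and an identity select reduces to a single Where call, so the behavior should be unchanged.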