I took the last example from the 101 Rx Samples ( http://rxwiki.wikidot.com/101samples#toc47 ), created a class like the one below, and use it as shown in the Test function.
// Demo driver: attaches console handlers to an Order's notification stream
// and then runs the order so the handlers fire.
private void Test()
{
    Order demoOrder = new Order();
    IObservable<Order> notifications = demoOrder.ObservableOrder;

    // One handler per notification kind: next, error, completed.
    Action<Order> onProgress = _ => Console.WriteLine("Order progress ");
    Action<Exception> onFailure = error => Console.WriteLine("Paid error " + error.Message);
    Action onDone = () => Console.WriteLine("Paid oncompleted ");

    notifications.Subscribe(onProgress, onFailure, onDone);

    demoOrder.Start();
}
/// <summary>
/// Sample order that publishes its progress, failure and completion through
/// a single Rx subject.
/// </summary>
public class Order
{
    // Backing subject; only this class pushes notifications into it.
    private readonly Subject<Order> _subject = new Subject<Order>();

    /// <summary>
    /// Stream of notifications for this order, exposed via
    /// <c>AsObservable()</c> rather than handing out the subject itself.
    /// </summary>
    public IObservable<Order> ObservableOrder
    {
        get { return _subject.AsObservable(); }
    }

    /// <summary>Sends an error notification carrying a test exception.</summary>
    public void RaiseError()
    {
        _subject.OnError(new Exception("test exception"));
    }

    /// <summary>Sends the completed notification.</summary>
    public void RaiseCompleted()
    {
        _subject.OnCompleted();
    }

    /// <summary>Sends a progress notification whose payload is this order.</summary>
    public void RaiseProgress()
    {
        _subject.OnNext(this);
    }

    /// <summary>
    /// Demo run: emits five progress notifications and then both terminal
    /// notifications.
    /// </summary>
    public void Start()
    {
        for (int i = 0; i < 5; i++)
        {
            this.RaiseProgress();
        }

        // Demo only: a real sequence ends with one of these, not both.
        // NOTE(review): per the Rx contract the second call should have no
        // visible effect once the error was sent - confirm against Rx docs.
        this.RaiseError(); // either error is raised
        this.RaiseCompleted(); // or completed is raised
    }
}
My requirement: how do I write a base class (or some kind of factory) that makes this easy to reuse across 20-30 classes that do similar things (they all need to send progress, completed and error events)? Also, how can I send some data back with the progress and completed events (the way an exception is passed to the error event)?
`OnCompleted()` accepts no arguments, which obviously can't be changed.
You have a few choices:
You can create a payload class that has both the progress and the final payload in it: `class OrderProgress { double Progress; OrderResult Result; }`
You can expose another subject, `Progress`, which you can subscribe to in addition to `ObservableOrder`.
If you want to keep `Start` reentrant, you could have it return an object that exposes both a `Progress` and an `OrderProgress` observable.
Edit: Including example code (of my second option):
// Demo driver for the two-stream Order: one subscription for numeric progress,
// one for the order's result / error / completion notifications.
private void Test()
{
    var order = new Order();

    // Progress stream: only an OnNext handler is registered here.
    order.ObservableProgress.Subscribe(
        prog => Console.WriteLine("Order progress " + prog.ToString())
    );

    // Order stream: handlers for next, error and completed.
    order.ObservableOrder.Subscribe(
        ord => Console.WriteLine("Order progress"),
        ex => Console.WriteLine("Paid error " + ex.Message),
        () => Console.WriteLine("Paid oncompleted")
    );

    // Subscribe before starting so that no notification is missed.
    order.Start();
}
/// <summary>
/// Sample order that reports numeric progress on one stream and its final
/// result, failure or completion on a second stream.
/// </summary>
public class Order
{
    // Carries the finished order, or the error that ended it.
    private readonly Subject<Order> _subject = new Subject<Order>();

    // Carries the progress values passed to RaiseProgress.
    private readonly Subject<double> _progressSubject = new Subject<double>();

    /// <summary>Stream that yields the order on completion, or an error.</summary>
    public IObservable<Order> ObservableOrder
    {
        get { return _subject.AsObservable(); }
    }

    /// <summary>Stream of progress values reported while the order runs.</summary>
    public IObservable<double> ObservableProgress
    {
        get { return _progressSubject.AsObservable(); }
    }

    /// <summary>
    /// Fails the order stream with a test exception and closes the progress
    /// stream so that neither stream is left open.
    /// </summary>
    public void RaiseError()
    {
        _subject.OnError(new Exception("test exception"));

        // Close the progress stream with OnCompleted rather than OnError:
        // progress subscribers may have registered only an OnNext handler,
        // and the failure itself is already reported on the order stream.
        _progressSubject.OnCompleted();
    }

    /// <summary>
    /// Publishes this order as the final result and completes both streams.
    /// </summary>
    public void RaiseCompleted()
    {
        _subject.OnNext(this);
        _subject.OnCompleted();
        _progressSubject.OnCompleted();
    }

    /// <summary>Publishes a progress value on the progress stream.</summary>
    /// <param name="progress">Progress value to publish; Start passes i / 5.</param>
    public void RaiseProgress(double progress)
    {
        _progressSubject.OnNext(progress);
    }

    /// <summary>
    /// Demo run: reports progress 0.0, 0.2, ... 0.8 and then invokes both
    /// terminal paths.
    /// </summary>
    public void Start()
    {
        for (int i = 0; i < 5; i++)
        {
            this.RaiseProgress((double)i / 5D);
        }

        // Demo only: a real run ends with one of these, not both.
        // NOTE(review): once the error was sent, the later calls are expected
        // to be ignored by the subjects - confirm against Rx docs.
        this.RaiseError(); // either error is raised
        this.RaiseCompleted(); // or completed is raised
    }
}
精彩评论