I've seen plenty of examples of using Observable.FromAsyncPattern() in the Rx framework to simplify async calls, but the interface I'm using doesn't follow the standard async pattern of IAsyncResult BeginXXX / EndXXX(IAsyncResult), so that approach doesn't work for me.
The library I'm working with exposes async functions with a callback pattern:
void GetAllObjects(Action<List<Object>> callback)
In an ideal world I'd like to turn this:
var isLoadingUsers = true;
var isLoadingSystems = true;
var isLoadingCustomers = true;
var isLoadingRules = true;
mClient.GetAllUsers(UsersCallback);
mClient.GetAllCustomers(CustomersCallback);
mClient.GetAllRules(RulesCallback);
// set the IsLoadingXXX variables to false in callbacks
// once all are false
mClient.GetAllSystems(SystemsCallback);
into something like this:
var o = Observable.ForkJoin(
Observable.Start(GetAllUsers()),
Observable.Start(GetAllCustomers()),
Observable.Start(GetAllRules())
).Finally(() => GetAllSystems);
How would one go about turning that pattern into something that returns an IObservable?
// ReplaySubject<T> lives in System.Reactive.Subjects in current Rx releases.
Func<IObservable<TRet>> FromListCallbackPattern<TRet>(Action<Action<List<TRet>>> function)
{
    return () => {
        // We use a ReplaySubject so that if people subscribe *after* the
        // real method finishes, they'll still get all the items
        var ret = new ReplaySubject<TRet>();
        function((list) => {
            // We're going to "rebroadcast" the list onto the Subject
            // This isn't the most Rx'iest way to do this, but it is the most
            // comprehensible :)
            foreach (var v in list) {
                ret.OnNext(v);
            }
            ret.OnCompleted();
        });
        return ret;
    };
}
Now, you can do something like:
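// The type argument has to be spelled out: C# cannot infer it from a method group.
// "User" stands in for whatever element type GetAllUsers actually returns.
var getAllUsers = FromListCallbackPattern<User>(mClient.GetAllUsers);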
getAllUsers().Subscribe(x => /* ... */);
Try Observable.Create(), perhaps something like this:
public IObservable<Object> ObserveAllObjects()
{
    return Observable.Create<Object>(
        observer =>
            () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o))));
}
I like Observable.Create for this, but @dahlbyk's answer is incorrect (it never signals completion, and it performs the action in the unsubscribe handler). It should be something like this:
IObservable<List<T>> FromListCallbackPattern<T>(
    Action<Action<List<T>>> listGetter)
{
    return Observable
        .Create<List<T>>(observer =>
        {
            var subscribed = true;
            listGetter(list =>
            {
                if (!subscribed) return;
                observer.OnNext(list);
                observer.OnCompleted();
            });
            return () =>
            {
                subscribed = false;
            };
        });
}
Also, since the originating API returns the entire list at once, I don't see a reason to flatten it into an observable of individual items too early. Let the resulting observable yield the list as well; if a caller needs to flatten it, they can use .SelectMany.
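To tie this back to the question, here is a minimal sketch of how the wrapped calls might be combined so that GetAllSystems only runs once users, customers, and rules have all arrived. It assumes a current System.Reactive package; FakeClient and its string element types are hypothetical stand-ins for the real client, and Observable.Zip is used here in place of ForkJoin, which works because each source yields exactly one list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for the real client; callbacks fire synchronously here.
class FakeClient
{
    public void GetAllUsers(Action<List<string>> callback) =>
        callback(new List<string> { "alice", "bob" });
    public void GetAllCustomers(Action<List<string>> callback) =>
        callback(new List<string> { "acme" });
    public void GetAllRules(Action<List<string>> callback) =>
        callback(new List<string> { "rule-1" });
    public void GetAllSystems(Action<List<string>> callback) =>
        callback(new List<string> { "sys-a", "sys-b" });
}

static class Demo
{
    // Same wrapper as in the answer above: one list, then completion.
    public static IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable.Create<List<T>>(observer =>
        {
            var subscribed = true;
            listGetter(list =>
            {
                if (!subscribed) return;
                observer.OnNext(list);
                observer.OnCompleted();
            });
            return () => { subscribed = false; };
        });
    }

    public static List<string> Run()
    {
        var client = new FakeClient();
        var log = new List<string>();

        var users = FromListCallbackPattern<string>(client.GetAllUsers);
        var customers = FromListCallbackPattern<string>(client.GetAllCustomers);
        var rules = FromListCallbackPattern<string>(client.GetAllRules);

        // Zip emits once every source has produced its single list.
        var systems = Observable
            .Zip(users, customers, rules, (u, c, r) => u.Count + c.Count + r.Count)
            .Do(total => log.Add("loaded " + total + " items"))
            // Only now is GetAllSystems invoked.
            .SelectMany(_ => FromListCallbackPattern<string>(client.GetAllSystems))
            // Flatten the list into individual items, as suggested above.
            .SelectMany(list => list);

        systems.Subscribe(s => log.Add(s));
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Nothing runs until Subscribe is called, because Observable.Create defers the underlying call to subscription time. That differs from the ReplaySubject version, which starts the call as soon as the returned function is invoked.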