
How do you chain together two Asynchronous Operations with the Reactive Framework?

Posted on https://www.devze.com, 2022-12-21 04:53. Source: the web.

All I really want to do is complete two async operations, one right after the other. E.g.

Download website X. Once that's completed, download website Y.


Use SelectMany over the two observables (ToAsync turns each operation into an IObservable). It works just like a LINQ SelectMany, or, with the query-syntax sugar:

var result = 
from x in X
from y in Y
select new { x, y }  

There are other options but it depends on your specific scenario.
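To make the sequencing visible, here is a minimal sketch of the same query. It assumes the current System.Reactive package rather than the Rx build the answer was written against, and `FakeDownloadAsync` is a hypothetical stand-in for a real download. `Observable.FromAsync` produces a cold observable, so the work for Y only starts when SelectMany subscribes to it, which happens after X has produced its value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class SelectManySketch
{
    // Hypothetical stand-in for a real download; it records when it starts and ends.
    static async Task<string> FakeDownloadAsync(string name, List<string> log)
    {
        log.Add("start " + name);
        await Task.Delay(50);
        log.Add("end " + name);
        return name + " content";
    }

    public static List<string> Run()
    {
        var log = new List<string>();

        // FromAsync is cold: the task is not started until something subscribes.
        IObservable<string> X = Observable.FromAsync(() => FakeDownloadAsync("X", log));
        IObservable<string> Y = Observable.FromAsync(() => FakeDownloadAsync("Y", log));

        var result =
            from x in X
            from y in Y
            select new { x, y };

        // Wait() blocks until the sequence completes and returns its last element.
        var pair = result.Wait();
        log.Add(pair.x + " / " + pair.y);
        return log;
    }

    static void Main()
    {
        // The log shows X finishing before Y starts.
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

If X and Y were already running when the query was built (hot observables), the query would still pair the results but would not delay the start of Y.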


Maybe something along the following lines:

    static IObservable<DownloadProgressChangedEventArgs> CreateDownloadFileObservable(string url, string fileName)
    {
       IObservable<DownloadProgressChangedEventArgs> observable =
           Observable.CreateWithDisposable<DownloadProgressChangedEventArgs>(o =>
        {
            var cancellationTokenSource = new CancellationTokenSource();
            Scheduler.TaskPool.Schedule
            (
                () =>
                {

                    Thread.Sleep(3000);
                    if (!cancellationTokenSource.Token.IsCancellationRequested)
                    {
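                        WebClient client = new WebClient();

                        // Forward each progress notification to the observer.
                        DownloadProgressChangedEventHandler prgChangedHandler = null;
                        prgChangedHandler = (s, e) =>
                            {
                                o.OnNext(e);
                            };

                        AsyncCompletedEventHandler handler = null;
                        handler = (s, e) =>
                        {
                            // Detach both handlers from the client before signalling the observer.
                            client.DownloadProgressChanged -= prgChangedHandler;
                            client.DownloadFileCompleted -= handler;
                            if (e.Error != null)
                            {
                                o.OnError(e.Error);
                            }
                            else
                            {
                                // OnError and OnCompleted are mutually exclusive.
                                o.OnCompleted();
                            }
                        };
                        client.DownloadFileCompleted += handler;
                        client.DownloadProgressChanged += prgChangedHandler;

                        // Start the download only after the handlers are attached;
                        // fileName doubles as the user token reported in UserState.
                        client.DownloadFileAsync(new Uri(url), fileName, fileName);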
                    }
                    else
                    {
                        Console.WriteLine("Cancelling download of {0}",fileName);
                    }
                }
            );
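            // Disposing a CancellationTokenSource does not cancel it, so return a
            // disposable that requests cancellation when the subscription is disposed.
            return Disposable.Create(() => cancellationTokenSource.Cancel());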
        }
        );
       return observable;
    }

    static void Main(string[] args)
    {

        var obs1 = CreateDownloadFileObservable("http://www.cnn.com", "cnn.htm");
        var obs2 = CreateDownloadFileObservable("http://www.bing.com", "bing.htm");

        var result = obs1.Concat(obs2);
        var subscription = result.Subscribe(
            a => Console.WriteLine("{0} -- {1}% complete ", a.UserState, a.ProgressPercentage),
            e => Console.WriteLine(e.Message),
            () => Console.WriteLine("Completed"));
        Console.ReadKey();
        subscription.Dispose();
        Console.WriteLine("Press a key to exit");
        Console.ReadKey();
    }

You could turn CreateDownloadFileObservable into an extension method on WebClient and remove the WebClient client = new WebClient(); line from the method definition. The calling code would then look like this:

WebClient c = new WebClient();
c.CreateDownloadFileObservable("http://www.bing.com", "bing.htm");
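The sequencing in this answer comes from Concat, which subscribes to the second observable only after the first one has completed. A stripped-down sketch of just that behaviour, assuming the current System.Reactive package and a hypothetical `FakeDownload` helper in place of WebClient, might look like this:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class ConcatSketch
{
    // Hypothetical stand-in for a download: a cold observable that
    // logs when its work starts and then emits a single value.
    static IObservable<string> FakeDownload(string name, List<string> log) =>
        Observable.FromAsync(async () =>
        {
            log.Add("start " + name);
            await Task.Delay(50);
            return name + " done";
        });

    public static List<string> Run()
    {
        var log = new List<string>();

        var both = FakeDownload("cnn", log).Concat(FakeDownload("bing", log));

        // Do() records each element as it arrives; Wait() blocks until completion.
        both.Do(item => log.Add(item)).Wait();
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Unlike the SelectMany query, Concat does not combine the results: it forwards every element of the first sequence, then every element of the second.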


I don't like suggestions like this one myself, but...

If you need to perform two operations sequentially, do them sequentially (you can still do them on a thread different from main).

If you still want to separate the tasks in code, then the Task construct from the Task Parallel Library (System.Threading.Tasks) would be appropriate:

var task1 = Task.Factory.StartNew (() => FirstTask());
var task2 = task1.ContinueWith (frst => SecondTask ());
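A slightly fuller sketch of the continuation approach, with `FirstTask` and `SecondTask` as hypothetical placeholders for the two operations. The `OnlyOnRanToCompletion` option is an addition here: by default a continuation runs even when the first task has faulted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ContinuationSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical placeholders for the two operations.
        string FirstTask() { log.Add("first"); return "X"; }
        string SecondTask(string previous) { log.Add("second after " + previous); return "Y"; }

        Task<string> task1 = Task.Factory.StartNew(() => FirstTask());

        // The continuation starts only once task1 has finished successfully
        // and receives the finished task, so it can read its result.
        Task<string> task2 = task1.ContinueWith(
            first => SecondTask(first.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        task2.Wait();
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```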

And if this is a way to grok System.Reactive, try Observable.GenerateInSequence, but it will surely be overkill. Remember, an observable is the counterpart of an enumerable, and it is better used in a similar way (iterating over data). It is not a tool for running async operations.

Edit: I want to admit that I was wrong and Richard was right. At the time of my reply I wasn't fully comfortable with Rx. Now I think that Rx is the most natural way to start asynchronous operations. Still, Richard's answer is a bit skimpy; it should be:

var result =  
from x in XDownload.ToAsync()() 
from y in YDownload.ToAsync()() 
select y   
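The double parentheses are easy to misread: ToAsync turns a delegate into a function that returns an observable, and the second pair of parentheses invokes that function, which starts the work. The sketch below assumes the current System.Reactive package, and `XDownload` and `YDownload` are hypothetical `Func<string>` delegates. Because the second `from` clause is only evaluated once x has arrived, YDownload is not started until XDownload has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class ToAsyncSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical synchronous operations standing in for the downloads.
        Func<string> XDownload = () => { log.Add("X"); return "x content"; };
        Func<string> YDownload = () => { log.Add("Y"); return "y content"; };

        var result =
            from x in Observable.ToAsync(XDownload)()   // starts X right away
            from y in Observable.ToAsync(YDownload)()   // evaluated only after x arrives
            select y;

        // Wait() blocks until the sequence completes and returns its last element.
        log.Add(result.Wait());
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```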