Using WCF service via async interface from worker thread, how do I ensure that events are sent from the client "in order"

I am writing a Silverlight class library to abstract the interface to a WCF service. The WCF service provides a centralized logging service. The Silverlight class library provides a simplified log4net-like interface (logger.Info, logger.Warn, etc) for logging. From the class library I plan to provide options such that logged messages can be accumulated on the client and sent in "bursts" to the WCF logging service, rather than sending each message as it occurs. Generally, this is working well. The class library does accumulate messages and it does send collections of messages to the WCF logging service, where they are logged by an underlying logging framework.

My current problem is that the messages (from a single client with a single thread - all logging code is in button click events) are becoming interleaved in the logging service. I realize that at least part of this is probably due to the instancing (PerCall) or synchronization of the WCF logging service. However, it also seems that my messages are occurring in such rapid succession that the "bursts" of messages leaving on the async calls are actually "leaving" the client in a different order than they were generated.
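
For illustration only (the actual attributes and operation signature of the logging service are not shown in this question), a PerCall service declaration looks roughly like the sketch below; each call gets its own service instance, so the server does nothing to serialize two bursts that arrive close together.

  using System.Collections.ObjectModel;
  using System.ServiceModel;

  // Hypothetical service implementation - the class name and method signature
  // are illustrative only.
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
  public class CentralLoggingService : ILoggingService
  {
    public void LogEvents(ObservableCollection<LogEvent> events)
    {
      // Hand the events off to the underlying logging framework here.
      // With PerCall instancing, two concurrent calls run against two
      // different instances and may be logged in either order.
    }
  }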

I have tried to set up a producer/consumer queue as described here, with a slight (or should that be "slight", with air quotes) change: the Work method blocks (WaitOne) until the async call returns (i.e. until the async callback executes). The idea is that when one burst of messages is sent to the WCF logging service, the queue should wait until that burst has been processed before sending the next burst.

Maybe what I am trying to do is not feasible, or maybe I am trying to solve the wrong problem (or maybe I just don't know what I am doing!).

Anyway, here is my producer/consumer queue code:

  internal class ProducerConsumerQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      //Queue the next burst of messages
      lock(locker)
      {
        logEventQueue.Enqueue(logEvents);
        //Is this Set conflicting with the WaitOne on the async call in Work?
        wh.Set();        
      }
    }

    private void Work()
    {
      while(true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock(locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;            
          }
        }

        if (events != null && events.Count > 0)
        {
          System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          //
          // This seems to be the key...
          // Send one burst of messages via an async call and wait until the async call completes.
          //
          loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              System.Diagnostics.Debug.WriteLine("3. Work - Back");
              wh.Set();
            }
            catch (Exception ex)
            {
            }
          }, null);

          System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          wh.WaitOne();

          System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

In my test it is essentially called like this:

  //Inside of LogManager, get the LoggingService and set up the queue.
  ILoggingService loggingService = GetTheLoggingService();
  ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService);

  //Inside of client code, get a logger and log with it
  ILog logger = LogManager.GetLogger("test");

  for (int i = 0; i < 100; i++)
  {
    logger.InfoFormat("logging message [{0}]", i);
  }

Internally, logger/LogManager accumulates some number of logging messages (say 25) before adding that group of messages to the queue. Something like this:

  internal void AddNewMessage(string message)
  {
    lock(logMessages)
    {
      logMessages.Add(message);
      if (logMessages.Count >= 25)
      {
        ObservableCollection<LogMessage> messages = new ObservableCollection<LogMessage>(logMessages);
        logMessages.Clear();
        loggingQueue.EnqueueLogEvents(messages);
      }
    }
  }

So, in this case I would expect to have 4 bursts of 25 messages each. Based on the Debug statements in my ProducerConsumerQueue code (maybe not the best way to debug this?), I would expect to see something like this:

  1. Work - Sending 25 events
  2. Work - Waiting
  3. Work - Back
  4. Work - Finished

Repeated 4 times.

Instead I am seeing something like this:

  1. Work - Sending 25 events
  2. Work - Waiting
  4. Work - Finished
  1. Work - Sending 25 events
  2. Work - Waiting
  3. Work - Back
  4. Work - Finished
  1. Work - Sending 25 events
  2. Work - Waiting
  3. Work - Back
  4. Work - Finished
  1. Work - Sending 25 events
  2. Work - Waiting
  3. Work - Back
  3. Work - Back
  4. Work - Finished

I guess I would have expected the queue to allow multiple bursts of messages to be added, but to completely process one burst (waiting on the async call to complete) before processing the next burst. It doesn't seem to be doing this; it does not seem to be reliably waiting on the completion of the async call. I do have a call to Set in EnqueueLogEvents; maybe that is cancelling the WaitOne in the Work method?

So, I have a few questions:

  1. Does my explanation of what I am trying to accomplish make sense (is my explanation clear, not whether it is a good idea)?

  2. Is what I am trying to do (transmit the messages from the client, from a single thread, in the order they occurred, completely processing one set of messages at a time) a good idea?

  3. Am I close?

  4. Can it be done?

  5. Should it be done?

Thanks for any help!

[EDIT] After more investigation, and thanks to Brian's suggestion, we were able to get this working. I have copied the modified code below. The key change is that the "wh" wait handle is now used strictly for the ProducerConsumerQueue functions; rather than using wh to wait for the async call to complete, we now wait on res.AsyncWaitHandle, which is returned by the BeginLogEvents call.

  internal class LoggingQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    bool working = false;

    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal LoggingQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      lock (locker)
      {
        logEventQueue.Enqueue(logEvents);

        //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set");

        wh.Set();
      }
    }

    private void Work()
    {
      while (true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock (locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;
          }
        }

        if (events != null && events.Count > 0)
        {
          //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              //System.Diagnostics.Debug.WriteLine("3. Work - Back");
            }
            catch (Exception ex)
            {
            }
          }, null);

          //System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          // Block until the async call returns. We do this so that all logging messages
          // are sent FROM the client in the order they were generated. ALSO, we don't want
          // to interleave blocks of logging messages from the same client by sending a new
          // block of messages before the previous block has been completely processed.

          res.AsyncWaitHandle.WaitOne();

          //System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

As I mentioned in my initial question and in my comments to Jon and Brian, I still don't know if doing all of this work is a good idea, but at least the code does what I wanted it to do. That means that I at least have the choice of doing it this way or some other way (such as restoring order after the fact) rather than not having the choice.


Can I suggest that there's a simple alternative to all this coordination? Have a sequence using a cheap monotonically increasing ID (e.g. with Interlocked.Increment()) so that no matter what order things happen at the client or server, you can regenerate the original ordering later on.

That should let you be efficient and flexible, sending whatever you want asynchronously without waiting for acknowledgement, but without losing the ordering.

Obviously that means the ID (or possibly a guaranteed-unique timestamp field) would need to be part of your WCF service, but if you control both ends that should be reasonably simple.
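
A minimal sketch of that idea, assuming a hypothetical SequenceNumber field is added to the LogEvent data contract (it is not part of the service shown above):

  using System.Threading;

  internal static class LogSequence
  {
    private static int lastId;

    // Thread-safe, monotonically increasing ID for stamping log events.
    internal static int Next()
    {
      return Interlocked.Increment(ref lastId);
    }
  }

  // At the point where a message is logged on the client:
  //   logEvent.SequenceNumber = LogSequence.Next();   // hypothetical property
  // The server (or a later query/report) can then sort by SequenceNumber to
  // recover the original client-side ordering, regardless of the order in
  // which the async calls actually arrive.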


The reason you are getting that kind of sequencing is because you are trying to use the same wait handle that the producer-consumer queue is using for a different purpose. That is going to cause all kinds of chaos. At some point things will go from bad to worse and the queue will eventually get live-locked. You really should create a separate WaitHandle to wait for completion of the logging service. Or, if BeginLogEvents follows the standard pattern, it will return an IAsyncResult that contains a WaitHandle that you can use instead of creating your own.

As a side note, I really do not like the producer-consumer pattern presented on the Albahari website. The problem is that it is not safe for multiple consumers (obviously that is of no concern to you). And I say that with all due respect because I think his website is one of the best resources for multithreaded programming. If BlockingCollection is available to you then use that instead.
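
For reference, a consumer built on BlockingCollection might look like the sketch below. (BlockingCollection lives in System.Collections.Concurrent in .NET 4 and is not available in Silverlight, so this only applies where the full framework is an option; the send-and-wait step reuses the same BeginLogEvents/EndLogEvents pattern as the code above.)

  using System;
  using System.Collections.Concurrent;
  using System.Collections.ObjectModel;
  using System.Threading;

  // Sketch only: the same idea as LoggingQueue above, with BlockingCollection
  // replacing the hand-rolled lock/AutoResetEvent machinery.
  internal class BlockingLoggingQueue : IDisposable
  {
    readonly BlockingCollection<ObservableCollection<LoggingService.LogEvent>> queue =
      new BlockingCollection<ObservableCollection<LoggingService.LogEvent>>();
    readonly Thread worker;
    readonly LoggingService.ILoggingService loggingService;

    internal BlockingLoggingQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      queue.Add(logEvents);
    }

    private void Work()
    {
      // GetConsumingEnumerable blocks until an item is available and completes
      // once CompleteAdding has been called and the queue has been drained.
      foreach (var events in queue.GetConsumingEnumerable())
      {
        if (events == null || events.Count == 0) continue;

        IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
        {
          try { loggingService.EndLogEvents(ar); }
          catch (Exception) { /* swallowed, as in the original code */ }
        }, null);

        // Wait for this burst to complete before sending the next one.
        res.AsyncWaitHandle.WaitOne();
      }
    }

    public void Dispose()
    {
      queue.CompleteAdding();
      worker.Join();
      queue.Dispose();
    }
  }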
