Currently our application connects to an Arduino over a serial port. We send some ASCII-formatted commands and get the same in return. To do this, we have a queue of commands, a thread dedicated to writing those commands to the port, and a thread dedicated to reading and handling all incoming replies. The class itself is responsible for dispatching the replies, which gives it far too much responsibility (it should handle port operations only, not business logic).
We would rather do this in an async manner. Anything in the system can send a command with a callback function and a timeout. If the serial port gets a correct reply, it calls the callback function. Otherwise, it times out and maybe calls a second callback (or possibly a single callback with a succeeded flag).
However, we've only ever consumed async methods (particularly in web operations), not written such a system. Can anyone give us some pointers about how to proceed?
Our current plan is to store a queue of these commands. Upon any reply, if an associated command is found (by comparing ASCII values) it is dequeued and the callback is executed. A timer will periodically check for timeouts, dequeue, and execute the appropriate callback. It seems like a straightforward solution, but the amount of code to support this is increasing substantially and we wanted to ensure there weren't any better built-in solutions or best practices for this.
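For concreteness, here is a minimal sketch of that plan, assuming the pending commands are tracked in a ConcurrentDictionary keyed by the command text (rather than a literal queue, so a reply can be matched regardless of arrival order) and swept by a timer; all names (SerialDispatcher, PendingCommand, etc.) are hypothetical:

using System;
using System.Collections.Concurrent;
using System.Threading;

public class SerialDispatcher
{
    private class PendingCommand
    {
        public string Command;            // ASCII command that was sent, e.g. "*03"
        public Action<string> OnReply;    // invoked with the reply text
        public Action OnTimeout;          // invoked if no reply arrives in time
        public DateTime Deadline;         // when this command expires
    }

    private readonly ConcurrentDictionary<string, PendingCommand> _pending =
        new ConcurrentDictionary<string, PendingCommand>();
    private readonly Timer _timeoutTimer;

    public SerialDispatcher()
    {
        // Periodically sweep for commands whose deadline has passed.
        _timeoutTimer = new Timer(_ => SweepTimeouts(), null,
            TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100));
    }

    public void Send(string command, TimeSpan timeout, Action<string> onReply, Action onTimeout)
    {
        _pending[command] = new PendingCommand
        {
            Command = command, OnReply = onReply, OnTimeout = onTimeout,
            Deadline = DateTime.UtcNow + timeout
        };
        // ... write the ASCII command to the serial port here ...
    }

    // Called by the read thread for every incoming reply, with the matching command text.
    public void HandleReply(string command, string reply)
    {
        PendingCommand pending;
        if (_pending.TryRemove(command, out pending))
            pending.OnReply(reply);
    }

    private void SweepTimeouts()
    {
        foreach (var entry in _pending)
        {
            PendingCommand pending;
            if (DateTime.UtcNow >= entry.Value.Deadline && _pending.TryRemove(entry.Key, out pending))
                pending.OnTimeout();
        }
    }
}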
Edit: To clarify further, this particular class is a singleton (for better or worse), and there are many other threads running that could access it. For example, one thread may want to request a sensor value, while another thread could be controlling a motor. These commands and their associated replies do not happen in a linear fashion; the timing may be reversed. Thus, a traditional producer-consumer model is not enough; this is more of a dispatcher.
For example, let's call this singleton class Arduino. Thread A is running and wants to send a command "*03", so it calls Arduino.Instance.SendCommand("*03"). Meanwhile, Thread B sends a command "*88", and both get sent in near-realtime. Sometime later, the Arduino's SerialPort.Read() thread picks up a reply for *88 and then a reply for *03 (i.e. in the opposite order they were sent). How do we allow both Thread A and Thread B to block correctly while waiting for their specific replies to come in? We're assuming we will use an AutoResetEvent inside each thread, with an async callback to let us .Set it.
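For what it's worth, here is a hedged sketch of that AutoResetEvent idea: each caller registers a waiter keyed by its command string and blocks on its own event until the read thread signals it. The names are illustrative, not an existing API, and it assumes at most one outstanding request per command string.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class Arduino
{
    public static readonly Arduino Instance = new Arduino();

    private class Waiter
    {
        public readonly AutoResetEvent Signal = new AutoResetEvent(false);
        public string Reply;
    }

    private readonly ConcurrentDictionary<string, Waiter> _waiters =
        new ConcurrentDictionary<string, Waiter>();

    // Called from Thread A / Thread B; blocks until the matching reply arrives or the timeout elapses.
    public string SendCommand(string command, TimeSpan timeout)
    {
        var waiter = new Waiter();
        _waiters[command] = waiter;
        // ... write the ASCII command to the serial port here ...
        if (waiter.Signal.WaitOne(timeout))
            return waiter.Reply;              // reply arrived, even if other replies came first
        _waiters.TryRemove(command, out waiter);
        return null;                          // timed out
    }

    // Called by the SerialPort.Read() thread once it has parsed which command a reply belongs to.
    public void OnReply(string command, string reply)
    {
        Waiter waiter;
        if (_waiters.TryRemove(command, out waiter))
        {
            waiter.Reply = reply;
            waiter.Signal.Set();              // wakes exactly the thread that sent this command
        }
    }
}

With this shape, *03 and *88 can be answered in either order, because each sender blocks on its own event rather than on a shared queue.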
If performance is what you're after, and async at its finest, I suggest looking into I/O Completion Ports. That is what ultimately sits underneath, hidden in the Windows kernel, and it's awesome. When I used them I was working in C++ (I even found a kernel bug because of it), but I was limited to that language.
I've seen this article on CodeProject which might be worth exploring to see where you can take your idea further and/or use the code that's there.
The nature of completion ports is to work on callbacks. That is, in general, you "put" a request in the queue, and when something lands there, the request is read and the specified callback is invoked. It is, in fact, a queue, but like I said, at the lowest (manageable) level, just about as close to the metal as you can get.
EDIT: I've written a sort of FTP server/client testing utility with completion ports, so the base process is the same: reading and writing commands in a queueable fashion. Hope it helps.
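From C# you normally don't program completion ports directly; one way to get callback-style I/O on a serial port is the asynchronous read methods on SerialPort.BaseStream, which on Windows are typically serviced by the same completion-port machinery under the hood. A minimal sketch (the parsing/dispatch step is left as a comment):

using System;
using System.IO.Ports;

public class AsyncSerialReader
{
    private readonly SerialPort _port;
    private readonly byte[] _buffer = new byte[256];

    public AsyncSerialReader(SerialPort port)
    {
        _port = port;
        BeginRead();
    }

    private void BeginRead()
    {
        _port.BaseStream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null);
    }

    private void OnRead(IAsyncResult ar)
    {
        int bytesRead = _port.BaseStream.EndRead(ar);
        if (bytesRead > 0)
        {
            // ... feed _buffer[0..bytesRead) into the reply parser / dispatcher here ...
        }
        BeginRead();   // queue up the next read; no dedicated read thread required
    }
}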
EDIT #2: OK, here's what I would do, based on your feedback and comments. I would have an "outgoing queue", a ConcurrentQueue<Message>. You can have a separate thread for sending messages by dequeuing each message. Note: if you want it to be a bit more "safe", I suggest peeking at the message, sending it, then dequeuing it. Anyway, the Message class can be internal and look something like this:
private class Message
{
    public string Command { get; set; }
    // ... additional properties, like timeouts, etc. ...
}
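A sketch of the sender thread described above (peek, then send, then dequeue), assuming these members live inside the singleton class and WriteToPort is a hypothetical helper that does the actual SerialPort.Write:

private readonly ConcurrentQueue<Message> _outgoing = new ConcurrentQueue<Message>();

// Run this on the dedicated send thread.
private void SenderLoop()
{
    while (true)
    {
        Message message;
        if (_outgoing.TryPeek(out message))
        {
            WriteToPort(message.Command);       // send first...
            _outgoing.TryDequeue(out message);  // ...then dequeue, so a failed send isn't lost
        }
        else
        {
            Thread.Sleep(10);                   // nothing pending; avoid busy-waiting
        }
    }
}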
In the singleton class (I'll call it CommunicationService), I'd also have a ConcurrentBag<Action<Response>>. This is now where the fun starts :o). When a separate concern wants to do something, it registers itself. For example, if you have a TemperatureMeter, I would have it do something like this:
public class TemperatureMeter
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly ConcurrentQueue<decimal> _results = new ConcurrentQueue<decimal>();

    public TemperatureMeter()
    {
        CommunicationService.AddHandler(HandlePotentialTemperatureResponse);
    }

    // Must return void so it matches the Action<Response> stored in the ConcurrentBag.
    public void HandlePotentialTemperatureResponse(Response response)
    {
        // if the response is what I'm looking for (Command/Payload are assumed Response members)
        if (response.Command == Commands.ReadTemperature.Command)
        {
            _results.Enqueue(decimal.Parse(response.Payload)); // store the result in a queue or something =)
            _signal.Set();
        }
    }

    public decimal ReadTemperature()
    {
        CommunicationService.SendCommand(Commands.ReadTemperature);
        _signal.WaitOne(Commands.ReadTemperature.TimeOut); // or something like this
        decimal value;
        return _results.TryDequeue(out value) ? value : default(decimal); // value enqueued by the handler above
    }
}
And now, in your CommunicationService, when you receive a response, you simply do:
foreach (var action in this._callbacks)
{
    action(rcvResponse);
}
Voila, separation of concerns. Does it answer your question any better?
Another possible tactic would be to couple the message and the callback, but have the callback be a Func<Response, bool>; the dispatcher thread then checks the result returned from the Func, and if it is true, the callback is removed.
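A sketch of that tactic, with the caveat that ConcurrentBag<T> has no targeted removal, so this version keeps the callbacks in a lock-protected List instead; the names are again illustrative:

private readonly List<Func<Response, bool>> _callbacks = new List<Func<Response, bool>>();
private readonly object _callbackLock = new object();

public void AddHandler(Func<Response, bool> handler)
{
    lock (_callbackLock) _callbacks.Add(handler);
}

// Called by the dispatcher thread for every received response.
private void Dispatch(Response response)
{
    lock (_callbackLock)
    {
        // A handler returning true means "this response was mine", and it is removed.
        _callbacks.RemoveAll(handler => handler(response));
    }
}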
A better choice, if you are using .NET 4.0, is to use a BlockingCollection<T> for this; for older versions, use a combination of a Queue<T> and an AutoResetEvent. That way you are notified on the consumer thread when an item is added, and you just consume it. Here we are using a push approach, whereas your current implementation uses a polling approach ("each time, you ask whether there is any data").
Example (4.0):
// declare the buffer
private BlockingCollection<Data> _buffer = new BlockingCollection<Data>(new ConcurrentQueue<Data>());

// at the producer method, whenever you receive an item:
_buffer.Add(new Data());

// at the consumer thread: another thread that runs continuously, consuming data as it arrives.
// GetConsumingEnumerable (or _buffer.Take()) blocks automatically, waiting for new items to be added.
foreach (Data data in _buffer.GetConsumingEnumerable())
{
    // handle the data here
}
Example (older versions):
private ConcurrentQueue<Data> _queue = new ConcurrentQueue<Data>();
private AutoResetEvent _queueNotifier = new AutoResetEvent(false);

// at the producer:
_queue.Enqueue(new Data());
_queueNotifier.Set();

// at the consumer:
while (true) // or some condition
{
    _queueNotifier.WaitOne(); // block here until a signal notification is received

    // drain the queue: several items may have been enqueued per signal,
    // since AutoResetEvent does not count how many times Set was called
    Data data;
    while (_queue.TryDequeue(out data))
    {
        // handle the data
    }
}