How do I handle messages asynchronously, throwing away any new messages while processing?

Developer https://www.devze.com 2022-12-21 13:27 Source: Internet

I have a C# app that subscribes to a topic on our messaging system for value updates. When a new value comes in, I do some processing and then carry on. The problem is, the updates can come faster than the app can process them. What I want to do is to just hold on to the latest value, so I don't want a queue. For example, the source publishes value "1" and my app receives it; while processing, the source publishes the sequence (2, 3, 4, 5) before my app is done processing; my app then processes value "5", with the prior values thrown away.

It's kind of hard to post a working code sample since it's based on proprietary messaging libraries, but I would think this is a common pattern; I just can't figure out what it's called. It seems like the processing function has to run on a different thread from the messaging callback, but I'm not sure how to organize this, e.g. how that thread is notified of a value change. Any general tips on what I need to do?


A very simple way could be something like:

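// Holds the most recent unprocessed message, or null if there is none.
private IMessage _next;

// Called from the messaging callback: overwrites any message not yet processed.
public void ReceiveMessage(IMessage message)
{
    Interlocked.Exchange(ref _next, message);
}

// Called from the processing side: atomically takes the latest message and clears the slot.
public void Process()
{
    IMessage next = Interlocked.Exchange(ref _next, null);

    if (next != null)
    {
        //...
    }
}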
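The snippet above leaves open how the processing thread learns that a value has arrived. One possible way, sketched here under assumptions, is to pair the same Interlocked.Exchange slot with an AutoResetEvent. The class name LatestValueWorker and its members are hypothetical, not part of any messaging library; Post would be called from the library's callback.

```csharp
using System;
using System.Threading;

// Hypothetical helper: keeps only the most recent message and wakes a worker thread.
public sealed class LatestValueWorker<T> : IDisposable where T : class
{
    private T _next;                                   // latest unprocessed message, or null
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Thread _thread;
    private volatile bool _stopping;

    public LatestValueWorker(Action<T> process)
    {
        _thread = new Thread(() =>
        {
            while (true)
            {
                _signal.WaitOne();                     // sleep until something is posted
                if (_stopping) return;
                T message = Interlocked.Exchange(ref _next, null);
                if (message != null) process(message); // slow work happens here
            }
        }) { IsBackground = true };
        _thread.Start();
    }

    // Call from the messaging callback; it never waits for processing to finish.
    public void Post(T message)
    {
        Interlocked.Exchange(ref _next, message);      // overwrite any stale value
        _signal.Set();                                 // wake the worker if it is idle
    }

    // Stops the worker; a message still waiting in the slot is discarded.
    public void Dispose()
    {
        _stopping = true;
        _signal.Set();
        _thread.Join();
        _signal.Dispose();
    }
}
```

Because the event stays set when Set is called several times, a burst of values arriving during processing results in a single wake-up, and the worker then sees only the last value written to the slot.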


Generally speaking, one uses a messaging system to prevent losing messages. My initial reaction for a solution would be a thread that receives the inbound data and tries to pass it to your processing thread; if the processing thread is already running, you drop the data, wait for the next element, and repeat.
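A minimal sketch of that drop-while-busy idea, assuming processing can run on the thread pool. DropWhileBusy and TryHandle are hypothetical names. Note that this variant discards whatever arrives during processing, so the newest value is only handled if it happens to arrive while the worker is idle.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: hands a message to a worker only when the worker is idle.
public sealed class DropWhileBusy<T>
{
    private readonly Action<T> _process;
    private int _busy;                       // 0 = idle, 1 = processing

    public DropWhileBusy(Action<T> process) { _process = process; }

    // Returns the started task, or null when the message was dropped.
    public Task TryHandle(T message)
    {
        // Atomically claim the worker; if it was already claimed, drop the message.
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0) return null;

        return Task.Run(() =>
        {
            try { _process(message); }
            finally { Volatile.Write(ref _busy, 0); }   // mark idle again
        });
    }
}
```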


Obviously, the design of the messaging library can influence the best way to handle this problem. The way I've done it in the past, with libraries that function somewhat similarly, is to have one thread that listens for events and places them into a Queue, and then ThreadPool workers that dequeue the messages and process them.

You can read up on multithreaded asynchronous job queues:

Multithreaded Job Queue

Work Queue Threading
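As a rough illustration of the listener-plus-workers arrangement this answer describes, the sketch below uses BlockingCollection<T> from System.Collections.Concurrent. QueueWorkers and its members are hypothetical names. Unlike the latest-value approaches, every enqueued message is processed here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: a thread-safe queue drained by a fixed number of workers.
public sealed class QueueWorkers<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Task[] _workers;

    public QueueWorkers(int workerCount, Action<T> process)
    {
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            _workers[i] = Task.Run(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding.
                foreach (T item in _queue.GetConsumingEnumerable())
                    process(item);
            });
        }
    }

    // Call from the listening thread or messaging callback.
    public void Enqueue(T message) => _queue.Add(message);

    // Stops accepting messages and waits for the queued ones to be processed.
    public void Dispose()
    {
        _queue.CompleteAdding();
        Task.WaitAll(_workers);
        _queue.Dispose();
    }
}
```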


A simple way is to use a member variable to hold the last value received, and wrap it with a lock. Another way is to push incoming values onto a stack. When you're ready for a new value, call Stack.Pop() and then Stack.Clear():

public static class Incoming
{
    private static object locker = new object();
    private static object lastMessage = null;

    public static object GetMessage()
    {
        lock (locker)
        {
            object tempMessage = lastMessage;
            lastMessage = null;
            return tempMessage;
        }
    }
    public static void SetMessage(object messageArg)
    {
        lock (locker)
        {
            lastMessage = messageArg;
        }
    }

    private static Stack<object> messageStack = new Stack<object>();
    public static object GetMessageStack()
    {
        lock (locker)
        {
            object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null;
            messageStack.Clear();
            return tempMessage;
        }
    }
    public static void SetMessageStack(object messageArg)
    {
        lock (locker)
        {
            messageStack.Push(messageArg);
        }
    }
}

Putting the processing functions on a separate thread is a good idea. Either use a callback method from the processing thread to signal that it's ready for another message, or have it signal that it's done and then have the main thread start a new processor thread when a message is received (via the above SetMessage...).


This is not a "pattern", but you could use a shared data structure to hold the value. If there is only one value received from the messaging library, then a simple object would do. Otherwise you might be able to use a hashtable to store multiple message values (if required).

For example, on the message receive thread: when a message comes in, add or update its value in the data structure. On the processing thread, you could periodically check this data structure to make sure you still have the same value. If you do not, discard any processing you have already done and re-process with the new value.

Of course, you will need to ensure the data structure is properly synchronized between threads.
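The check-and-restart idea in this answer could look roughly like the sketch below. It assumes the work can be split into steps, with a check between steps. VersionedValue, Reprocess, and RunToCompletion are hypothetical names, and a version counter stands in for comparing values directly so that a repeated identical value still counts as an update.

```csharp
using System;

// Hypothetical helper: a lock-protected value with a version counter.
public sealed class VersionedValue<T>
{
    private readonly object _lock = new object();
    private T _value;
    private long _version;

    // Called on the message receive thread.
    public void Set(T value)
    {
        lock (_lock) { _value = value; _version++; }
    }

    // Returns the current value together with the version it belongs to.
    public (T Value, long Version) Read()
    {
        lock (_lock) { return (_value, _version); }
    }

    public bool IsCurrent(long version)
    {
        lock (_lock) { return _version == version; }
    }
}

public static class Reprocess
{
    // Runs all steps against one value; starts over whenever a newer value has arrived.
    public static T RunToCompletion<T>(VersionedValue<T> slot, int steps, Action<T, int> step)
    {
        while (true)
        {
            var (value, version) = slot.Read();
            bool restarted = false;
            for (int i = 0; i < steps; i++)
            {
                if (!slot.IsCurrent(version)) { restarted = true; break; } // stale: discard work
                step(value, i);
            }
            if (!restarted) return value;
        }
    }
}
```

If updates keep arriving faster than a full pass completes, this loop never finishes a pass, so in practice it may need a cap on restarts.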
