I have messages coming into my program with millisecond resolution (anywhere from zero to a couple hundred messages a millisecond).
I'd like to do some analysis. Specifically, I want to maintain multiple rolling windows of the message counts, updated as messages come in. For example,
- # of messages in last second
- # of messages in last minute
- # of messages in last half-hour divided by # of messages in last hour
I can't just maintain a simple count like "1,017 messages in last second", since I won't know when a message is older than 1 second and therefore should no longer be in the count...
I thought of maintaining a queue of all the messages, searching for the youngest message that's older than one second, and inferring the count from the index. However, this seems like it would be too slow, and would eat up a lot of memory.
What can I do to keep track of these counts in my program so that I can efficiently get these values in real-time?
This is most easily handled with a cyclic buffer.
A cyclic buffer has a fixed number of elements and a pointer into it. You can add an element to the buffer, and when you do, you increment the pointer to the next element. If you run past the end of the fixed-length buffer, you start again from the beginning. It's a space- and time-efficient way to store the "last N" items.
Now in your case you could have one cyclic buffer of 1,000 counters, each one counting the number of messages during one millisecond. Adding all 1,000 counters gives you the total count during the last second. Of course you can optimize the reporting part by updating the count incrementally, i.e. deduct from the count the number you overwrite when you insert, and then add the new number.
You can then have another cyclic buffer that has 60 slots and counts the aggregate number of messages in whole seconds; once a second, you take the total count of the millisecond buffer and write the count to the buffer having resolution of seconds, etc.
Here is some C-like pseudocode:
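int msecbuf[1000]; // per-millisecond counts, initialized with zeroes
int secbuf[60];    // per-second counts, ditto
int msecptr = 0, secptr = 0;
int count = 0;          // messages received in the current millisecond
int msec_total_ctr = 0; // running sum of msecbuf, i.e. messages in the last second

void msg_received() { count++; }

void every_msec() {
    msec_total_ctr -= msecbuf[msecptr]; // drop the slot about to be overwritten
    msecbuf[msecptr] = count;
    msec_total_ctr += msecbuf[msecptr]; // add the new slot
    count = 0;
    msecptr = (msecptr + 1) % 1000;
}

void every_sec() {
    secbuf[secptr] = msec_total_ctr; // the last second's total becomes one slot
    secptr = (secptr + 1) % 60;
}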
You want exponential smoothing, otherwise known as an exponential weighted moving average. Take an EWMA of the time since the last message arrived, and then divide that time into a second. You can run several of these with different weights to cover effectively longer time intervals. Effectively, you're using an infinitely long window then, so you don't have to worry about expiring data; the reducing weights do it for you.
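A minimal sketch of that idea in C, in case it helps. The names (ewma_rate, ewma_on_message and so on) are made up for illustration, and it assumes timestamps are passed in as fractional milliseconds: with whole-millisecond timestamps and many messages per millisecond most gaps would be zero, so a finer clock is assumed here.

```c
#include <assert.h>

/* Hypothetical rate estimator: EWMA of the gap between messages. */
typedef struct {
    double alpha;       /* smoothing weight in (0, 1]; larger reacts faster */
    double avg_gap_ms;  /* smoothed time between consecutive messages */
    double last_ms;     /* timestamp of the previous message */
    int    seen;        /* 0 = no messages, 1 = one message, 2 = average seeded */
} ewma_rate;

void ewma_init(ewma_rate *r, double alpha) {
    assert(alpha > 0.0 && alpha <= 1.0);
    r->alpha = alpha;
    r->avg_gap_ms = 0.0;
    r->last_ms = 0.0;
    r->seen = 0;
}

/* Call once per message with its arrival time in milliseconds. */
void ewma_on_message(ewma_rate *r, double now_ms) {
    if (r->seen == 2) {
        double gap = now_ms - r->last_ms;
        r->avg_gap_ms += r->alpha * (gap - r->avg_gap_ms);
    } else if (r->seen == 1) {
        r->avg_gap_ms = now_ms - r->last_ms;  /* first gap seeds the average */
        r->seen = 2;
    } else {
        r->seen = 1;
    }
    r->last_ms = now_ms;
}

/* Estimated messages per second: one second divided by the smoothed gap. */
double ewma_rate_per_sec(const ewma_rate *r) {
    if (r->seen < 2 || r->avg_gap_ms <= 0.0) return 0.0;
    return 1000.0 / r->avg_gap_ms;
}
```

Running several ewma_rate instances with different alpha values gives the "several weights" variant: a small alpha behaves like a long window, a large alpha like a short one. Note that the result is a smoothed rate estimate, not an exact count over a fixed window.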
For the last millisecond, keep a count. When the millisecond slice rolls over to the next one, add the count to a millisecond rolling buffer array and reset it. If you keep the buffer cumulative, you can extract the number of messages per second with a fixed amount of memory.
When a 0.1-second slice (or some other interval that is small compared to 1 minute) is done, sum up the last 0.1 * 1000 = 100 items from the rolling buffer array and place the result in the next rolling buffer. This way you can keep the millisecond rolling buffer small (1,000 items for a lookup of at most 1 s), and the buffer for looking up the minute as well (600 items).
You can apply the same trick again for whole minutes, using 0.1-minute intervals. Every value asked for can then be obtained by summing a few integers (or, when the buffers are cumulative, by subtracting two values).
The only disadvantage is that the last-second value will change every millisecond, the minute value only every 0.1 second, and the hour value (and values derived from it, such as the ratio for the last half-hour) only every 0.1 minute. But at least you keep your memory usage at bay.
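The cumulative variant of the millisecond buffer could look roughly like this. It is only a sketch: cum_ring and its functions are hypothetical names, and it assumes something calls cum_tick_ms() exactly once per millisecond.

```c
#include <string.h>

#define SLOTS 1001  /* 1000 ms of history plus the most recent slot */

typedef struct {
    unsigned long cum[SLOTS];  /* running total as of the end of each ms */
    unsigned long total;       /* messages seen since start */
    unsigned pos;              /* slot of the most recently completed ms */
} cum_ring;

void cum_init(cum_ring *c) { memset(c, 0, sizeof *c); }

void cum_on_message(cum_ring *c) { c->total++; }

/* Call once per millisecond: record the running total in the next slot. */
void cum_tick_ms(cum_ring *c) {
    c->pos = (c->pos + 1) % SLOTS;
    c->cum[c->pos] = c->total;
}

/* Messages in the last k completed milliseconds, 1 <= k <= 1000:
   just the difference of two running totals. */
unsigned long cum_last(const cum_ring *c, unsigned k) {
    unsigned old = (c->pos + SLOTS - k) % SLOTS;
    return c->cum[c->pos] - c->cum[old];
}
```

Because the slots hold running totals, any sub-window up to the buffer length costs one subtraction, and unsigned arithmetic keeps the difference correct even if the total wraps around.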
Your rolling display window can only update so fast. Let's say you want to update it 10 times a second; for 1 second's worth of data, you would then need 10 values. Each value would contain the number of messages that showed up in that 1/10 of a second. Let's call these values bins: each bin holds 1/10 of a second's worth of data. Every 100 milliseconds, one of the bins gets discarded and a new bin is set to the number of messages that have shown up in that 100 milliseconds.
You would need an array of 36K bins to hold an hour's worth of information about your message rate if you wanted to preserve a precision of 1/10 of a second for the whole hour. But that seems like overkill.
I think it would be more reasonable to have the precision drop off as the time interval gets larger.
Maybe you keep 1 second's worth of data accurate to 100 milliseconds, 1 minute's worth of data accurate to the second, 1 hour's worth of data accurate to the minute, and so on.
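Here is a rough C sketch of such tiered bins, with the precision dropping off at each level. The ring and tiers types are invented for illustration, and it assumes tiers_tick_100ms() is driven by a timer every 100 milliseconds.

```c
#include <stddef.h>
#include <string.h>

#define MAX_SLOTS 60

typedef struct {
    long slots[MAX_SLOTS];
    size_t n;    /* number of slots in use (at most MAX_SLOTS) */
    size_t pos;  /* next slot to overwrite */
    long total;  /* running sum of all slots */
} ring;

void ring_init(ring *r, size_t n) { memset(r, 0, sizeof *r); r->n = n; }

/* Overwrite the oldest slot and keep the running sum up to date. */
void ring_push(ring *r, long v) {
    r->total += v - r->slots[r->pos];
    r->slots[r->pos] = v;
    r->pos = (r->pos + 1) % r->n;
}

/* Sum of the k most recently pushed slots, k <= n. */
long ring_sum_last(const ring *r, size_t k) {
    long s = 0;
    for (size_t i = 1; i <= k; i++) s += r->slots[(r->pos + r->n - i) % r->n];
    return s;
}

typedef struct {
    ring tenths;   /* 10 bins of 100 ms: the last second */
    ring seconds;  /* 60 bins of 1 s:    the last minute */
    ring minutes;  /* 60 bins of 1 min:  the last hour   */
    long pending;  /* messages in the current 100 ms bin */
    long ticks;    /* 100 ms ticks elapsed so far */
} tiers;

void tiers_init(tiers *t) {
    ring_init(&t->tenths, 10);
    ring_init(&t->seconds, 60);
    ring_init(&t->minutes, 60);
    t->pending = 0;
    t->ticks = 0;
}

void tiers_on_message(tiers *t) { t->pending++; }

/* Call every 100 ms; coarser rings are fed from the finer ring's total. */
void tiers_tick_100ms(tiers *t) {
    ring_push(&t->tenths, t->pending);
    t->pending = 0;
    t->ticks++;
    if (t->ticks % 10 == 0)  ring_push(&t->seconds, t->tenths.total);
    if (t->ticks % 600 == 0) ring_push(&t->minutes, t->seconds.total);
}
```

With this layout the whole hour needs 130 counters instead of 36K, and a value such as "last half-hour divided by last hour" would be ring_sum_last(&t.minutes, 30) over t.minutes.total, accurate to the minute.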
I thought of maintaining a queue of all the messages, searching for the youngest message that's older than one second, and inferring the count from the index. However, this seems like it would be too slow, and would eat up a lot of memory.
A better idea would be maintaining a linked list of the messages, adding new messages to the head (with a timestamp), and popping them from the tail as they expire. Or don't even pop them: just keep a pointer to the oldest message that came in within the desired timeframe, and advance it towards the head when that message expires (this allows you to keep track of multiple timeframes with one list).
You could compute the count when needed by walking from the tail to the head, or just store the count separately, incrementing it whenever you add a value to the head, and decrementing it whenever you advance the tail.
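Something along these lines, as a sketch in C. The type and function names are hypothetical, it tracks just two timeframes for brevity, and it assumes timestamps in milliseconds that never go backwards. Nodes are freed once the longest timeframe has moved past them.

```c
#include <stdlib.h>

typedef struct node {
    long ts_ms;
    struct node *next;  /* points toward newer messages */
} node;

typedef struct {
    long span_ms;  /* length of this timeframe */
    node *oldest;  /* oldest message still inside it, or NULL */
    long count;    /* messages currently inside it */
} window;

#define NWIN 2
typedef struct {
    node *tail, *head;  /* oldest and newest stored message */
    window win[NWIN];   /* ordered from shortest to longest span */
} msg_log;

void log_init(msg_log *l, long short_ms, long long_ms) {
    l->tail = l->head = NULL;
    l->win[0] = (window){ short_ms, NULL, 0 };
    l->win[1] = (window){ long_ms, NULL, 0 };
}

/* Advance each window's pointer past expired messages, then free
   everything older than the longest window. */
void log_expire(msg_log *l, long now_ms) {
    for (int i = 0; i < NWIN; i++) {
        window *w = &l->win[i];
        while (w->oldest && w->oldest->ts_ms <= now_ms - w->span_ms) {
            w->oldest = w->oldest->next;
            w->count--;
        }
    }
    node *keep = l->win[NWIN - 1].oldest;
    while (l->tail != keep) {
        node *dead = l->tail;
        l->tail = dead->next;
        free(dead);
    }
    if (!l->tail) l->head = NULL;
}

/* Record one message; returns 0 on success, -1 if allocation fails. */
int log_add(msg_log *l, long now_ms) {
    log_expire(l, now_ms);
    node *n = malloc(sizeof *n);
    if (!n) return -1;
    n->ts_ms = now_ms;
    n->next = NULL;
    if (l->head) l->head->next = n; else l->tail = n;
    l->head = n;
    for (int i = 0; i < NWIN; i++) {
        if (!l->win[i].oldest) l->win[i].oldest = n;
        l->win[i].count++;
    }
    return 0;
}
```

Call log_expire() with the current time before reading a count so it reflects the present moment. The counts are exact, but memory grows with the number of messages in the longest timeframe, which at a couple hundred messages per millisecond is the cost the question was worried about.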