开发者

How to implement JMS listener to ignore specific messages based on a List/Map?

开发者 https://www.devze.com 2023-01-23 19:03 出处:网络
I have messages representing actions between two users sent to a queue. Users are stored as properties on the message.

I have messages representing actions between two users sent to a queue. Users are stored as properties on the message. The requirement is not to allow action to be processed [taken from the queue] when a di开发者_如何学JAVAfferent action concerning one or the other user is being processed at the same time.

Since I'm using Grails (which means Spring) and ActiveMQ, I thought about implementing it this way:

Create a MessageListenerContainer with transactions, when listener reads the message it will get the status of transaction, check whether userA or userB has an action processed at the moment and depending on the outcome - process the action or rollback jms transaction. I thought about creating a simple map to store details about currently processed users:

{
  userId : [listenerId, timestamp]
}

I wanted to store listener id and timestamp so that there is a way to ease error handling [didn't get there yet].

So before processing action userId would be put there and after action is completed key would be removed.

Unfortunately I hit a major problem when prototyping. When I rollback the message, listener will wait for the and read the same message again. This means that if I get 5 actions for userA and have only 5 listeners the whole process will be stuck when the first action is processed. That's not what I want. I would like the listener to read a message, check if action can be processed and if not read the next message. Is it even possible to implement?

In my prototype with one listener, when I send 5 messages and roll them back, here's what I get [this is from reader: timestamp when message was read + msg number]:

1288892171570 0
1288892171578 0
1288892176582 0
1288892181586 0
1288892186589 0
1288892191594 0
1288892196596 0
1288892201601 1
1288892206604 1
1288892211607 1
1288892216612 1
1288892221614 1
1288892226618 1
1288892231621 1
1288892236625 2
1288892241629 2
1288892246632 2
1288892251636 2
1288892256641 2
1288892261645 2
1288892266647 2
1288892271652 3
1288892276656 3
1288892281659 3
1288892286663 3
1288892291667 3
1288892296671 3
1288892301674 3
1288892306679 4
1288892311682 4
1288892316686 4
1288892321689 4
1288892326693 4
1288892331696 4
1288892336700 4

Redelivery policy is the default for AMQ 5.4.1. What I would like to see is: 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4...

I thought it should work in a way, that when a transaction is rolled back and message is marked for redelivery it will be delayed but listener will continue to work. Also when message is redelivered it will be back at it's position in the queue - queue is[should be?] ordered by the message timestamp.

Example [let's say all 5 messages sent at the same time]: Msg 0 is read, check is performed, can be processed -> processing Msg 1 is read, check is performed, cannot be processed, transaction rolled back, delayed for X amount of time Msg 2 is read, check is performed, cannot be processed, transaction rolled back, delayed for X amount of time Msg 3 is read, check is performed, cannot be processed, transaction rolled back, delayed for X amount of time Msg 1 redelivered as X amount of time has elapsed, check is performed, cannot be processed, transaction rolled back, delayed for X amount of time Msg 4 is read.....

I am not sure if what I try to do is advisable or should I think about doing it some other way? I thought this way it would be simple and easily scalable. Maybe it's just a matter of AMQ configuration but I sure did look and not find anything in the redelivery settings that would help here.

Thanks, Krystian


It seems this is a "feature" of Active MQ: https://issues.apache.org/activemq/browse/AMQ-1853 Issue at the time of writing is waiting for reviews, however includes a link to a solution which could be implemented with Camel. And I think this is the way I will go [at least for now]. I will create two queues: actionsQueue busyQueue

Listener will take actions from actionsQueue and will test whether an action can be processed. If not, it will send the message to busyQueue. I will have camel listening on busyQueue and will automatically route messages to actionsQueue with a configured delay.

Seems like this is the way forward if I want to implement it using the original idea.

I am still wondering if there is a better way to do this.

0

精彩评论

暂无评论...
验证码 换一张
取 消