开发者

Springboot实现Activemq死信队列详解

开发者 https://www.devze.com 2023-12-14 10:26 出处:网络 作者: demon7552003
目录死信队列是什么什么情况下消息会重投递Spring整合Activemq服务端配置可以单独设置重投递策略触发重投递死信队列是什么
目录
  • 死信队列是什么
  • 什么情况下消息会重投递
  • Spring整合
    • Activemq服务端配置
    • 可以单独设置重投递策略
    • 触发重投递

死信队列是什么

当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。

可以进行以下操作:

  • delete:删除记录
  • retry:重新投递
  • copy: 复制到一个选择的队列中。
  • move:移动到一个选择的队列中。

Springboot实现Activemq死信队列详解

什么情况下消息会重投递

消息重投递的常用场景:

  • 事务回滚
  • 事务提交前close
  • A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.

重新投递策略

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

死信队列

重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

Spring整合

Activemq服务端配置

重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

配置个性化死信队列。

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">">
                <deadLetterStrategy>
                  <individualDeadLetterStrategy queuePrefix="DLQ."
                  useQueueForQueueMessages="true"
                  processExpired="false"
                  processNonPersistent="false"/>
                </deadLetterStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinawww.devze.comtionPolicy>

可以单独设置重投递策略

    @Bean
    public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
        return编程 new ActiveMQConnectionFactoryCustomizer() {

            /**
             * Customize the {@link ActiveMQConnectionFactory}.
             *
             * @param factory the factory to customize
             */
            @Override
            public void customize(ActiveMQConnectionFactory factory) {

                //设置了队列的 policy。
              php  ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
                RedeliveryPolicy policy = new RedeliveryPolicy();
                policy.setjavascriptMaximumRedeliveries(3);
                factory.getRedeliveryPolicyMap().put(q,policy);

                System.out.println("Customizer 2");
            }
        };
    }

触发重投递

代码回滚

 @JmsListener(destination = "queue.dlq.test" ,id = "test")
    public void consume(String param, Session session) {

        try {
            System.out.println(session.getAcknowledgeMode());
            System.out.println(param);
            //回滚,则重投递
            session.rollback();
        }catch (Exception ex){

        }
    }

抛出异常自动回滚

由于默认是开启事务的,因此抛出异常,会自动触发回滚。

@JmsListener(destination = "queue.dlq.test2",id="test2")
    public void consume2(String param, Session session) {
        System.out.println(param);
        throw new RuntimeException("xxxx");
    }
//AbstractMessageListenerContainer
protected void doExecuteListener(Session session, Message message) throws JMSException {
		if (!isAcceptMessagesWhileStopping() &&pythonamp; !isRunning()) {
			if (logger.isWarnEnabled()) {
				logger.warn("Rejecting received message because of the listener container " +
						"having been stopped in the meantime: " + message);
			}
			rollbackIfNecessary(session);
			throw new MessageRejectedwhileStoppingException();
		}

		try {
			invokeListener(session, message);
		}
     //JMSException,RuntimeException,Error  这3类异常会回滚。
		catch (JMSException | RuntimeException | Error ex) {
      //自动回滚
			rollbackOnExceptionIfNecessary(session, ex);
			throw ex;
		}
     //自动提交
		commitIfNecessary(session, message);
	}

其他的Exception 都会被包装成ListenerExecutionFailedException,它是JMSException的子类,所以所有异常都会导致回滚。

到此这篇关于Springboot实现Activemq死信队列详解的文章就介绍到这了,更多相关Activemq死信队列内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号