开发者

RocketMQMessageListener注解对rocketmq消息的消费实现机制

开发者 https://www.devze.com 2023-11-19 15:07 出处:网络 作者: codecraft
目录序示例RocketMQListenerRocketMQMessageListenerRocketMQMessageListenerBeanPostProcessorregisterContainerDefaultRocketMQListenerContainerDefaultMessageListenerOrderlyDefaultMessageListenerConcurrentl
目录
  • 示例
  • RocketMQListener
  • RocketMQMessageListener
  • RocketMQMessageListenerBeanPostProcessor
  • registerContainer
  • DefaultRocketMQListenerContainer
    • DefaultMessageListenerOrderly
    • DefaultMessageListenerConcurrently
    • handleMessage
  • start
    • 小结

      本文主要研究一下RocketMQMessageListener的实现机制

      示例

      @Service
      @RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
      public class UserConsumer implements RocketMQListener<User> {
          @Override
          public void onMessage(User message) {
              System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName());
          }
      }
      实现了RocketMQListener接口的类,再配合@RocketMQMessageListener注解就可以实现对rocketmq消息的消费

      RocketMQListener

      rocketmq-spring-boot/src/main/Java/org/apache/rocketmq/spring/core/RocketMQListener.java

      public interface RocketMQListener<T> {
          void onMessage(T message);
      }
      RocketMQListener接口定义了onMessage方法

      RocketMQMessageListener

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

      @Target(ElementType.TYPE)
      @Retention(RetentionPolicy.RUNTIME)
      @Documented
      public @interface RocketMQMessageListener {
          String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
          String Access_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
          String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
          String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
          String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
          /**
           * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
           * load balance. It's required and needs to be globally unique.
           *
           *
           * See <a href="http://rocketmq.apache.org/docs/core-concept/" rel="external nofollow" >here</a> for further discussion.
           */
          String consumerGroup();
          /**
           * Topic name.
           */
          String topic();
          /**
           * Control how to selector message.
           *
           * @see SelectorType
           */
          SelectorType selectorType() default SelectorType.TAG;
          /**
           * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
           */
          String selectorExpression() default "*";
          /**
           * Control consume mode, you can choice receive message concurrently or orderly.
           */
          ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
          /**
           * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
           */
          MessageModel messageModel() default MessageModel.CLUSTERING;
          /**
           * Max consumer thread number.
           * This property control consumer thread pool executor maximumPoolSize see
           * {@link ConsumeMessageService#updateCorePoolSize(int)}
           * @see <a href="https://github.com/apache/rocketmq-spring/issues/546" rel="external nofollow" >issues#546</a>
           */
          int consumeThreadMax() default 64;
          /**
           * consumer thread number.
           */
          int consumeThreadNumber() default 20;
          /**
           * Max re-consume times.
           *
           * In concurrently mode, -1 means 16;
           * In orderly mode, -1 means Integer.MAX_VALUE.
           */
          int maxReconsumeTimes() default -1;
          /**
           * Maximum amount of time in minutes a message may block the consuming thread.
           */
          long consumeTimeout() default 15L;
          /**
           * Timeout for sending reply messages.
           */
          int replyTimeout() default 3000;
          /**
           * The property of "access-key".
           */
          String accessKey() default ACCESS_KEY_PLACEHOLDER;
          /**
           * The property of "secret-key".
           */
          String secretKey() default SECRET_KEY_PLACEHOLDER;
          /**
           * Switch flag instance for message trace.
           */
          boolean enableMsgTrace() default false;
          /**
           * The name value of message trace topic.If you don't config,you can use the default trace topic name.
           */
          String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
          /**
           * The property of "name-server".
           */
          String nameServer() default NAME_SERVER_PLACEHOLDER;
          /**
           * The property of "access-channel".
           */
          String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
          /**
           * The property of "tlsEnable" default false.
           */
          String tlsEnable() default "false";
          /**
           * The namespace of consumer.
           */
          String namespace() default "";
          /**
           * Message consume retry strategy in concurrently mode.
           *
           * -1,no retry,put into DLQ directly
           * 0,broker control retry frequency
           * >0,client control retry frequency
           */
          int delayLevelWhenNextConsume() default 0;
          /**
           * The interval of suspending the pull in orderly mode, in milliseconds.
           *
           * The minimum value is 10 and the maximum is 30000.
           */
          int suspendCurrentQueueTimeMillis() default 1000;
          /**
           * Maximum time to await message consuming when shutdown consumer, in milliseconds.
           * The minimum value is 0
           */
          int awaitTerminationMillisWhenShutdown() default 1000;
          /**
           * The property of "instanceName".
           */
          String instanceN编程客栈ame() default "DEFAULT";
      }
      RocketMQMessageListener注解定义了consumerGroup、topic、selectorType、selectorExpression、consumeMode、messageModel、consumeThreadMax、consumeThreadNumber、maxReconsumeTimes、consumeTimeout、replyTimeout、accessKey、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel、tlsEnable、namespace、delayLevelWhenNextConsume、suspendCurrentQueueTimeMillis、awaitTerminationMillisWhenShutdown、instanceName属性

      RocketMQMessageListenerBeanPostProcessor

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java

      public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
          private ApplicationContext applicationContext;
          private AnnotationEnhancer enhancer;
          private ListenerContainerConfiguration listenerContainerConfiguration;
          @Override
          public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
              return bean;
          }
          @Override
          public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
              Class<?> targetClass = AopUtils.getTargetClass(bean);
              RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
              if (ann != null) {
                  RocketMQMessageListener enhance = enhance(targetClass, ann);
                  if (listenerContainerConfiguration != null) {
                      listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
                  }
              }
              return bean;
          }
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              this.applicationContext = applicationContext;
          }
          @Override
          public void afterPropertiesSet() throws Exception {
              buildEnhancer();
              this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
          }
          private void buildEnhancer() {
              if (this.applicationContext != null) {
                  Map<String, AnnotationEnhancer> enhancersMap =
                          this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
                  if (enhancersMap.size() > 0) {
                      List<AnnotationEnhancer> enhancers = enhancersMap.values()
                              .stream()
                              .sorted(new OrderComparator())
                              .collect(Collectors.toList());
                      this.enhancer = (attrs, element) -> {
                          Map<String, Object> newAttrs = attrs;
                          for (AnnotationEnhancer enh : enhancers) {
                              newAttrs = enh.apply(newAttrs, element);
                          }
                          return attrs;
                      };
                  }
              }
          }
          private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
              if (this.enhancer == null) {
                  return ann;
              } else {
                  return AnnotationUtils.synthesizeAnnotation(
                          this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
              }
          }
          public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> {
          }
      }
      RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册

      其afterPropertiesSet方法会执行buildEnhancer方法,该方法会获取AnnotationEnhancer的bean实例,然后排序好,最后构造AnnotationEnhancer,其作用就是把这些enhancers挨个apply上去

      registerContainer

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

      public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
              Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
              if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                  throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
              }
              if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                  throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
              }
              String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
              String topic = this.environment.resolvePlaceholders(annotation.topic());
              boolean listenerEnabled =
                  (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                      .getOrDefault(topic, true);
              if (!listenerEnabled) {
                  log.debug(
                      "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                      consumerGroup, topic);
                  return;
              }
              validate(annotation);
              String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                  counter.incrementAndGet());
              GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
              genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                  () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
              DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                  DefaultRocketMQListenerContainer.class);
              if (!container.isRunning()) {
                  try {
                      container.start();
                  } catch (Exception e) {
                      log.error("Started container failed. {}", container, e);
                      throw new RuntimeException(e);
                  }
              }
              log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
          }
      ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法

      DefaultRocketMQListenerContainer

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

      public class DefaultRocketMQListenerContainer implements InitializingBean,
          RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
          private DefaultMQPushConsumer consumer;
          private RocketMQListener rocketMQListener;
          @Override
          public void start() {
              if (this.isRunning()) {
                  throw new IllegalStateException("container already running. " + this.toString());
              }
              try {
                  consumer.start();
              } catch (MQClientException e) {
                  throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
          编程客栈    }
              this.setRunning(true);
              log.info("running container: {}", this.toString());
          }
          @Override
          public void afterPropertiesSet() throws Exception {
              initRocketMQPushConsumer();
              this.messageType = getMessageType();
              this.methodParameter = getMethodParameter();
              log.debug("RocketMQ messageType: {}", messageType);
          }
          private void initRocketMQPushConsumer() throws MQClientException {
              if (rocketMQListener == null && rocketMQReplyListener == null) {
                  throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
              }
              Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
              Assert.notNull(nameServer, "Property 'nameServer' is required");
              Assert.notNull(topic, "Property 'topic' is required");
              RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
                  this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
              boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
              if (Objects.nonNull(rpcHook)) {
                  consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                      enableMsgTrace, this.applicationContext.getEnvironment().
                      resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
                  consumer.setVipChannelEnabled(false);
              } else {
                  log.debug("Access-key or secret-key not configure in " + this + ".");
                  consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                      this.applicationContext.getEnvironment().
                          resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
              }
              consumer.setNamespace(namespace);
              String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
              if (customizedNameServer != null) {
                  consumer.setNamesrvAddr(customizedNameServer);
              } else {
                  consumer.setNamesrvAddr(nameServer);
              }
              if (accessChannel != null) {
                  consumer.setAccessChannel(accessChannel);
              }
              consumer.setConsumeThreadMax(consumeThreadMax);
              consumer.setConsumeThreadMin(consumeThreadNumber);
              consumer.setConsumeTimeout(consumeTimeout);
              consumer.setMaxReconsumeTimes(maxReconsumeTimes);
              consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
              consumer.setInstanceName(instanceName);
              switch (messageModel) {
                  case BROADCASTING:
                      consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
                      break;
                  case CLUSTERING:
                      consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING);
                      break;
                  default:
                      throw new IllegalArgumentException("Property 'messageModel' was wrong.");
              }
              switch (selectorType) {
                  case TAG:
                      consumer.subscribe(topic, selectorExpression);
                      break;
                  case SQL92:
                      consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                      break;
                  default:
                      throw new IllegalArgumentException("Property 'selectorType' was wrong.");
              }
              switch (consumeMode) {
                  case ORDERLY:
                      consumer.setMessageListener(new DefaultMessageListenerOrderly());
                      break;
                  case CONCURRENTLY:
                      consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                      break;
                  default:
                      throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
              }
              //if String is not is equal "true" TLS mode will represent the as default value false
              consumer.setUseTLS(new Boolean(tlsEnable));
              if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
                  ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
              } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
                  ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
              }
          }
          //......
      }
      DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建consumer

      initRocketMQPushConsumer方法主要是创建DefaultMQPushConsumer,设置messageModel,根据selectorType执行subscribe方法,根据consumeMode来设置messageListener(DefaultMessageListenerOrderly或者DefaultMessageListenerConcurrently),最后针对RocketMQPushandroidConsumerLifecycleListener执行prepareStart方法

      DefaultMessageListenerOrderly

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

      public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
              @SuppressWarnings("unchecked")
              @Override
              public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                  for (MessageExt messageExt : msgs) {
                      log.debug("received msg: {}", messageExt);
                      try {
                          long now = System.currentTimeMillis();
                          handleMessage(messageExt);
                          long costTime = System.currentTimeMillis() - now;
                          log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                      } catch (Exception e) {
                          log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                          context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                          return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                      }
                  }
                  return ConsumeOrderlyStatus.SUCCESS;
              }
          }
      DefaultMessageListenerOrderly实现了MessageListenerOrderly的consumeMessage接口,它内部遍历msgs,挨个执行handleMessage,有异常的话则设置context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

      DefaultMessageListenerConcurrently

      rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

      public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
              @SuppressWarnings("unchecked")
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                  for (MessageExt messageExt : msgs) {
                      log.debug("received msg: {}", messageExt);
                      try {
                          long now = System.currentTimeMillis();
                          handleMessage(messageExt);
                          long costTime = System.currentTimeMillis() - now;
                          log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                      } catch (Exception e) {
                          log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                          context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                      }
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          }
      DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口的consumeMessage方法,它内部遍历msgs,挨个执行handleMessage,有异常的话设置context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume),然后返回ConsumeConcurrentlyStatus.RECONSUME_LATER

      handleMessage

      private void handleMessage(
              MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
              if (rocketMQListener != null) {
                  rocketMQListener.onMessage(doConvertMessage(messageExt));
              } else if (rocketMQReplyListener != null) {
                  Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
                  Message<?> message = MessageBuilder.withPayload(replyContent).build();
      
                  org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
                  DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
                  producer.setSendMsgTimeout(replyTimeout);
                  producer.send(replyMessage, new SendCallback() {
                      @Override public void onSuccess(SendResult sendResult) {
                          if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                              log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                          } else {
                              log.debug("Consumer replies message success.");
                          }
                      }
      
                      @Override public void onException(Throwable e) {
                          log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                      }
                  });
              }
          }
      handleMessage方法则是委托给了rocketMQListener.onMessage(doConvertMessage(messageExt)),即回调业务自定义的RocketMQListener

      start

      org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

      /**
           * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
           *
           * @throws MQClientException if there is any client error.
           */
          @Override
          public void start() throws MQClientException {
              setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
              this.defaultMQPushConsumerImpl.start();
              if (null != traceDispatcher) {
                  try {
                      traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
                  } catch (MQClientException e) {
                      log.warn("trace dispatcher start failed ", e);
                  }
              }
          }
      DefaultMQPushConsumer的start方法先执行setConsumerGroup,然后委托给了defaultMQPushConsumerImpl.start(),如果有traceDispatcher的话,则执行traceDispatcher.start方法

      defaultMQPushConsumerImpl.start()方法会触发MQClientInstance.start(),后者会触发pullMessageService.start()以及rebalanceService.start()(会在doRebalance的时候触发defaultMQPushConsumerImpl.executePullRequestImmediately,往pullRequestQueue放pullRequest),前者会从pullRequestQueue获取pullRequest然后执行DefaultMQPushConsumerImpl.pullMessage方法,里头是执行pullAPIWrapper.pullKernelImpl,然后通过pullCallback往processQueue.putMessage,再触发consumeMessageService.submitConsumeRequest,它会回调listener.consumeMessage来消费消息

      小结

      RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册

      ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法

      DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建编程consumer

      start方法主要是执行pullMessageService.start()以及rebalanceService.start(),前者负责从pullRequestQueue获取pullRequest然后拉取消息放到processQueue,然后触发回调listener.consumeMessage来消费消息;后者负责rebalance,一开始会触发defaultMQPushConsumerImpl.executePullRequestImmediately,即往pullRequestQueue放pullRequest

      pushConsandroidumer本质上还是基于pull的模式来的,从RocketMQMessageListenerBeanPostProcessor --> DefaultRocketMQListenerContainer.start --> DefaultMQPushConsumer.start --> defaultMQPushConsumerImpl.start() --> pullMessageService.start()以及rebalanceService.start()

      以上就是RocketMQMessageListener注解对rocketmq消息的消费实现机制的详细内容,更多关于RocketMQMessageListener rocketmq消费的资料请关注编程客栈(www.devze.com)其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号