开发者

SpringCloud @RefreshScope刷新机制深入探究

开发者 https://www.devze.com 2023-03-13 10:27 出处:网络 作者: zlpzlpzyd
目录梳理过程如下@RefreshScopeScopedProxyModeRefreshAutoConfigurationNacosConfigServiceClientWorkerCacheDataAbstractSharedListenerNacosContextRefresherRefreshEventListenerContextRefresherEventPublishin
目录
  • 梳理过程如下
    • @RefreshScope
    • ScopedProxyMode
    • RefreshAutoConfiguration
    • NacosConfigService
    • ClientWorker
    • CacheData
    • AbstractSharedListener
    • NacosContextRefresher
    • RefreshEventListener
    • ContextRefresher
    • EventPublishingRunListener
    • RestartListener
  • Java连接nacos后会定时心跳连接
    • 总结

      在学习 nacos 的配置修改发现用到了 @RefreshScope 注解,将 spring boot 的日志调整如下

      logging:

        level:

          com:

            alibaba:

              cloud: debug

          org:

            springframework:

              context: debug

              cloud: debug

      调用 nacos 的配置修改,看到如下信息

      2023-03-10 15:48:15.332 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0     at com.alibaba.nacos.client.config.impl.ClientWorker.parseUpdateDataIdResponse(ClientWorker.java:486)

       - [fixed-node1.hahaou.cn_8848] [polling-resp] config changed. dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP

      2023-03-10 15:48:15.333 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0     at com.alibaba.nacos.client.config.impl.ClientWorker$LongPollingRunnable.run(ClientWorker.java:598)

       - get changedGroupKeys:[soft-jraft-apache-derby-config-test.yaml+DEFAULT_GROUP]

      2023-03-10 15:48:15.400 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0     at com.alibaba.nacos.client.config.impl.ClientWorker$LongPollingRunnable.run(ClientWorker.java:616)

       - [fixed-node1.hahaou.cn_8848] [data-received] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, tenant=null, md5=5f214678315ac83144e77f4c4b3b3416, content=spring:

        youxia:

          config:

            name: test, type=

      2023-03-10 15:48:15.400 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0     at com.alibaba.nacos.client.config.impl.CacheData$1.run(CacheData.java:199)

       - [fixed-node1.hahaou.cn_8848] [notify-context] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416

      2023-03-10 15:48:15.401 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.cloud.endpoint.event.RefreshEventListener Caller+0     at org.springframework.cloud.endpoint.event.RefreshEventListener.handle(RefreshEventListener.java:71)

       - Event received Refresh Nacos config

      2023-03-10 15:48:17.002 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0     at org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:596)

       - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a

      2023-03-10 15:48:18.632 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0     at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:93)

       - Loading nacos data, dataId: 'soft-jraft-apache-derby-config-test.yaml', group: 'DEFAULT_GROUP', data: spring:

        youxia:

          config:

            name: test

      2023-03-10 15:48:18.706 WARN [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0     at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:87)

       - Ignore the empty nacos configuration and get it based on dataId[soft-jraft-apache-derby-config] & group[DEFAULT_GROUP]

      2023-03-10 15:48:18.789 WARN [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0     at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:87)

       - Ignore the empty nacos configuration and get it based on dataId[soft-jraft-apache-derby-config.properties] & group[DEFAULT_GROUP]

      2023-03-10 15:48:18.790 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.b.c.PropertySourceBootstrapConfiguration Caller+0     at org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration.initialize(PropertySourceBootstrapConfiguration.java:112)

       - Located property source: [BootstrapPropertySource@1783236684 {name='bootstrapProperties-soft-jraft-apache-derby-config.properties,DEFAULT_GROUP', properties={}}, BootstrapPropertySource@942001677 {name='bootstrapProperties-soft-jraft-apache-derby-config,DEFAULT_GROUP', properties={}}, BootstrapPropertySource@1637255792 {name='bootstrapProperties-soft-jraft-apache-derby-config-test.yaml,DEFAULT_GROUP', properties={spring.youxia.config.name=test}}]

      2023-03-10 15:48:18.800 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] org.springframework.boot.SpringApplication Caller+0     at org.springframework.boot.SpringApplication.logStartupProfileInfo(SpringApplication.java:651)

       - No active profile set, falling back to default profiles: default

      2023-03-10 15:48:18.801 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0     at org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:596)

       - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6709b8b

      2023-03-10 15:48:18.806 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] org.springframework.boot.SpringApplication Caller+0     at org.springframework.boot.StartupInfoLogger.logStarted(StartupInfoLogger.java:61)

       - Started application in 3.403 seconds (JVM running for 54.758)

      2023-03-10 15:48:18.807 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0     at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1006)

       - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6709b8b, started on Fri Mar 10 15:48:18 CST 2023, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a

      2023-03-10 15:48:18.808 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0     at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1006)

       - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a, started on Fri Mar 10 15:48:17 CST 2023

      2023-03-10 15:48:18.819 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.cloud.endpoint.event.RefreshEventListener Caller+0     at org.springframework.cloud.endpoint.event.RefreshEventListener.handle(RefreshEventListener.java:73)

       - Refresh keys changed: [spring.youxia.config.name]

      2023-03-10 15:48:18.820 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.refresh.NacosContextRefresher Caller+0     at com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1.innerReceive(NacosContextRefresher.java:136)

       - Refresh Nacos config group=DEFAULT_GROUP,dataId=soft-jraft-apache-derby-config-test.yaml,configInfo=spring:

        youxia:

          config:

            name: test

      2023-03-10 15:48:18.820 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0     at com.alibaba.nacos.client.config.impl.CacheData$1.run(CacheData.java:222)

       - [fixed-node1.hahaou.cn_8848] [notify-ok] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416, listener=com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1@59af462e 

      2023-03-10 15:48:18.820 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0     at com.alibaba.nacos.client.config.impl.CacheData.safeNotifyListener(CacheData.java:248)

       - [fixed-node1.hahaou.cn_8848] [notify-listener] time cost=3420ms in ClientWorker, dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416, listener=com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1@59af462e 

      使用Spring Cloud Alibaba接入Nacos配置中心,获取配置信息name为:test

      使用Spring Cloud Alibaba接入Nacos配置中心,获取配置信息value为:null

      得知保存配置后进行了 jvm 重启。

      梳理过程如下

      @RefreshScope

      package org.springframework.cloud.context.config.annotation;
      import java.lang.annotation.Documented;
      import java.lang.annotation.ElementType;
      import java.lang.annotation.Retention;
      import java.lang.annotation.RetentionPolicy;
      import java.lang.annotation.Target;
      import org.springframework.context.annotation.Scope;
      import org.springframework.context.annotation.ScopedProxyMode;
      /**
       * Convenience annotation to put a <code>@Bean</code> definition in
       * {@link org.springframework.cloud.context.scope.refresh.RefreshScope refresh scope}.
       * Beans annotated this way can be refreshed at runtime and any components that are using
       * them will get a new instance on the next method call, fully initialized and injected
       * with all dependencies.
       *
       * @author Dave Syer
       *
       */
      @Target({ ElementType.TYPE, ElementType.METHOD })
      @Retention(RetentionPolicy.RUNTIME)
      @Scope("refresh")
      @Documented
      public @interface RefreshScope {
          /**
           * @see Scope#proxyMode()
           * @return proxy mode
           */
          ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
      }

      可以得知,@RefreshScope 是一个 scopeName 为 refresh 的 @Scope。

      ScopedProxyMode

      package org.springframework.context.annotation;
      /**
       * Enumerates the various scoped-proxy options.
       *
       * <p>For a more complete discussion of exactly what a scoped proxy is, see the
       * section of the Spring reference documentation entitled '<em>Scoped beans as
       * dependencies</em>'.
       *
       * @author Mark Fisher
       * @since 2.5
       * @see ScopeMetadata
       */
      public enum ScopedProxyMode {
          /**
           * Default typically equals {@link #NO}, unless a different default
           * has been configured at the component-scan instruction level.
           */
          DEFAULT,
          /**
           * Do not create a scoped proxy.
           * <p>This proxy-mode is not typically useful when used with a
           * non-singleton scoped instance, which should favor the use of the
           * {@link #INTERFACES} or {@link #TARGET_CLASS} proxy-modes instead if it
           * is to be used as a dependency.
           */
          NO,
          /**
           * Create a JDK dynamic proxy implementing <i>all</i> interfaces exposed by
           * the class of the target object.
           */
          INTERFACES,
          /**
           * Create a class-based proxy (uses CGLIB).
           */
          TARGET_CLASS
      }

      由枚举 ScopedProxyMode 得知,ScopedProxyMode.TARGET_CLASS 通过 cglib 生成一个代理类进行字节码增强

      RefreshAutoConfiguration

      部分源码如下

      pachttp://www.devze.comkage org.springframework.cloud.autoconfigure;
      import java.util.HashSet;
      import java.util.Set;
      import javax.annotation.PostConstruct;
      import org.springframework.aop.scope.ScopedProxyUtils;
      import org.springframework.beans.BeansException;
      import org.springframework.beans.factory.BeanFactory;
      import org.springframework.beans.factory.ListableBeanFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.config.BeanDefinition;
      import org.springframework.beans.factory.config.BeanDefinitionHolder;
      import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
      import org.springframework.beans.factory.support.BeanDefinitionRegistry;
      import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
      import org.springframework.boot.autoconfigure.AutoConfigureBefore;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
      import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
      import org.springframework.boot.context.properties.bind.Bindable;
      import org.springframework.boot.context.properties.bind.Binder;
      import org.springframework.cloud.context.refresh.ContextRefresher;
      import org.springframework.cloud.context.scope.refresh.RefreshScope;
      import org.springframework.cloud.endpoint.event.RefreshEventListener;
      import org.springframework.cloud.logging.LoggingRebinder;
      import org.springframework.context.ConfigurableApplicationContext;
      import org.springframework.context.EnvironmentAware;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.context.weaving.编程LoadTimeWeaverAware;
      import org.springframework.core.env.Environment;
      import org.springframework.core.env.StandardEnvironment;
      import org.springframework.instrument.classloading.LoadTimeWeaver;
      import org.springframework.stereotype.Component;
      import org.springframework.util.StringUtils;
      /**
       * Autoconfiguration for the refresh scope and associated features to do with changes in
       * the Environment (e.g. rebinding logger levels).
       *
       * @author Dave Syer
       * @author Venil Noronha
       */
      @Configuration(proxyBeanMethods = false)
      @ConditionalOnClass(RefreshScope.class)
      @ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED,
              matchIfMissing = true)
      @AutoConfigureBefore(HibernateJpaAutoConfiguration.class)
      public class RefreshAutoConfiguration {
          /**
           * Name of the refresh scope name.
           */
          public static final String REFRESH_SCOPE_NAME = "refresh";
          /**
           * Name of the prefix for refresh scope.
           */
          public static final String REFRESH_SCOPE_PREFIX = "spring.cloud.refresh";
          /**
           * Name of the enabled prefix for refresh scope.
           */
          public static final String REFRESH_SCOPE_ENABLED = REFRESH_SCOPE_PREFIX + ".enabled";
          @Bean
          @ConditionalOnMissingBean(RefreshScope.class)
          public static RefreshScope refreshScope() {
              return new RefreshScope();
          }
          @Bean
          @ConditionalOnMissingBean
          public static LoggingRebinder loggingRebinder() {
              return new LoggingRebinder();
          }
          @Bean
          @ConditionalOnMissingBean
          public ContextRefresher contextRefresher(ConfigurableApplicationContext context,
                  RefreshScope scope) {
              return new ContextRefresher(context, scope);
          }
          @Bean
          public RefreshEventListener refreshEventListener(ContextRefresher contextRefresher) {
              return new RefreshEventListener(contextRefresher);
          }
      }
      @ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED,
              matchIfMissing = true)

      通过上面的注解得知,自动刷新在 spring cloud 中默认启用

      SpringCloud @RefreshScope刷新机制深入探究

      nacos中添加配置

      NacosConfigService

      public NacosConfigService(Properties properties) throws NacosException {
          ValidatorUtils.checkInitParam(properties);
          String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
          if (StringUtils.isBlank(encodeTmp)) {
              this.encode = Constants.ENCODE;
          } else {
              this.encode = encodeTmp.trim();
          }
          initNamespace(properties);
          this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
          this.agent.start();
          this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
      }

      NacosConfigService 构造器创建 ClientWorker 对象

      ClientWorker

      public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
              final Properties properties) {
          this.agent = agent;
          this.configFilterChainManager = configFilterChainManager;
          // Initialize the timeout parameter
          init(properties);
          this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(r);
                  t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                  t.setDaemon(true);
                  return t;
              }
          });
          this.executorService = Executors
                  .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                      @Override
                      public Thread newThread(Runnable r) {
                          Thread t = new Thread(r);
                          t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                          t.setDaemon(true);
                          return t;
                      }
                  });
          this.executor.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                  try {
                      checkConfigInfo();
                  } catch (Throwable e) {
                      LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                  }
              }
          }, 1L, 10L, TimeUnit.MILLISECONDS);
      }
      public void checkConfigInfo() {
          // Dispatch taskes.
          int listenerSize = cacheMap.size();
          // Round up the longingTaskCount.
          int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
          if (longingTaskCount > currentLongingTaskCount) {
              for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                  // The task list is no order.So it maybe has issues when changing.
                  executorService.execute(new LongPollingRunnable(i));
              }
              currentLongingTaskCount = longingTaskCount;
          }
      }
      class LongPollingRunnable implements Runnable {
          private final int taskId;
          public LongPollingRunnable(int taskId) {
              this.taskId = taskId;
          }
          @Override
          public void run() {
              List<CacheData> cacheDatas = new ArrayList<CacheData>();
              List<String> inInitializingCacheList = new ArrayList<String>();
              try {
                  // check failover config
                  for (CacheData cacheData : cacheMap.values()) {
                      if (cacheData.getTaskId() == taskId) {
                          cacheDatas.add(cacheData);
                          try {
                              checkLocalConfig(cacheData);
                              if (cacheData.isUseLocalConfigInfo()) {
                                  cacheData.checkListenerMd5();
                              }
                          } catch (Exception e) {
                              LOGGER.error("get local config info error", e);
                          }
                      }
                  }
                  // check server config
                  List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                  if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                      LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
                  }
                  for (String groupKey : changedGroupKeys) {
                      String[] key = GroupKey.parseKey(groupKey);
                      String dataId = key[0];
                      String group = key[1];
                      String tenant = null;
                      if (key.length == 3) {
                          tenant = key[2];
                      }
                      try {
                          String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                          CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                          cache.setContent(ct[0]);
                          if (null != ct[1]) {
                              cache.setType(ct[1]);
                          }
                          LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                  agent.getName(), dataId, group, tenant, cache.getMd5(),
                                  ContentUtils.truncateContent(ct[0]), ct[1]);
              android        } catch (NacosException ioe) {
                          String message = String
                                  .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                          agent.getName(), dataId, group, tenant);
                          LOGGER.error(message, ioe);
                      }
                  }
                  for (CacheData cacheData : cacheDatas) {
                      if (!cacheData.isInitializing() || inInitializingCacheList
                              .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                          cacheData.checkListenerMd5();
                          cacheData.setInitializing(false);
                      }
                  }
                  inInitializingCacheList.clear();
                  executorService.execute(this);
              } catch (Throwable e) {
                  // If the rotation training task is abnormal, the next execution time of the task will be punished
                  LOGGER.error("longPolling error : ", e);
                  executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
              }
          }
      }

      ClientWorker checkConfigInfo() 中通过线程池创建 LongPollingRunnable 对象,线程池名称前缀为 com.alibaba.nacos.client.Worker.longPolling

      CacheData

      void checkListenerMd5() {
          for (ManagerListenerWrap wrap : listeners) {
              if (!md5.equals(wrap.lastCallMd5)) {
                  safeNotifyListener(dataId, group, content, type, md5, wrap);
              }
          }
      }
      private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
              final String md5, final ManagerListenerWrap listenerWrap) {
          final Listener listener = listenerWrap.listener;
          Runnable job = new Runnable() {
              @Override
              public void run() {
                  ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                  ClassLoader appClassLoader = listener.getClass().getClassLoader();
                  try {
                      if (listener instanceof AbstractSharedListener) {
                          AbstractSharedListener adapter = (AbstractSharedListener) listener;
                          adapter.fillContext(dataId, group);
                          LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                      }
                      // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                      Thread.currentThread().setContextClassLoader(appClassLoader);
                      ConfigResponse cr = new ConfigResponse();
                      cr.setDataId(dataId);
                      cr.setGroup(group);
                      cr.setContent(content);
                      configFilterChainManager.doFilter(null, cr);python
                      String contentTmp = cr.getContent();
                      listener.receiveConfigInfo(contentTmp);
                      // compare lastContent and content
                      if (listener instanceof AbstractConfi开发者_Go教程gChangeListener) {
                          Map data = ConfigChangeHandler.getInstance()
                                  .parseChangeData(listenerWrap.lastContent, content, type);
                          ConfigChangeEvent event = new ConfigChangeEvent(data);
                          ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                          listenerWrap.lastContent = content;
                      }
                      listenerWrap.lastCallMd5 = md5;
                      LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                              listener);
                  } catch (NacosException ex) {
                      LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                              name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                  } catch (Throwable t) {
                      LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                              group, md5, listener, t.getCause());
                  } finally {
                      Thread.currentThread().setContextClassLoader(myClassLoader);
                  }
              }
          };
          final long startNotify = System.currentTimeMillis();
          try {
              if (null != listener.getExecutor()) {
                  listener.getExecutor().execute(job);
              } else {
                  job.run();
              }
          } catch (Throwable t) {
              LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                      group, md5, listener, t.getCause());
          }
          final long finishNotify = System.currentTimeMillis();
          LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                  name, (finishNotify - startNotify), dataId, group, md5, listener);
      }

      调用 CacheData 的 checkListenerMd5()

      AbstractSharedListener

      public abstract class AbstractSharedListener implements Listener {
          private volatile String dataId;
          private volatile String group;
          public final void fillContext(String dataId, String group) {
              this.dataId = dataId;
              this.group = group;
          }
          @Override
          public final void receiveConfigInfo(String configInfo) {
              innerReceive(dataId, group, configInfo);
          }
          @Override
          public Executor getExecutor() {
              return null;
          }
          /**
           * receive.
           *
           * @param dataId     data ID
           * @param group      group
           * @param configInfo content
           */
          public abstract void innerReceive(String dataId, String group, String configInfo);
      }

      调用 AbstractSharedListener 的 receiveConfigInfo(),在 NacosContextRefresher 的 registerNacosListener() 中进行实现

      NacosContextRefresher

      private void registerNacosListener(final String groupKey, final String dataKey) {
          String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
          Listener listener = listenerMap.computeIfAbsent(key,
                  lst -> new AbstractSharedListener() {
                      @Override
                      public void innerReceive(String dataId, String group,
                              String configInfo) {
                          refreshCountIncrement();
                          nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                          // todo feature: support single refresh for listening
                          applicationContext.publishEvent(
                                  new RefreshEvent(this, null, "Refresh Nacos config"));
                          if (log.isDebugEnabled()) {
                              log.debug(String.format(
                                      "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                      group, dataId, configInfo));
                          }
                      }
                  });
          try {
              configService.addListener(dataKey, groupKey, listener);
          }
          catch (NacosException e) {
              log.warn(String.format(
                      "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                      groupKey), e);
          }
      }

      最终通过 ApplicationContext 发布事件 RefreshEvent

      至此,nacos 逻辑执行完毕。

      RefreshEventListener

      public class RefreshEventListener implements SmartApplicationListener {
          private static Log log = LogFactory.getLog(RefreshEventListener.class);
          private ContextRefresher refresh;
          private AtomicBoolean ready = new AtomicBoolean(false);
          public RefreshEventListener(ContextRefresher refresh) {
              this.refresh = refresh;
          }
          @Override
          public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
              return ApplicationReadyEvent.class.isAssignableFrom(eventType)
                      || RefreshEvent.class.isAssignableFrom(eventType);
          }
          @Override
          public void onApplicationEvent(ApplicationEvent event) {
              if (event instanceof ApplicationReadyEvent) {
                  handle((ApplicationReadyEvent) event);
              }
              else if (event instanceof RefreshEvent) {
                  handle((RefreshEvent) event);
              }
          }
          public void handle(ApplicationReadyEvent event) {
              this.ready.compareAndSet(false, true);
          }
          public void handle(RefreshEvent event) {
              if (this.ready.get()) { // don't handle events before app is ready
                  log.debug("Event received " + event.getEventDesc());
                  Set<String> keys = this.refresh.refresh();
                  log.info("Refresh keys changed: " + keys);
              }
          }
      }

      调用 RefreshEventListener 的 onApplicationEvent(),事件对象为 RefreshEvent。

      执行完可以看到打印了日志

      Event received Refresh Nacos config

      后面调用 ContextRefresher 的 refresh()

      ContextRefresher

      public synchronized Set<String> refresh() {
          Set<String> keys = refreshEnvironment();
          this.scope.refreshAll();
          return keys;
      }
      public synchronized Set<String> refreshEnvironment() {
          Map<String, Object> before = extract(
                  this.context.getEnvironment().getPropertySources());
          addConfigFilesToEnvironment();
          Set<String> keys = changes(before,
                  extract(this.context.getEnvironment().getPropertySources())).keySet();
          this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
          return keys;
      }
      /* For testing. */ ConfigurableApplicationContext addConfigFilesToEnvironment() {
          ConfigurableApplicationContext capture = null;
          try {
              StandardEnvironment environment = copyEnvironment(
                      this.context.getEnvironment());
              SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
                      .bannerMode(Mode.OFF).web(WebApplicationType.NONE)
                      .environment(environment);
              // Just the listeners that affect the environment (e.g. excluding logging
              // listener because it has side effects)
              builder.application()
                      .setListeners(Arrays.asList(new BootstrapApplicationListener(),
                              new ConfigFileApplicationListener()));
              capture = builder.run();
              if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
                  environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
              }
              MutablePropertySources target = this.context.getEnvironment()
                      .getPropertySources();
              String targetName = null;
              for (PropertySource<?> source : environment.getPropertySources()) {
                  String name = source.getName();
                  if (target.contains(name)) {
                      targetName = name;
                  }
                  if (!this.standardSources.contains(name)) {
                      if (target.contains(name)) {
                          target.replace(name, source);
                      }
                      else {
                          if (targetName != null) {
                              target.addAfter(targetName, source);
                              // update targetName to preserve ordering
                              targetName = name;
                          }
                          else {
                              // targetName was null so we are at the start of the list
                              target.addFirst(source);
                              targetName = name;
                          }
                      }
                  }
              }
          }
          finally {
              ConfigurableApplicationContext closeable = capture;
              while (closeable != null) {
                  try {
                      closeable.close();
                  }
                  catch (Exception e) {
                      // Ignore;
                  }
                  if (closeable.getParent() instanceof ConfigurableApplicationContext) {
                      closeable = (ConfigurableApplicationContext) closeable.getParent();
                  }
                  else {
                      break;
                  }
              }
          }
          return capture;
      }

      refreshEnvironment() 中执行如下操作

      addConfigFilesToEnvironment() 添加到配置文件到环境中,发布一系列事件 BootstrapApplicationListener、ConfigFileApplicationListener

      调用 EventPublishingRunListener 的发布一系列事件进行 jvm 的重启相关操作,其中 EventPublishingRunListener 是默认的监听器,在 spring boot 的 META-INF/spring.factories 中进行了指定

      # Run Listeners

      org.springframework.boot.SpringApplicationRunListener=\

      org.springframework.boot.context.event.EventPublishingRunListener

      EventPublishingRunListener

      public class EventPublishingRunListener implements SpringApplicationRunListener, Ordered {
          private final SpringApplication application;
          private final String[] args;
          private final SimpleApplicationEventMulticaster initialMulticaster;
          public EventPublishingRunListener(SpringApplication application, String[] args) {
              this.application = application;
              this.args = args;
              this.initialMulticaster = new SimpleApplicationEventMulticaster();
              for (ApplicationListener<?> listener : application.getListeners()) {
                  this.initialMulticaster.addApplicationListener(listener);
              }
          }
          @Override
          public int getOrder() {
              return 0;
          }
          @Override
          public void starting() {
              this.initialMulticaster.multicastEvent(new ApplicationStartingEvent(this.application, this.args));
          }
          @Override
          public void environmentPrepared(ConfigurableEnvironment environment) {
              this.initialMulticaster
                      .multicastEvent(new ApplicationEnvironmentPreparedEvent(this.application, this.args, environment));
          }
          @Override
          public void contextPrepared(ConfigurableApplicationContext context) {
              this.initialMulticaster
                      .multicastEvent(new ApplicationContextInitializedEvent(this.application, this.args, context));
          }
          @Override
          public void contextLoaded(ConfigurableApplicationContext context) {
              for (ApplicationListener<?> listener : this.application.getListeners()) {
                  if (listener instanceof ApplicationContextAware) {
                      ((ApplicationContextAware) listener).setApplicationContext(context);
                  }
                  context.addApplicationListener(listener);
              }
              this.initialMulticaster.multicastEvent(new ApplicationPreparedEvent(this.application, this.args, context));
          }
          @Override
          public void started(ConfigurableApplicationContext context) {
              context.publishEvent(new ApplicationStartedEvent(this.application, this.args, context));
          }
          @Override
          public void running(ConfigurableApplicationContext context) {
              context.publishEvent(new ApplicationReadyEvent(this.application, this.args, context));
          }
          @Override
          public void failed(ConfigurableApplicationContext context, Throwable exception) {
              ApplicationFailedEvent event = new ApplicationFailedEvent(this.application, this.args, context, exception);
              if (context != null && context.isActive()) {
                  // Listeners have been registered to the application context so we should
                  // use it at this point if we can
                  context.publishEvent(event);
              }
              else {
                  // An inactive context may not have a multicaster so we use our multicaster to
                  // call all of the context's listeners instead
                  if (context instanceof AbstractApplicationContext) {
                      for (ApplicationListener<?> listener : ((AbstractApplicationContext) context)
                              .getApplicationListeners()) {
                          this.initialMulticaster.addApplicationListener(listener);
                      }
                  }
                  this.initialMulticaster.setErrorHandler(new LoggingErrorHandler());
                  this.initialMulticaster.multicastEvent(event);
              }
          }
          private static class LoggingErrorHandler implements ErrorHandler {
              private static final Log logger = LogFactory.getLog(EventPublishingRunListener.class);
              @Override
              public void handleError(Throwable throwable) {
                  logger.warn("Error calling ApplicationEventListener", throwable);
              }
          }
      }

      调用了 contextLoaded(),在 RestartListener 的 onApplicationEvent() 中进行调用

      RestartListener

      public class RestartListener implements SmartApplicationListener {
          private ConfigurableApplicationContext context;
          private ApplicationPreparedEvent event;
          @Override
          public int getOrder() {
              return 0;
          }
          @Override
          public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
              return ApplicationPreparedEvent.class.isAssignableFrom(eventType)
                      || ContextRefreshedEvent.class.isAssignableFrom(eventType)
                      || ContextClosedEvent.class.isAssignableFrom(eventType);
          }
          @Override
          public boolean supportsSourceType(Class<?> sourceType) {
              return true;
          }
          @Override
          public void onApplicationEvent(ApplicationEvent input) {
              if (input instanceof ApplicationPreparedEvent) {
                  this.event = (ApplicationPreparedEvent) input;
                  if (this.context == null) {
                      this.context = this.event.getApplicationContext();
                  }
              }
              else if (input instanceof ContextRefreshedEvent) {
                  if (this.context != null && input.getSource().equals(this.context)
                          && this.event != null) {
                      this.context.publishEvent(this.event);
                  }
              }
              else {
                  if (this.context != null && input.getSource().equals(this.context)) {
                      this.context = null;
                      this.event = null;
                  }
              }
          }
      }

      RestartListener 的 onApplicationEvent() 传入 ApplicationPreparedEvent,调用 AbstractApplicationContext 的 refresh(),即进行 ioc 容器重启,此时是第一次

      调用 EventPublishingRunListener 的 running(),进行新的配置加载

      调用 PropertySourceBootstrapConfiguration 的 initialize(),间接调用 NacosPropertySourceLocator 的 locate() 进行文件加载

      调用 RestartListener 的 onApplicationEvent(),参数为 ApplicationPreparedEvent,调用 AbstractApplicationContext 的 refresh() 进行 ioc 容器重启,此时是第二次

      调用 RestartListener 的 onApplicationEvent(),参数为 ContextRefreshedEvent

      至此,ContextRefresher 的 refreshEnvironment(编程) 逻辑执行完毕

      接下来调用 RefreshScope 的 refreshAll(),间接调用父类 GenericScope 的 destroy(),发布事件 RefreshScopeRefreshedEvent 到 ApplicationContext 中

      Java连接nacos后会定时心跳连接

      通过 NacosWatch 开启 ThreadPoolTaskScheduler 进行定时任务发起,事件为 HeartbeatEvent。

      总结

      nacos 的在页面上的配置信息的更新是通过 jvm 重启实现的。想到了 jvm 启动后,无法做热更新,这么做也是不错的选择。由于做了重启,这个适合在没有访问的情况下执行,如果在操作过程中有事务在执行会不好,但是在生产环境中是否有这样的应用目前还不清楚。

      由此可以看到,spring 中大量使用了事件、还有观察者模式、消息队列、消息通知、web 请求响应、窗口点击事件等。

      参考链接

      到此这篇关于SpringCloud @RefreshScope刷新机制深入探究的文章就介绍到这了,更多相关SpringCloud @RefreshScope内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号