Contents
- Walkthrough
- @RefreshScope
- ScopedProxyMode
- RefreshAutoConfiguration
- NacosConfigService
- ClientWorker
- CacheData
- AbstractSharedListener
- NacosContextRefresher
- RefreshEventListener
- ContextRefresher
- EventPublishingRunListener
- RestartListener
- Periodic heartbeat after the Java client connects to Nacos
- Summary
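While studying how Nacos applies configuration changes, I found that it relies on the @RefreshScope annotation. To trace what happens, I set the Spring Boot log levels as follows: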
logging:
  level:
    com:
      alibaba:
        cloud: debug
    org:
      springframework:
        context: debug
        cloud: debug
After changing the configuration in Nacos, the application logs the following:
2023-03-10 15:48:15.332 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0 at com.alibaba.nacos.client.config.impl.ClientWorker.parseUpdateDataIdResponse(ClientWorker.java:486) - [fixed-node1.hahaou.cn_8848] [polling-resp] config changed. dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP
2023-03-10 15:48:15.333 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0 at com.alibaba.nacos.client.config.impl.ClientWorker$LongPollingRunnable.run(ClientWorker.java:598) - get changedGroupKeys:[soft-jraft-apache-derby-config-test.yaml+DEFAULT_GROUP]
2023-03-10 15:48:15.400 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.ClientWorker Caller+0 at com.alibaba.nacos.client.config.impl.ClientWorker$LongPollingRunnable.run(ClientWorker.java:616) - [fixed-node1.hahaou.cn_8848] [data-received] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, tenant=null, md5=5f214678315ac83144e77f4c4b3b3416, content=spring: youxia: config: name: test, type=
2023-03-10 15:48:15.400 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0 at com.alibaba.nacos.client.config.impl.CacheData$1.run(CacheData.java:199) - [fixed-node1.hahaou.cn_8848] [notify-context] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416
2023-03-10 15:48:15.401 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.cloud.endpoint.event.RefreshEventListener Caller+0 at org.springframework.cloud.endpoint.event.RefreshEventListener.handle(RefreshEventListener.java:71) - Event received Refresh Nacos config
2023-03-10 15:48:17.002 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0 at org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:596) - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a
2023-03-10 15:48:18.632 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0 at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:93) - Loading nacos data, dataId: 'soft-jraft-apache-derby-config-test.yaml', group: 'DEFAULT_GROUP', data: spring: youxia: config: name: test
2023-03-10 15:48:18.706 WARN [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0 at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:87) - Ignore the empty nacos configuration and get it based on dataId[soft-jraft-apache-derby-config] & group[DEFAULT_GROUP]
2023-03-10 15:48:18.789 WARN [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.client.NacosPropertySourceBuilder Caller+0 at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:87) - Ignore the empty nacos configuration and get it based on dataId[soft-jraft-apache-derby-config.properties] & group[DEFAULT_GROUP]
2023-03-10 15:48:18.790 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.b.c.PropertySourceBootstrapConfiguration Caller+0 at org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration.initialize(PropertySourceBootstrapConfiguration.java:112) - Located property source: [BootstrapPropertySource@1783236684 {name='bootstrapProperties-soft-jraft-apache-derby-config.properties,DEFAULT_GROUP', properties={}}, BootstrapPropertySource@942001677 {name='bootstrapProperties-soft-jraft-apache-derby-config,DEFAULT_GROUP', properties={}}, BootstrapPropertySource@1637255792 {name='bootstrapProperties-soft-jraft-apache-derby-config-test.yaml,DEFAULT_GROUP', properties={spring.youxia.config.name=test}}]
2023-03-10 15:48:18.800 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] org.springframework.boot.SpringApplication Caller+0 at org.springframework.boot.SpringApplication.logStartupProfileInfo(SpringApplication.java:651) - No active profile set, falling back to default profiles: default
2023-03-10 15:48:18.801 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0 at org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:596) - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6709b8b
2023-03-10 15:48:18.806 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] org.springframework.boot.SpringApplication Caller+0 at org.springframework.boot.StartupInfoLogger.logStarted(StartupInfoLogger.java:61) - Started application in 3.403 seconds (JVM running for 54.758)
2023-03-10 15:48:18.807 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0 at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1006) - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6709b8b, started on Fri Mar 10 15:48:18 CST 2023, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a
2023-03-10 15:48:18.808 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.c.a.AnnotationConfigApplicationContext Caller+0 at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1006) - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@653cd99a, started on Fri Mar 10 15:48:17 CST 2023
2023-03-10 15:48:18.819 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] o.s.cloud.endpoint.event.RefreshEventListener Caller+0 at org.springframework.cloud.endpoint.event.RefreshEventListener.handle(RefreshEventListener.java:73) - Refresh keys changed: [spring.youxia.config.name]
2023-03-10 15:48:18.820 DEBUG [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] c.a.cloud.nacos.refresh.NacosContextRefresher Caller+0 at com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1.innerReceive(NacosContextRefresher.java:136) - Refresh Nacos config group=DEFAULT_GROUP,dataId=soft-jraft-apache-derby-config-test.yaml,configInfo=spring: youxia: config: name: test
2023-03-10 15:48:18.820 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0 at com.alibaba.nacos.client.config.impl.CacheData$1.run(CacheData.java:222) - [fixed-node1.hahaou.cn_8848] [notify-ok] dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416, listener=com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1@59af462e
2023-03-10 15:48:18.820 INFO [com.alibaba.nacos.client.Worker.longPolling.fixed-node1.hahaou.cn_8848] com.alibaba.nacos.client.config.impl.CacheData Caller+0 at com.alibaba.nacos.client.config.impl.CacheData.safeNotifyListener(CacheData.java:248) - [fixed-node1.hahaou.cn_8848] [notify-listener] time cost=3420ms in ClientWorker, dataId=soft-jraft-apache-derby-config-test.yaml, group=DEFAULT_GROUP, md5=5f214678315ac83144e77f4c4b3b3416, listener=com.alibaba.cloud.nacos.refresh.NacosContextRefresher$1@59af462e
使用Spring Cloud Alibaba接入Nacos配置中心,获取配置信息name为:test
使用Spring Cloud Alibaba接入Nacos配置中心,获取配置信息value为:null
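From this log we can see that saving the configuration triggers something that looks like an application restart: a SpringApplication is started and its contexts are refreshed and closed again. The JVM itself keeps running, as the "JVM running for 54.758" entry shows.
Walkthrough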
@RefreshScope
package org.springframework.cloud.context.config.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;

/**
 * Convenience annotation to put a <code>@Bean</code> definition in
 * {@link org.springframework.cloud.context.scope.refresh.RefreshScope refresh scope}.
 * Beans annotated this way can be refreshed at runtime and any components that are using
 * them will get a new instance on the next method call, fully initialized and injected
 * with all dependencies.
 *
 * @author Dave Syer
 *
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {

    /**
     * @see Scope#proxyMode()
     * @return proxy mode
     */
    ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;

}
So @RefreshScope is simply a @Scope whose scope name is refresh.
ScopedProxyMode
package org.springframework.context.annotation;

/**
 * Enumerates the various scoped-proxy options.
 *
 * <p>For a more complete discussion of exactly what a scoped proxy is, see the
 * section of the Spring reference documentation entitled '<em>Scoped beans as
 * dependencies</em>'.
 *
 * @author Mark Fisher
 * @since 2.5
 * @see ScopeMetadata
 */
public enum ScopedProxyMode {

    /**
     * Default typically equals {@link #NO}, unless a different default
     * has been configured at the component-scan instruction level.
     */
    DEFAULT,

    /**
     * Do not create a scoped proxy.
     * <p>This proxy-mode is not typically useful when used with a
     * non-singleton scoped instance, which should favor the use of the
     * {@link #INTERFACES} or {@link #TARGET_CLASS} proxy-modes instead if it
     * is to be used as a dependency.
     */
    NO,

    /**
     * Create a JDK dynamic proxy implementing <i>all</i> interfaces exposed by
     * the class of the target object.
     */
    INTERFACES,

    /**
     * Create a class-based proxy (uses CGLIB).
     */
    TARGET_CLASS

}
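The ScopedProxyMode enum shows that ScopedProxyMode.TARGET_CLASS, the default for @RefreshScope, creates a class-based proxy with CGLIB, i.e. the bean is wrapped through bytecode generation.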
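To make the role of that proxy concrete, here is a minimal plain-Java sketch. It deliberately swaps in a JDK dynamic proxy (closer to ScopedProxyMode.INTERFACES) instead of CGLIB so that it needs no extra dependency; the point it illustrates, that the proxy looks up the current target on every call, is the same. Greeter, ScopedProxySketch and the AtomicReference holding the target are hypothetical names for illustration, not Spring classes.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical interface standing in for a refresh-scoped bean.
interface Greeter {
    String greet();
}

class ScopedProxySketch {

    // Returns a proxy that resolves the current target on every method call,
    // so callers holding the proxy see a replaced target without being re-injected.
    static Greeter proxyFor(AtomicReference<Greeter> currentTarget) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                (proxy, method, args) -> {
                    try {
                        return method.invoke(currentTarget.get(), args);
                    } catch (InvocationTargetException e) {
                        // Rethrow the target's own exception instead of the reflection wrapper.
                        throw e.getCause();
                    }
                });
    }

    public static void main(String[] args) {
        AtomicReference<Greeter> target = new AtomicReference<Greeter>(() -> "name=test");
        Greeter greeter = proxyFor(target);
        System.out.println(greeter.greet());

        // Simulate a refresh: the target is replaced, the proxy reference stays the same.
        target.set(() -> "name=test2");
        System.out.println(greeter.greet());
    }
}
```

The caller keeps one reference to the proxy for its whole lifetime; only the object behind it changes, which is why beans that inject a refresh-scoped bean do not need to be recreated themselves.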
RefreshAutoConfiguration
Part of the source is shown below:
package org.springframework.cloud.autoconfigure;

import java.util.HashSet;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.cloud.context.scope.refresh.RefreshScope;
import org.springframework.cloud.endpoint.event.RefreshEventListener;
import org.springframework.cloud.logging.LoggingRebinder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.weaving.LoadTimeWeaverAware;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.instrument.classloading.LoadTimeWeaver;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Autoconfiguration for the refresh scope and associated features to do with changes in
 * the Environment (e.g. rebinding logger levels).
 *
 * @author Dave Syer
 * @author Venil Noronha
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RefreshScope.class)
@ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED, matchIfMissing = true)
@AutoConfigureBefore(HibernateJpaAutoConfiguration.class)
public class RefreshAutoConfiguration {

    /**
     * Name of the refresh scope name.
     */
    public static final String REFRESH_SCOPE_NAME = "refresh";

    /**
     * Name of the prefix for refresh scope.
     */
    public static final String REFRESH_SCOPE_PREFIX = "spring.cloud.refresh";

    /**
     * Name of the enabled prefix for refresh scope.
     */
    public static final String REFRESH_SCOPE_ENABLED = REFRESH_SCOPE_PREFIX + ".enabled";

    @Bean
    @ConditionalOnMissingBean(RefreshScope.class)
    public static RefreshScope refreshScope() {
        return new RefreshScope();
    }

    @Bean
    @ConditionalOnMissingBean
    public static LoggingRebinder loggingRebinder() {
        return new LoggingRebinder();
    }

    @Bean
    @ConditionalOnMissingBean
    public ContextRefresher contextRefresher(ConfigurableApplicationContext context, RefreshScope scope) {
        return new ContextRefresher(context, scope);
    }

    @Bean
    public RefreshEventListener refreshEventListener(ContextRefresher contextRefresher) {
        return new RefreshEventListener(contextRefresher);
    }

}
@ConditionalOnProperty(name = RefreshAutoConfiguration.REFRESH_SCOPE_ENABLED, matchIfMissing = true)
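Because of matchIfMissing = true in the annotation above, refresh support is enabled by default in Spring Cloud; it is only switched off when spring.cloud.refresh.enabled is explicitly set to false.
Adding the configuration in Nacos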
NacosConfigService
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties);

    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
The NacosConfigService constructor creates the ClientWorker object.
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter
    init(properties);

    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });

    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

public void checkConfigInfo() {
    // Dispatch taskes.
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

class LongPollingRunnable implements Runnable {

    private final int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                    agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            executorService.execute(this);

        } catch (Throwable e) {

            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
In ClientWorker, checkConfigInfo() submits LongPollingRunnable tasks to a thread pool whose threads are named with the prefix com.alibaba.nacos.client.Worker.longPolling, which is the thread name seen in the log above.
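The number of long-polling tasks in checkConfigInfo() is a round-up division of the listened configurations by a per-task batch size. The small sketch below shows only that arithmetic. LongPollingTaskCount and taskCount are hypothetical names, and the batch size of 3000 is an assumed value for illustration, not something taken from the source above.

```java
class LongPollingTaskCount {

    // Number of long-polling tasks needed for a given number of listened configs.
    // perTaskConfigSize is a double so the division is not truncated before rounding up.
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        double perTask = 3000; // assumed batch size, for illustration only
        System.out.println(taskCount(1, perTask));    // one config still needs one task
        System.out.println(taskCount(3000, perTask)); // exactly one full batch
        System.out.println(taskCount(3001, perTask)); // one config over, so a second task
    }
}
```

If the batch size were an int, listenerSize / perTaskConfigSize would be integer division and Math.ceil would have nothing left to round up, so a single listened config would yield zero tasks.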
CacheData
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before invoking the callback, set the thread's context classloader to the webapp's
                // classloader, so that SPI calls made inside the callback do not fail or resolve
                // incorrectly (only an issue when several applications are deployed together).
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);

                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }

                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
LongPollingRunnable then calls CacheData's checkListenerMd5(), which notifies every listener whose last notified MD5 differs from the MD5 of the current content.
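The MD5 comparison is what keeps a listener from being called again for content it has already seen. Below is a minimal self-contained sketch of that idea. Md5NotifySketch, ListenerWrap and md5Hex are hypothetical names, and treating the initial content as "already seen" when a listener is added is a simplifying assumption of this sketch, not a statement about the Nacos client.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class Md5NotifySketch {

    // Hypothetical wrapper: a listener plus the MD5 it was last notified with.
    static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5;

        ListenerWrap(Consumer<String> listener) {
            this.listener = listener;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    void addListener(Consumer<String> listener) {
        ListenerWrap wrap = new ListenerWrap(listener);
        wrap.lastCallMd5 = md5; // assumption: the current content counts as already seen
        listeners.add(wrap);
    }

    void setContent(String newContent) {
        this.content = newContent;
        this.md5 = md5Hex(newContent);
    }

    // Notify only listeners whose last-seen MD5 differs from the current one.
    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5;
            }
        }
    }

    static String md5Hex(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        Md5NotifySketch cache = new Md5NotifySketch();
        cache.addListener(c -> System.out.println("config changed: " + c));
        cache.checkListenerMd5();       // nothing changed, no callback
        cache.setContent("name: test");
        cache.checkListenerMd5();       // callback fires once
        cache.checkListenerMd5();       // same MD5 again, no callback
    }
}
```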
AbstractSharedListener
public abstract class AbstractSharedListener implements Listener {

    private volatile String dataId;

    private volatile String group;

    public final void fillContext(String dataId, String group) {
        this.dataId = dataId;
        this.group = group;
    }

    @Override
    public final void receiveConfigInfo(String configInfo) {
        innerReceive(dataId, group, configInfo);
    }

    @Override
    public Executor getExecutor() {
        return null;
    }

    /**
     * receive.
     *
     * @param dataId     data ID
     * @param group      group
     * @param configInfo content
     */
    public abstract void innerReceive(String dataId, String group, String configInfo);
}
safeNotifyListener() calls AbstractSharedListener's receiveConfigInfo(), which delegates to innerReceive(). That method is implemented by the anonymous listener created in NacosContextRefresher's registerNacosListener().
NacosContextRefresher
private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                @Override
                public void innerReceive(String dataId, String group,
                        String configInfo) {
                    refreshCountIncrement();
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // todo feature: support single refresh for listening
                    applicationContext.publishEvent(
                            new RefreshEvent(this, null, "Refresh Nacos config"));
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                group, dataId, configInfo));
                    }
                }
            });
    try {
        configService.addListener(dataKey, groupKey, listener);
    } catch (NacosException e) {
        log.warn(String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey), e);
    }
}
In the end, a RefreshEvent is published through the ApplicationContext.
At this point the Nacos side of the logic is done; everything that follows is Spring Cloud.
RefreshEventListener
public class RefreshEventListener implements SmartApplicationListener {

    private static Log log = LogFactory.getLog(RefreshEventListener.class);

    private ContextRefresher refresh;

    private AtomicBoolean ready = new AtomicBoolean(false);

    public RefreshEventListener(ContextRefresher refresh) {
        this.refresh = refresh;
    }

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return ApplicationReadyEvent.class.isAssignableFrom(eventType)
                || RefreshEvent.class.isAssignableFrom(eventType);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            handle((ApplicationReadyEvent) event);
        }
        else if (event instanceof RefreshEvent) {
            handle((RefreshEvent) event);
        }
    }

    public void handle(ApplicationReadyEvent event) {
        this.ready.compareAndSet(false, true);
    }

    public void handle(RefreshEvent event) {
        if (this.ready.get()) { // don't handle events before app is ready
            log.debug("Event received " + event.getEventDesc());
            Set<String> keys = this.refresh.refresh();
            log.info("Refresh keys changed: " + keys);
        }
    }

}
RefreshEventListener's onApplicationEvent() is called with the RefreshEvent.
Once it runs, the following line appears in the log:
Event received Refresh Nacos config
It then calls ContextRefresher's refresh().
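The listener above only reacts to refresh events after the application has reported itself ready. That gating can be sketched in plain Java as follows; ReadyGateSketch and its method names are hypothetical and stand in for the Spring event types, which are not used here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ReadyGateSketch {

    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicInteger refreshCount = new AtomicInteger();

    // Stands in for handling the "application is ready" event.
    void onApplicationReady() {
        ready.compareAndSet(false, true);
    }

    // Stands in for handling a refresh event; returns whether a refresh was performed.
    boolean onRefreshEvent() {
        if (!ready.get()) {
            return false; // events arriving before the app is ready are dropped
        }
        refreshCount.incrementAndGet();
        return true;
    }

    int refreshCount() {
        return refreshCount.get();
    }

    public static void main(String[] args) {
        ReadyGateSketch gate = new ReadyGateSketch();
        System.out.println(gate.onRefreshEvent()); // false: not ready yet
        gate.onApplicationReady();
        System.out.println(gate.onRefreshEvent()); // true: refresh performed
    }
}
```

Dropping early events means a configuration push that arrives during startup does not trigger a refresh against a half-initialized context.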
ContextRefresher
public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}

public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(
            this.context.getEnvironment().getPropertySources());
    addConfigFilesToEnvironment();
    Set<String> keys = changes(before,
            extract(this.context.getEnvironment().getPropertySources())).keySet();
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
    return keys;
}

/* For testing. */ ConfigurableApplicationContext addConfigFilesToEnvironment() {
    ConfigurableApplicationContext capture = null;
    try {
        StandardEnvironment environment = copyEnvironment(
                this.context.getEnvironment());
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
                .bannerMode(Mode.OFF).web(WebApplicationType.NONE)
                .environment(environment);
        // Just the listeners that affect the environment (e.g. excluding logging
        // listener because it has side effects)
        builder.application()
                .setListeners(Arrays.asList(new BootstrapApplicationListener(),
                        new ConfigFileApplicationListener()));
        capture = builder.run();
        if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
            environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
        }
        MutablePropertySources target = this.context.getEnvironment()
                .getPropertySources();
        String targetName = null;
        for (PropertySource<?> source : environment.getPropertySources()) {
            String name = source.getName();
            if (target.contains(name)) {
                targetName = name;
            }
            if (!this.standardSources.contains(name)) {
                if (target.contains(name)) {
                    target.replace(name, source);
                }
                else {
                    if (targetName != null) {
                        target.addAfter(targetName, source);
                        // update targetName to preserve ordering
                        targetName = name;
                    }
                    else {
                        // targetName was null so we are at the start of the list
                        target.addFirst(source);
                        targetName = name;
                    }
                }
            }
        }
    }
    finally {
        ConfigurableApplicationContext closeable = capture;
        while (closeable != null) {
            try {
                closeable.close();
            }
            catch (Exception e) {
                // Ignore;
            }
            if (closeable.getParent() instanceof ConfigurableApplicationContext) {
                closeable = (ConfigurableApplicationContext) closeable.getParent();
            }
            else {
                break;
            }
        }
    }
    return capture;
}
refreshEnvironment() does the following:
addConfigFilesToEnvironment() reloads the configuration into the environment. It builds a temporary SpringApplication whose only listeners are BootstrapApplicationListener and ConfigFileApplicationListener, runs it, copies the resulting property sources into the real context's environment, and closes the temporary contexts again.
While that temporary application runs, EventPublishingRunListener publishes the usual series of startup events, so the log looks like an application restart even though the JVM keeps running. EventPublishingRunListener is the default run listener, declared in Spring Boot's META-INF/spring.factories:
# Run Listeners
org.springframework.boot.SpringApplicationRunListener=\
org.springframework.boot.context.event.EventPublishingRunListener
EventPublishingRunListener
public class EventPublishingRunListener implements SpringApplicationRunListener, Ordered {

    private final SpringApplication application;

    private final String[] args;

    private final SimpleApplicationEventMulticaster initialMulticaster;

    public EventPublishingRunListener(SpringApplication application, String[] args) {
        this.application = application;
        this.args = args;
        this.initialMulticaster = new SimpleApplicationEventMulticaster();
        for (ApplicationListener<?> listener : application.getListeners()) {
            this.initialMulticaster.addApplicationListener(listener);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }

    @Override
    public void starting() {
        this.initialMulticaster.multicastEvent(new ApplicationStartingEvent(this.application, this.args));
    }

    @Override
    public void environmentPrepared(ConfigurableEnvironment environment) {
        this.initialMulticaster
                .multicastEvent(new ApplicationEnvironmentPreparedEvent(this.application, this.args, environment));
    }

    @Override
    public void contextPrepared(ConfigurableApplicationContext context) {
        this.initialMulticaster
                .multicastEvent(new ApplicationContextInitializedEvent(this.application, this.args, context));
    }

    @Override
    public void contextLoaded(ConfigurableApplicationContext context) {
        for (ApplicationListener<?> listener : this.application.getListeners()) {
            if (listener instanceof ApplicationContextAware) {
                ((ApplicationContextAware) listener).setApplicationContext(context);
            }
            context.addApplicationListener(listener);
        }
        this.initialMulticaster.multicastEvent(new ApplicationPreparedEvent(this.application, this.args, context));
    }

    @Override
    public void started(ConfigurableApplicationContext context) {
        context.publishEvent(new ApplicationStartedEvent(this.application, this.args, context));
    }

    @Override
    public void running(ConfigurableApplicationContext context) {
        context.publishEvent(new ApplicationReadyEvent(this.application, this.args, context));
    }

    @Override
    public void failed(ConfigurableApplicationContext context, Throwable exception) {
        ApplicationFailedEvent event = new ApplicationFailedEvent(this.application, this.args, context, exception);
        if (context != null && context.isActive()) {
            // Listeners have been registered to the application context so we should
            // use it at this point if we can
            context.publishEvent(event);
        }
        else {
            // An inactive context may not have a multicaster so we use our multicaster to
            // call all of the context's listeners instead
            if (context instanceof AbstractApplicationContext) {
                for (ApplicationListener<?> listener : ((AbstractApplicationContext) context)
                        .getApplicationListeners()) {
                    this.initialMulticaster.addApplicationListener(listener);
                }
            }
            this.initialMulticaster.setErrorHandler(new LoggingErrorHandler());
            this.initialMulticaster.multicastEvent(event);
        }
    }

    private static class LoggingErrorHandler implements ErrorHandler {

        private static final Log logger = LogFactory.getLog(EventPublishingRunListener.class);

        @Override
        public void handleError(Throwable throwable) {
            logger.warn("Error calling ApplicationEventListener", throwable);
        }

    }

}
contextLoaded() is called here; the ApplicationPreparedEvent it multicasts is received by RestartListener's onApplicationEvent().
RestartListener
public class RestartListener implements SmartApplicationListener {

    private ConfigurableApplicationContext context;

    private ApplicationPreparedEvent event;

    @Override
    public int getOrder() {
        return 0;
    }

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return ApplicationPreparedEvent.class.isAssignableFrom(eventType)
                || ContextRefreshedEvent.class.isAssignableFrom(eventType)
                || ContextClosedEvent.class.isAssignableFrom(eventType);
    }

    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return true;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent input) {
        if (input instanceof ApplicationPreparedEvent) {
            this.event = (ApplicationPreparedEvent) input;
            if (this.context == null) {
                this.context = this.event.getApplicationContext();
            }
        }
        else if (input instanceof ContextRefreshedEvent) {
            if (this.context != null && input.getSource().equals(this.context)
                    && this.event != null) {
                this.context.publishEvent(this.event);
            }
        }
        else {
            if (this.context != null && input.getSource().equals(this.context)) {
                this.context = null;
                this.event = null;
            }
        }
    }

}
RestartListener's onApplicationEvent() receives an ApplicationPreparedEvent, after which AbstractApplicationContext's refresh() runs, i.e. a temporary IoC container is refreshed. This is the first refresh.
EventPublishingRunListener's running() is called, and the new configuration is loaded.
PropertySourceBootstrapConfiguration's initialize() is called, which indirectly calls NacosPropertySourceLocator's locate() to load the configuration from Nacos.
RestartListener's onApplicationEvent() is called again with an ApplicationPreparedEvent, and AbstractApplicationContext's refresh() refreshes an IoC container a second time.
RestartListener's onApplicationEvent() is called with a ContextRefreshedEvent.
At this point, ContextRefresher's refreshEnvironment() has finished.
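The set of keys that refreshEnvironment() returns, and that shows up in the log as "Refresh keys changed", is a diff between the property values before and after the reload. A minimal sketch of such a diff over two flat maps is shown below; ChangedKeysSketch and changedKeys are hypothetical names, and this is an illustration of the idea rather than the library's own changes() method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

class ChangedKeysSketch {

    // Returns every key that was removed, added, or whose value differs between the snapshots.
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, Object> entry : before.entrySet()) {
            String key = entry.getKey();
            if (!after.containsKey(key) || !Objects.equals(entry.getValue(), after.get(key))) {
                keys.add(key); // removed or modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                keys.add(key); // newly added
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new HashMap<>();
        before.put("spring.youxia.config.name", "old");
        before.put("server.port", 8080);

        Map<String, Object> after = new HashMap<>();
        after.put("spring.youxia.config.name", "test");
        after.put("server.port", 8080);

        System.out.println("Refresh keys changed: " + changedKeys(before, after));
    }
}
```

Unchanged keys such as server.port in this example are left out, so listeners of the resulting change event only need to look at what actually moved.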
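Next, RefreshScope's refreshAll() is called. It indirectly calls destroy() on the parent class GenericScope, which discards the cached refresh-scoped bean instances, and then publishes a RefreshScopeRefreshedEvent to the ApplicationContext.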
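Discarding the cached instances is what makes the next method call on a refresh-scoped bean see the new configuration: the bean is simply built again. The sketch below models the scope as a lazily filled cache. RefreshScopeCacheSketch, get and refreshAll are hypothetical names for illustration; bean destruction callbacks and locking are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class RefreshScopeCacheSketch {

    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Returns the cached instance for this name, creating it on first use.
    Object get(String name, Supplier<?> factory) {
        return cache.computeIfAbsent(name, n -> factory.get());
    }

    // Drops every cached instance so the next get() builds a fresh one.
    void refreshAll() {
        cache.clear();
    }

    public static void main(String[] args) {
        AtomicReference<String> configuredName = new AtomicReference<>("test");
        RefreshScopeCacheSketch scope = new RefreshScopeCacheSketch();
        Supplier<String> factory = () -> "name=" + configuredName.get();

        System.out.println(scope.get("demoBean", factory)); // built from the current config

        configuredName.set("test2");
        System.out.println(scope.get("demoBean", factory)); // still the cached instance

        scope.refreshAll();
        System.out.println(scope.get("demoBean", factory)); // rebuilt with the new config
    }
}
```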
Periodic heartbeat after the Java client connects to Nacos
NacosWatch starts a ThreadPoolTaskScheduler that runs a scheduled task; the event it publishes is HeartbeatEvent.
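As a rough plain-Java analogy for such a scheduled heartbeat, the sketch below uses a ScheduledExecutorService instead of Spring's ThreadPoolTaskScheduler and passes a counter to a callback instead of publishing a HeartbeatEvent. HeartbeatSketch, start and the 100 ms delay are all assumptions made for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

class HeartbeatSketch {

    // Passes an increasing heartbeat counter to the listener, with a fixed delay between runs.
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    long delayMillis, LongConsumer listener) {
        AtomicLong counter = new AtomicLong();
        return scheduler.scheduleWithFixedDelay(
                () -> listener.accept(counter.incrementAndGet()),
                0, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> task = start(scheduler, 100, n -> System.out.println("heartbeat " + n));
        Thread.sleep(350);   // let a few heartbeats fire
        task.cancel(false);  // stop scheduling further heartbeats
        scheduler.shutdown();
    }
}
```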
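Summary
A configuration change made in the Nacos console is applied through something that looks like a restart but is not a JVM restart: a temporary Spring application is started inside the running JVM to reload the environment, and the refresh-scoped beans are then destroyed so that they are rebuilt on next use. Since a running JVM cannot hot-swap its configuration by itself, this is a reasonable approach. In the log above the whole notification took about 3.4 seconds. Because beans are destroyed and rebuilt, a refresh is best done while the application has little or no traffic; a transaction that is in flight during the refresh could be affected. Whether production systems actually run into this is not yet clear to me.
The walkthrough also shows how heavily Spring relies on events, i.e. the observer pattern, the same idea that underlies message queues, notifications, web request/response handling, and GUI click events.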