【深入理解SpringCloud微服务】深入理解nacos配置中心(四)——配置新增或修改源码分析
- 原理回顾
- 源码分析
- ConfigController#publishConfig()
- ConfigOperationService#publishConfig()
- nacos事件监听机制
- ConfigChangePublisher#notifyConfigChange()
- NotifyCenter#publishEvent(Event)
- DefaultPublisher#publish(Event )
- DefaultPublisher线程启动
- AsyncNotifyService
- AsyncRpcTask
- DumpService#dump()
原理回顾
在之前的《宏观理解nacos配置中心原理》这一篇文章中我们已经对nacos的配置新增或修改的流程进行描述。
当我们在nacos-console控制界面上新增或修改了配置并发布后,就会发送http请求,然后nacos服务端接收到http请求后,会进入ConfigController的publishConfig()方法进行处理。
ConfigController的publishConfig()方法先把新增或修改后的配置持久化到MySQL。
配置持久化到MySQL之后,异步进行dump配置内容到磁盘文件以及通知nacos集群中的其他节点发生配置变更。
dump配置内容到磁盘文件是通过DumpService进行的,DumpService先把新增或修改的配置dump到磁盘文件,然后根据文件内容算出一个MD5值,再拿到算出的MD5值与缓存中该配置文件对应的MD5值进行比较,如果两MD5值不一致,说明修改后的配置内容与修改前不一致,也就是发生了配置变更,需要更新MD5值并通知客户端。
源码分析
ConfigController#publishConfig()
@PostMapping...public Boolean publishConfig(...) throws NacosException {...return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);}
ConfigController的publishConfig方法调用configOperationService的publishConfig方法。
ConfigOperationService#publishConfig()
public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)throws NacosException {...// 持久化到MySQLpersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(), configInfo, time,configAdvanceInfo, false);// 发布ConfigDataChangeEvent事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));...}
ConfigOperationService的publishConfig方法首先把配置持久化到MySQL,然后发布一个ConfigDataChangeEvent事件,这个ConfigDataChangeEvent事件会异步触发配置文件dump以及通知nacos集群其他节点发生配置变更等操作。
nacos事件监听机制
ConfigChangePublisher#notifyConfigChange()
public static void notifyConfigChange(ConfigDataChangeEvent event) {...NotifyCenter.publishEvent(event);}
ConfigChangePublisher的notifyConfigChange方法调用NotifyCenter的publishEvent方法发布事件。
NotifyCenter#publishEvent(Event)
public static boolean publishEvent(final Event event) {...return publishEvent(event.getClass(), event);...}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {...// 根据事件类型获取对应的事件发布器EventPublisher,// 获取到的是DefaultPublisherfinal String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {// 调用EventPublisher的publish方法return publisher.publish(event);}...}
NotifyCenter的publishEvent方法根据事件类型获取对应的事件发布器EventPublisher,获取到的是DefaultPublisher,然后调用DefaultPublisher的publish(event)方法发布事件。
DefaultPublisher#publish(Event )
public boolean publish(Event event) {...boolean success = this.queue.offer(event);...}
DefaultPublisher的publish方法调用this.queue.offer(event)把事件对象放入队列中。
由于DefaultPublisher继承了Thread,所以实际上DefaultPublisher是一个线程对象,调用start()方法可以启动一个线程。
DefaultPublisher线程启动
public class DefaultPublisher extends Thread implements EventPublisher {...
}
@Overridepublic void init(Class<? extends Event> type, int bufferSize) {...// 创建队列this.queue = new ArrayBlockingQueue<>(bufferSize);// 调start()方法启动线程start();}public ConcurrentHashSet<Subscriber> getSubscribers() {return subscribers;}@Overridepublic synchronized void start() {if (!initialized) {...// 调用Thread的start()方法启动线程super.start();...initialized = true;}}
DefaultPublisher的init方法就会调用start()方法启动线程,在调用start()方法前还创建了队列queue。
而DefaultPublisher的init方法在NotifyCenter的static代码块中被调用的。
public class NotifyCenter {...static {...// 通过SPI机制加载EventPublisherfinal Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {// 反射实例化EventPublisher并调用EventPublisher的init方法初始化EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (...) {...}};...}...}
调用start()方法,线程运行起来后,就会执行run()方法。
@Overridepublic void run() {openEventHandler();}void openEventHandler() {try {...// for循环从队列中取出事件,执行receiveEvent方法for (; ; ) {...final Event event = queue.take();receiveEvent(event);...}} catch (...) {...}}
run方法调用openEventHandler方法,openEventHandler方法中for循环里不停地从队列中取出事件,并调用receiveEvent方法处理。
void receiveEvent(Event event) {...for (Subscriber subscriber : subscribers) {if (!subscriber.scopeMatches(event)) {continue;}...notifySubscriber(subscriber, event);}}
receiveEvent方法中for循环遍历所有的订阅者Subscriber(也就是观察者),然后调用Subscriber的scopeMatches(Event)方法看是否与该事件匹配,如果不匹配,则跳过,如果匹配,则调用notifySubscriber方法做下一步处理。
@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {...// 创建一个Runnable,run方法调用Subscriber的onEvent方法final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();// 如果线程池不为空则提交到线程池执行,如果为空则在当前线程执行if (executor != null) {executor.execute(job);} else {try {job.run();} catch (...) {...}}}
notifySubscriber方法创建一个Runnable,run方法调用Subscriber的onEvent方法,然后从subscriber中取出线程池,如果线程池不为空则提交到线程池执行,如果为空则在当前线程执行。但无论线程池是否为空,执行的都是Subscriber的onEvent方法。
AsyncNotifyService
由于前面发布的是一个ConfigDataChangeEvent事件,这里匹配到的是AsyncNotifyService内部的Subscriber,然后执行到这个Subscriber的onEvent方法。
public AsyncNotifyService(ServerMemberManager memberManager) {...NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (event instanceof ConfigDataChangeEvent) {...// 拿到nacos集群中的所有节点成员Collection<Member> ipList = memberManager.allMembers();...// 创建一个任务队列Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();for (Member member : ipList) {...// 每个节点成员创建一个通知任务放入队列rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));...}...if (!rpcQueue.isEmpty()) {// 队列放入AsyncRpcTask,AsyncRpcTask提交到线程池执行ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}}...});}
在AsyncNotifyService的构造方法中,把这个Subscriber注册到NotifyCenter。这个Subscriber的onEvent方法首先拿到nacos集群中的所有节点成员ipList,给每个成员创建一个通知任务放入队列rpcQueue,然后将这个队列放入到一个AsyncRpcTask对象,再将这个AsyncRpcTask提交到线程池执行。
AsyncRpcTask
然后线程池执行该任务,就会执行AsyncRpcTask的run()方法。
public void run() {while (!queue.isEmpty()) {// 从队列取出任务NotifySingleRpcTask task = queue.poll();...Member member = task.member;// 是当前节点?if (memberManager.getSelf().equals(member)) {...// 如果是当前节点,执行DumpService的dump方法,进行dump磁盘,更新md5值,通知客户端等操作dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());...continue;}...// 不是当前节点,那就是集群中的其他nacos节点,通知其发生配置变更configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));...}}}...}}
DumpService#dump()
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,boolean isBeta) {...// 将任务放入dumpTaskMgr中异步执行dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));...}
DumpService的dump方法将任务放入dumpTaskMgr中异步执行,然后执行这个任务的是DumpProcessor,异步任务会在DumpProcessor的process方法中执行。
public boolean process(NacosTask task) {...return DumpConfigHandler.configDump(build.build());}
DumpProcessor的process方法调用DumpConfigHandler的configDump静态方法。
public static boolean configDump(ConfigDumpEvent event) {...// 调用ConfigCacheService的dump方法,进行dump配置到磁盘文件,更新md5值,通知客户端发生配置变更result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);
...return result;}}
DumpConfigHandler的configDump静态方法会调用ConfigCacheService的dump方法,dump方法会进行dump配置到磁盘文件,更新md5值,通知客户端发生配置变更等一系列操作。
ConfigCacheService的dump方法在服务端启动的时候也会调用,这个我们在上一篇文章《服务端启动与获取配置源码分析》已经分析过,这里就不再分析了。