【深入理解SpringCloud微服务】深入理解nacos配置中心(四)——配置新增或修改源码分析

【深入理解SpringCloud微服务】深入理解nacos配置中心(四)——配置新增或修改源码分析

  • 原理回顾
  • 源码分析
    • ConfigController#publishConfig()
    • ConfigOperationService#publishConfig()
    • nacos事件监听机制
      • ConfigChangePublisher#notifyConfigChange()
      • NotifyCenter#publishEvent(Event)
      • DefaultPublisher#publish(Event )
      • DefaultPublisher线程启动
    • AsyncNotifyService
    • AsyncRpcTask
    • DumpService#dump()

原理回顾

在之前的《宏观理解nacos配置中心原理》这一篇文章中我们已经对nacos的配置新增或修改的流程进行描述。

在这里插入图片描述

当我们在nacos-console控制界面上新增或修改了配置并发布后,就会发送http请求,然后nacos服务端接收到http请求后,会进入ConfigController的publishConfig()方法进行处理。

ConfigController的publishConfig()方法先把新增或修改后的配置持久化到MySQL。

配置持久化到MySQL之后,异步进行dump配置内容到磁盘文件以及通知nacos集群中的其他节点发生配置变更。

dump配置内容到磁盘文件是通过DumpService进行的,DumpService先把新增或修改的配置dump到磁盘文件,然后根据文件内容算出一个MD5值,再拿到算出的MD5值与缓存中该配置文件对应的MD5值进行比较,如果两MD5值不一致,说明修改后的配置内容与修改前不一致,也就是发生了配置变更,需要更新MD5值并通知客户端。

源码分析

ConfigController#publishConfig()

    @PostMapping...public Boolean publishConfig(...) throws NacosException {...return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);}

ConfigController的publishConfig方法调用configOperationService的publishConfig方法。

在这里插入图片描述

ConfigOperationService#publishConfig()

    public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)throws NacosException {...// 持久化到MySQLpersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(), configInfo, time,configAdvanceInfo, false);// 发布ConfigDataChangeEvent事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));...}

ConfigOperationService的publishConfig方法首先把配置持久化到MySQL,然后发布一个ConfigDataChangeEvent事件,这个ConfigDataChangeEvent事件会异步触发配置文件dump以及通知nacos集群其他节点发生配置变更等操作。

在这里插入图片描述

nacos事件监听机制

ConfigChangePublisher#notifyConfigChange()

    public static void notifyConfigChange(ConfigDataChangeEvent event) {...NotifyCenter.publishEvent(event);}

ConfigChangePublisher的notifyConfigChange方法调用NotifyCenter的publishEvent方法发布事件。

在这里插入图片描述

NotifyCenter#publishEvent(Event)

    public static boolean publishEvent(final Event event) {...return publishEvent(event.getClass(), event);...}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {...// 根据事件类型获取对应的事件发布器EventPublisher,// 获取到的是DefaultPublisherfinal String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {// 调用EventPublisher的publish方法return publisher.publish(event);}...}

NotifyCenter的publishEvent方法根据事件类型获取对应的事件发布器EventPublisher,获取到的是DefaultPublisher,然后调用DefaultPublisher的publish(event)方法发布事件。

在这里插入图片描述

DefaultPublisher#publish(Event )

    public boolean publish(Event event) {...boolean success = this.queue.offer(event);...}

DefaultPublisher的publish方法调用this.queue.offer(event)把事件对象放入队列中。

在这里插入图片描述

由于DefaultPublisher继承了Thread,所以实际上DefaultPublisher是一个线程对象,调用start()方法可以启动一个线程。

DefaultPublisher线程启动

public class DefaultPublisher extends Thread implements EventPublisher {...
}
    @Overridepublic void init(Class<? extends Event> type, int bufferSize) {...// 创建队列this.queue = new ArrayBlockingQueue<>(bufferSize);// 调start()方法启动线程start();}public ConcurrentHashSet<Subscriber> getSubscribers() {return subscribers;}@Overridepublic synchronized void start() {if (!initialized) {...// 调用Thread的start()方法启动线程super.start();...initialized = true;}}

DefaultPublisher的init方法就会调用start()方法启动线程,在调用start()方法前还创建了队列queue。

而DefaultPublisher的init方法在NotifyCenter的static代码块中被调用的。

public class NotifyCenter {...static {...// 通过SPI机制加载EventPublisherfinal Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {// 反射实例化EventPublisher并调用EventPublisher的init方法初始化EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (...) {...}};...}...}

在这里插入图片描述

调用start()方法,线程运行起来后,就会执行run()方法。

    @Overridepublic void run() {openEventHandler();}void openEventHandler() {try {...// for循环从队列中取出事件,执行receiveEvent方法for (; ; ) {...final Event event = queue.take();receiveEvent(event);...}} catch (...) {...}}

run方法调用openEventHandler方法,openEventHandler方法中for循环里不停地从队列中取出事件,并调用receiveEvent方法处理。

在这里插入图片描述

    void receiveEvent(Event event) {...for (Subscriber subscriber : subscribers) {if (!subscriber.scopeMatches(event)) {continue;}...notifySubscriber(subscriber, event);}}

receiveEvent方法中for循环遍历所有的订阅者Subscriber(也就是观察者),然后调用Subscriber的scopeMatches(Event)方法看是否与该事件匹配,如果不匹配,则跳过,如果匹配,则调用notifySubscriber方法做下一步处理。

在这里插入图片描述

    @Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {...// 创建一个Runnable,run方法调用Subscriber的onEvent方法final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();// 如果线程池不为空则提交到线程池执行,如果为空则在当前线程执行if (executor != null) {executor.execute(job);} else {try {job.run();} catch (...) {...}}}

notifySubscriber方法创建一个Runnable,run方法调用Subscriber的onEvent方法,然后从subscriber中取出线程池,如果线程池不为空则提交到线程池执行,如果为空则在当前线程执行。但无论线程池是否为空,执行的都是Subscriber的onEvent方法。

在这里插入图片描述

AsyncNotifyService

由于前面发布的是一个ConfigDataChangeEvent事件,这里匹配到的是AsyncNotifyService内部的Subscriber,然后执行到这个Subscriber的onEvent方法。

    public AsyncNotifyService(ServerMemberManager memberManager) {...NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (event instanceof ConfigDataChangeEvent) {...// 拿到nacos集群中的所有节点成员Collection<Member> ipList = memberManager.allMembers();...// 创建一个任务队列Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();for (Member member : ipList) {...// 每个节点成员创建一个通知任务放入队列rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));...}...if (!rpcQueue.isEmpty()) {// 队列放入AsyncRpcTask,AsyncRpcTask提交到线程池执行ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}}...});}

在AsyncNotifyService的构造方法中,把这个Subscriber注册到NotifyCenter。这个Subscriber的onEvent方法首先拿到nacos集群中的所有节点成员ipList,给每个成员创建一个通知任务放入队列rpcQueue,然后将这个队列放入到一个AsyncRpcTask对象,再将这个AsyncRpcTask提交到线程池执行。

在这里插入图片描述

AsyncRpcTask

然后线程池执行该任务,就会执行AsyncRpcTask的run()方法。

        public void run() {while (!queue.isEmpty()) {// 从队列取出任务NotifySingleRpcTask task = queue.poll();...Member member = task.member;// 是当前节点?if (memberManager.getSelf().equals(member)) {...// 如果是当前节点,执行DumpService的dump方法,进行dump磁盘,更新md5值,通知客户端等操作dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());...continue;}...// 不是当前节点,那就是集群中的其他nacos节点,通知其发生配置变更configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));...}}}...}}

在这里插入图片描述

DumpService#dump()

    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,boolean isBeta) {...// 将任务放入dumpTaskMgr中异步执行dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));...}

DumpService的dump方法将任务放入dumpTaskMgr中异步执行,然后执行这个任务的是DumpProcessor,异步任务会在DumpProcessor的process方法中执行。

    public boolean process(NacosTask task) {...return DumpConfigHandler.configDump(build.build());}

DumpProcessor的process方法调用DumpConfigHandler的configDump静态方法。

    public static boolean configDump(ConfigDumpEvent event) {...// 调用ConfigCacheService的dump方法,进行dump配置到磁盘文件,更新md5值,通知客户端发生配置变更result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);
...return result;}}

DumpConfigHandler的configDump静态方法会调用ConfigCacheService的dump方法,dump方法会进行dump配置到磁盘文件,更新md5值,通知客户端发生配置变更等一系列操作。

在这里插入图片描述

ConfigCacheService的dump方法在服务端启动的时候也会调用,这个我们在上一篇文章《服务端启动与获取配置源码分析》已经分析过,这里就不再分析了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/140744.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

在 FlexSim 中使用 OpenUSD 分析、可视化和优化现实世界的流程

对于制造和工业企业而言&#xff0c;效率和精度至关重要。为了简化运营、降低成本和提高生产力&#xff0c;各公司正在转向数字孪生和离散事件模拟。 离散事件模拟使制造商能够通过试验不同的输入和行为来优化流程&#xff0c;这些输入和行为可以逐步进行建模和测试。 FlexSi…

6. Transforms的使用(一)--ToTensor()

Transforms的使用&#xff08;一&#xff09; 1.使用ToTensor类将数据转化为Tensor形式 导入需要使用的transforms类 from torchvision import transforms创建ToTensor类的实例 totensor transforms.ToTensor()将读取的图片ndarray数据转化为Tensor数据 img cv.imread(img_p…

Java网络编程 TCP通信(Socket 与 ServerSocket)

1.TCP通信原理 TCP通信涉及两个端点&#xff1a;客户端和服务器。服务器端使用 ServerSocket 监听特定端口&#xff0c;等待客户端的连接请求。客户端使用 Socket 连接到服务器的IP地址和端口。一旦连接建立&#xff0c;双方就可以通过输入输出流进行数据交换. ServerSocket是…

视频工具EasyDarwin生成RTMP给WVP拉流列表

效果 运行 登录 http://127.0.0.1:10086/ admin/admin 创建RTMP服务

微型导轨在光学仪器中的应用!

微型导轨在光学仪器中扮演着至关重要的角色&#xff0c;以其高精度、高稳定性的特点&#xff0c;提供稳定的光学路径和精确的光学元件位置。接下来&#xff0c;我们一起来看看微型导轨在光学仪器中的应用实例&#xff01; 1、显微镜&#xff1a;在显微镜中&#xff0c;微型导轨…

鹏哥C语言自定义笔记重点(67-)

67. 68. 69. 70. 71.结构体内容 72.理解结构体的字节数 73. #pragma once //头文件中使用&#xff0c;功能是:防止头文件被多次引用 74.结构体传参 结论:结构体传参时&#xff0c;要传结构体地址。 75.位段 76.static是只能在该文件中看到&#xff0c;其他地方看不到 77.…

【6大设计原则】迪米特法则:解密软件设计中的“最少知识原则”

引言 在软件设计中&#xff0c;设计原则是指导我们构建高质量、可维护系统的基石。迪米特法则&#xff08;Law of Demeter&#xff0c;LoD&#xff09;&#xff0c;也被称为“最少知识原则”&#xff0c;是六大设计原则之一。它强调对象之间的松耦合&#xff0c;确保系统的各个…

8. Transforms的使用(三)-- Resize

Transforms的使用(三) 1. 为什么要使用Resize 在模型的训练过程中往往需要图片数据的维度相同,才能适应深度学习模型中的相关神经网络结构,这时候就需要使用Resize保证所有的图片保持相同的尺寸2. 使用Resize调整图片的尺寸 在pytorch2.3的版本上,Resize()支持对Tensor类…

1405 问题 E: 世界杯

废话 这个题&#xff0c;我估计 22 22 22 年的时候写过一次&#xff0c;当时应该是搞明白了&#xff0c;现在重新写还是不会写&#xff0c;有点无奈 题目 问题 E: 世界杯&#xff1a;现在的 OJ 把题目加到一个活动里面去之后&#xff0c;感觉之后这个链接就访问不了了。题目…

CSS—4

1.定位 1.相对定位 2.绝对定位 3.固定定位 4.粘性定位 5.定位的特殊应用 2.布局-版心 3.布局-常用布局名词 4.布局-重置默认样式

321. 拼接最大数

1. 题目 321. 拼接最大数 2. 解题思路 题目精简一下&#xff1a; 给你两个数组&#xff0c;从每个数组选取N个元素&#xff08;需要保持相对顺序&#xff0c;比如从数组[4,8,2]选取两个元素&#xff0c;选取出来后必须保持顺序&#xff0c;比如选4和2&#xff0c;那么组成新…

对操作系统(OS)管理和进程的理解

文章目录 从冯诺依曼体系入手来了解计算机硬件部分操作系统操作系统的概念设计操作系统&#xff08;OS&#xff09;的目的对下&#xff08;硬件&#xff09;OS的管理对上如何理解系统调用 进程 在计算机系统中&#xff0c;硬件、操作系统和进程是三个至关重要的概念。它们相互协…

C# 反射之动态生成dll/exe

这个可能应该属于反射的高级使用范围了&#xff0c;平常在项目中使用的人估计也不是很多。由于使用反射的话会降低性能&#xff0c;比如之前用到的GetValue、SetValue等之类&#xff0c;但是使用这种方式会大大提高效率&#xff0c;在这里我只想说&#xff0c;都直接写IL指令了…

C++八股文之面向对象篇

&#x1f916;个人主页&#xff1a;晚风相伴-CSDN博客 思维导图链接&#xff1a;面向对象的性质 持续更新中…… &#x1f496;如果觉得内容对你有帮助的话&#xff0c;还请给博主一键三连&#xff08;点赞&#x1f49c;、收藏&#x1f9e1;、关注&#x1f49a;&#xff09;吧 …

【CSS in Depth 2 精译_031】5.3 Grid 网格布局的两种替代语法

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09; 1.1 层叠1.2 继承1.3 特殊值1.4 简写属性1.5 CSS 渐进式增强技术1.6 本章小结 第二章 相对单位&#xff08;已完结&#xff09; 2.1 相对…

【VSCode】VSCode Background 背景插件辅助窗口程序

前排贴上Github项目链接 GitHub窗口项目链接 这是一个基于VSCode上由shalldie上传的background扩展制作的windows窗口程序。 该程序旨在通过窗口程序尽可能的完善该扩展原有的功能。 background - shalldie 的最大优势是我目前仅在其扩展上发现了UseFront的选项&#xff0c;这…

2011年全国硕士研究生入学统一考试计算机科学与技术

1. 试卷背景&#xff1a; 试题&#xff1a;2011年全国硕士研究生入学统一考试计算机科学与技术学科联考中的计算机学科专业基础综合试题。难点&#xff1a;该问题的研究难点在于试题涵盖了计算机科学与技术的多个方面&#xff0c;包括数据结构、算法、计算机组成原理、操作系统…

text2sql(NL2Sql)综述《The Dawn of Natural Language to SQL: Are We Fully Ready?》

《The Dawn of Natural Language to SQL: Are We Fully Ready?》(github)出自2024年6月的NL2SQL(Natural language to SQL )综述论文。这篇论文尝试回答如下三个问题&#xff1a; 问题1:NL2SQL的现状是什么&#xff1f;(Q1:Where Are we Now?) 论文图1总结了近20年NL2SQL方法…

Qt:懒汉单例(附带单例使用和内存管理)

前言 本文主要写懒汉单例以及单例的释放&#xff0c;网上很多教程只有单例的创建&#xff0c;但是并没有告诉我们单例的内存管理&#xff0c;这就很头疼。 正文 以下是两种懒汉单例的写法 1. 懒汉式单例&#xff08;多线程不安全&#xff0c;但是在单线程里面是安全的&…

protobuf中c、c++、python使用

文章目录 protobuf实例&#xff1a;例题1&#xff1a;[CISCN 2023 初赛]StrangeTalkBot分析&#xff1a;思路&#xff1a;利用&#xff1a; 例题2&#xff1a;[CISCN 2024]protoverflow分析&#xff1a; protobuf Protocol Buffers&#xff0c;是Google公司开发的一种数据描述语…