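Nacos Source Code: 1. Nacos Service Registration and Discovery Analysis (Part 2)
Outline
1. How the client initiates service registration and sends heartbeats
2. How the server handles the client's service registration request
3. Service registration: how high concurrency is achieved to support over a million registrations
4. In-memory registry: how concurrent read/write conflicts on the registry are handled
2. How the server handles the client's service registration request
(1) Recap of how the client automatically sends the registration request
(2) Code entry point where the Nacos server handles service registration requests
(3) Source code analysis of how the Nacos server handles service registration requests
(4) Summary of what the server does after receiving an instance registration request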
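(1) Recap of how the client automatically sends the registration request
First, in spring-cloud-starter-alibaba-nacos-discovery, the spring.factories file declares many configuration classes, including NacosServiceRegistryAutoConfiguration. This configuration class creates three beans, one of which implements an event listener method.
Then, when the Spring container starts, it publishes an event. The bean named NacosAutoServiceRegistration listens for this event and automatically initiates Nacos service registration. During registration the client also starts a delayed heartbeat task that runs every 5s. Both service registration and heartbeats call the Nacos server over HTTP.
The client sends the registration request to the HTTP endpoint "/nacos/v1/ns/instance" and sends heartbeat requests to the HTTP endpoint "/nacos/v1/ns/instance/beat".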
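To make the shape of the registration call concrete, here is a minimal sketch that only builds (and does not send) a POST request against the registration endpoint. The class name, the method name and the sample values are hypothetical. The sketch assumes the three query parameters serviceName, ip and port; the real client goes through its own HTTP wrapper and sends additional parameters.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

//Hypothetical helper, not part of the Nacos client.
final class RegisterRequestSketch {

    //Builds a POST request for the instance registration endpoint without sending it.
    static HttpRequest buildRegisterRequest(String serverAddr, String serviceName, String ip, int port) {
        //Assumed minimal parameter set; the real client sends more fields.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRegisterRequest("127.0.0.1:8848", "order-service", "192.168.0.10", 8080);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

A heartbeat request would look the same apart from the "/nacos/v1/ns/instance/beat" path and its own parameters.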
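(2) Code entry point where the Nacos server handles service registration requests
The Nacos server has a module named nacos-naming, which is itself a Spring Boot project. The controllers package in this module handles the service-related HTTP requests.
Because the server handles registration requests at "/nacos/v1/ns/instance", the entry point for handling service instances is InstanceController in the controllers package. InstanceController follows RESTful conventions closely: register(), which registers a new service instance, is mapped with @PostMapping; deregister(), which removes a service instance, is mapped with @DeleteMapping; update(), which modifies a service instance, is mapped with @PutMapping. All three could have used @PostMapping, but Nacos sticks to the RESTful convention.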
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...
    //Register new instance.
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        ...
    }

    //Deregister instances.
    @CanDistro
    @DeleteMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String deregister(HttpServletRequest request) throws Exception {
        ...
    }

    //Update instance.
    @CanDistro
    @PutMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String update(HttpServletRequest request) throws Exception {
        ...
    }
    ...
}

public class UtilsAndCommons {
    // ********************** Nacos HTTP Context ************************ \\
    public static final String NACOS_SERVER_CONTEXT = "/nacos";
    public static final String NACOS_SERVER_VERSION = "/v1";
    public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";
    public static final String NACOS_NAMING_CONTEXT = DEFAULT_NACOS_NAMING_CONTEXT;
    ...
}
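(3) Source code analysis of how the Nacos server handles service registration requests
A registration request from a Nacos client is handled by the register() method of InstanceController. The method first parses the Instance from the request parameters and then calls ServiceManager's registerInstance() method to register it. ServiceManager is the service manager of Nacos: it holds the full service list and manages the registration, removal and modification of all services.
In ServiceManager's registerInstance() method: it first calls ServiceManager's createEmptyService() method, which creates an empty service if none exists yet, and then calls ServiceManager's addInstance() method to add the service instance carried by the registration request.
In ServiceManager's addInstance() method: it first builds the key of the service that the instance belongs to, then uses synchronized to lock that service, then obtains the latest instance list of that service, and finally calls DelegateConsistencyServiceImpl's put() method to update the instance list.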
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    @Autowired
    private ServiceManager serviceManager;
    ...
    //Register new instance.
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //Read the namespace and the service name from the request
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        //Parse the Instance from the request
        final Instance instance = parseInstance(request);
        //Call ServiceManager's instance registration method
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    ...
}

//Service manager: holds the full service list and manages registration, removal and modification of all services
@Component
public class ServiceManager implements RecordListener<Service> {
    //Registry, Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    private final Object putServiceLock = new Object();
    ...
    //Register an instance to a service in AP mode.
    //This method creates service or cluster silently if they don't exist.
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //1.Create an empty service
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //2.Look up the service by namespace ID and service name; throw if the result is null
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //3.Add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    ...
    //1.Create an empty service
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    //Create service if not exist.
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            //now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    private void putServiceAndInit(Service service) throws NacosException {
        //Put the Service into the registry serviceMap
        putService(service);
        service.init();
        //Add the Service as a listener to the listeners of consistencyService
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

    //Put service into manager.
    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

    public void addOrReplaceService(Service service) throws NacosException {
        consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
    }
    ...
    //2.Look up a service by namespace ID and service name
    public Service getService(String namespaceId, String serviceName) {
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String, Service> chooseServiceMap(String namespaceId) {
        return serviceMap.get(namespaceId);
    }
    ...
    //3.Add the service instance
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        //Build the key of the service that the instance belongs to
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //Look up that service by namespace and service name
        Service service = getService(namespaceId, serviceName);
        //Lock the service with synchronized
        synchronized (service) {
            //A service may have many instances, so merge the instances ips of this request
            //into the service's latest instance list
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            //Instances implements the Record interface used for network transfer inside the Nacos cluster
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //Call DelegateConsistencyServiceImpl's put() method
            consistencyService.put(key, instances);
        }
    }

    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        //Update the instance list of the service
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    //Compare and get new instance list.
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        //First fetch all instances already registered in Nacos for the service being registered to
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        Set<String> currentInstanceIds = Sets.newHashSet();
        for (Instance instance : currentIPs) {
            //Use the instance's address from toIpAddr() as key and the instance as value
            currentInstances.put(instance.toIpAddr(), instance);
            //Add the instance's unique ID to currentInstanceIds
            currentInstanceIds.add(instance.getInstanceId());
        }
        //Holds all instances of the service being registered to
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            instanceMap = new HashMap<>(ips.length);
        }
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                //The key of instanceMap depends on the IP and port
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
        }
        //At this point instanceMap always contains the newly registered Instance;
        //if this is not the first registration, it also contains the previously registered Instances
        return new ArrayList<>(instanceMap.values());
    }
    ...
}

//Package of instance list.
public class Instances implements Record {
    private List<Instance> instanceList = new ArrayList<>();
    ...
}

public class KeyBuilder {
    public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
    public static final String NAMESPACE_KEY_CONNECTOR = "##";
    ...
    public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName) : buildPersistentInstanceListKey(namespaceId, serviceName);
    }

    //The returned key looks like: "com.alibaba.nacos.naming.iplist.ephemeral." + namespaceId + "##" + serviceName
    private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {
        return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
    }

    public static boolean matchEphemeralKey(String key) {
        //currently only instance list has ephemeral type:
        return matchEphemeralInstanceListKey(key);
    }

    public static boolean matchEphemeralInstanceListKey(String key) {
        //Check whether the key starts with "com.alibaba.nacos.naming.iplist.ephemeral."
        return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
    }
    ...
}
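When DelegateConsistencyServiceImpl's put() method updates the stored instance list, it first picks a ConsistencyService implementation according to the key that identifies the service. For an ephemeral service instance it calls DistroConsistencyServiceImpl's put() method. For a persistent service instance it calls PersistentConsistencyServiceDelegateImpl's put() method.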
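The key-based routing can be sketched in isolation as below. The class name and the returned labels are hypothetical. The ephemeral key format follows the KeyBuilder constants; the persistent key format (the same key without the "ephemeral." segment) is an assumption, because that builder method's body is not shown here.

```java
//Hypothetical stand-alone sketch of key building and key-based routing.
final class KeyRoutingSketch {
    static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
    static final String NAMESPACE_KEY_CONNECTOR = "##";

    //Builds the instance list key; the persistent branch is an assumed format.
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        String prefix = ephemeral
                ? INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX
                : INSTANCE_LIST_KEY_PREFIX;
        return prefix + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
    }

    //Returns a label for the consistency service that the key would be routed to.
    static String chooseConsistencyService(String key) {
        boolean ephemeral = key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
        return ephemeral ? "ephemeral" : "persistent";
    }

    public static void main(String[] args) {
        String key = buildInstanceListKey("public", "order-service", true);
        System.out.println(key + " -> " + chooseConsistencyService(key));
    }
}
```

Because the decision depends only on the key prefix, callers such as addInstance() never need to know which implementation ends up storing the data.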
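In DistroConsistencyServiceImpl's put() method: it first calls DistroConsistencyServiceImpl's onPut() method, which stores the latest instance list (including the instance being registered) in the DataStore, and then calls DistroProtocol's sync() method to synchronize the instance data across cluster nodes. The DataStore holds the data of all registered service instances.
In DistroConsistencyServiceImpl's onPut() method: it first creates a Datum object, fills in the service key and the Instances holding all instances of the service, and then adds the Datum to the Map inside the DataStore. Finally it calls Notifier's addTask() method to add a data-change task, that is, it wraps the key and the action into a Pair object and puts it into the Notifier's blocking queue.
Note: once DistroConsistencyServiceImpl has been initialized, it submits a task that runs an infinite for loop to a single-threaded pool. The loop keeps taking Pair objects from the blocking queue and handling them. Each instance registration adds a Pair object to that task's blocking queue.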
//Consistency delegate.
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
    private final EphemeralConsistencyService ephemeralConsistencyService;
    ...
    @Override
    public void put(String key, Record value) throws NacosException {
        //For an ephemeral instance, call DistroConsistencyServiceImpl.put()
        //For a persistent instance, call PersistentConsistencyServiceDelegateImpl.put()
        mapConsistencyService(key).put(key, value);
    }

    private ConsistencyService mapConsistencyService(String key) {
        //Pick the ConsistencyService according to the key
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
    ...
}

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    private final GlobalConfig globalConfig;
    private final DistroProtocol distroProtocol;
    private final DataStore dataStore;//Stores the data of all registered service instances
    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
    private volatile Notifier notifier = new Notifier();
    ...
    @PostConstruct
    public void init() {
        //After initialization, submit the notifier task to GlobalExecutor
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        //Store the latest instance list, which includes the instance being registered, in the DataStore
        onPut(key, value);
        //In a cluster, DistroProtocol.sync() synchronizes the instance data across cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            //Create a Datum and fill in the service key and the Instances holding all instances of the service
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //Add it to the Map inside the DataStore
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        //Add a task to be handled
        notifier.addTask(key, DataOperation.CHANGE);
    }
    ...
    public class Notifier implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        //Add new notify task to queue.
        public void addTask(String datumKey, DataOperation action) {
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            //tasks is a blocking queue: wrap key and action into a Pair and put it into the queue
            tasks.offer(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}

//Stores the data of all registered service instances
@Component
public class DataStore {
    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);

    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }
    ...
}
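(4) Summary of what the server does after receiving an instance registration request
The register() method first reads the parameters sent by the client from the Request object. The addInstance() method then creates a key that identifies the service and calls DelegateConsistencyServiceImpl's put() method, which uses this key to pick a concrete ConsistencyService implementation.
In this put() method the key selects the EphemeralConsistencyService, so DistroConsistencyServiceImpl's put() method is called to handle the instance list.
DistroConsistencyServiceImpl's put() method in turn calls onPut(), which wraps the key and the Instances into a Datum object and puts it into the Map inside the DataStore. Finally it calls addTask(), which wraps this change of instance data into a Pair object and puts it into a blocking queue that is drained by a thread running an infinite for loop.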
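3. Service registration: how high concurrency is achieved to support over a million registrations
(1) Recap of how the server handles the client's registration request
(2) The asynchronous task design in Nacos
(3) Source code analysis of the asynchronous task and the in-memory queue
(1) Recap of how the server handles the client's registration request
When a Nacos client registers a service instance automatically, it sends an HTTP request to "/nacos/v1/ns/instance" to call the instance registration endpoint of the Nacos server. This address leads to the InstanceController class in the naming module of the Nacos server. Its register() method is the entry point where the server handles registration requests. Near the end of the call chain started by register(), Notifier's addTask() method is called, which wraps the key and the action into a Pair object and puts it into a BlockingQueue. With that, the registration logic behind InstanceController's register() method is done.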
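(2) The asynchronous task design in Nacos
I. Load test performance of Nacos instance registration
II. How the Nacos server adds and handles asynchronous tasks
III. Benefits of handling registration with asynchronous tasks: supporting high concurrency
I. Load test performance of Nacos instance registration
See the service discovery performance test report. Load testing service discovery on a 3-node cluster gives the interface load and capacity figures. In the capacity test the number of services reached 600,000 and the number of registered instances reached 1.1 million while the cluster stayed stable. Instance registration and query exceeded 13,000 TPS, which met expectations.
II. How the Nacos server adds and handles asynchronous tasks
First the client initiates an instance registration. The server wraps the received parameters into a Pair object and puts it into a BlockingQueue. At this point the handling of the registration endpoint is finished and the server returns the response to the client.
Then a single-threaded asynchronous task that the Nacos server runs in the background keeps taking Pair objects from the BlockingQueue. After taking a Pair object from the queue, it writes the information into the registry, which completes the registration.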
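The flow above can be sketched with plain JDK classes as follows. This is a simplified illustration, not the Nacos implementation: the class and method names are hypothetical, a String stands in for the Instances object, and unlike Notifier the loop exits when the thread is interrupted.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

//Hypothetical sketch: the request path only enqueues, one background thread writes the registry.
final class AsyncRegistrationSketch {
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Map<String, String> registry = new ConcurrentHashMap<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "registry-writer");
        t.setDaemon(true);
        return t;
    });

    AsyncRegistrationSketch() {
        worker.submit(this::drainForever);
    }

    //Request path: store the data, enqueue the key, return immediately.
    boolean register(String serviceKey, String instances) {
        dataStore.put(serviceKey, instances);
        return tasks.offer(serviceKey);
    }

    //Background path: the only code that writes to the registry.
    private void drainForever() {
        for (;;) {
            try {
                String key = tasks.take();
                registry.put(key, dataStore.get(key));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    //Test helper: waits until the key shows up in the registry or the timeout expires.
    String awaitLookup(String key, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (registry.get(key) == null && System.nanoTime() < deadline) {
            LockSupport.parkNanos(1_000_000L);
        }
        return registry.get(key);
    }

    public static void main(String[] args) {
        AsyncRegistrationSketch sketch = new AsyncRegistrationSketch();
        sketch.register("public##order-service", "192.168.0.10:8080");
        System.out.println(sketch.awaitLookup("public##order-service", 5000));
    }
}
```

Note that register() returns before the registry is updated, so a reader may briefly miss an instance that has already been acknowledged to the client.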
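III. Benefits of handling registration with asynchronous tasks: supporting high concurrency
Benefit 1: faster endpoint response
The endpoint with which the Nacos server handles instance registration does not perform the actual registration. It only wraps the information, puts it into the queue, and returns the response to the client. Because this code path is very simple, the response is faster.
Benefit 2: service stability
Even if 1,000 or 10,000 clients call the registration endpoint at the same moment, all that happens is that registration tasks are put into a blocking queue. This works like peak shaving with a message queue: the more complex processing is left to a consumer that works through it at its own pace, and the asynchronous task is that consumer.
Benefit 3: no concurrent write conflicts
On the Nacos server only a single thread handles the tasks in the queue, that is, only one thread moves the registration information from the blocking queue into the Nacos registry. Since writes are done by a single thread, concurrent writes from multiple threads are not a concern. However, other threads may be reading while this one thread is writing, so read/write conflicts can still occur. Nacos handles them with a copy-on-write strategy.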
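(3) Source code analysis of the asynchronous task and the in-memory queue
I. Initialization and processing flow of the asynchronous task
II. About the infinite for loop
I. Initialization and processing flow of the asynchronous task
When a DistroConsistencyServiceImpl instance is created, it directly creates an instance of Notifier, which implements the Runnable interface.
DistroConsistencyServiceImpl has an init() method annotated with @PostConstruct, so Spring calls it automatically when it creates the instance. init() submits the Notifier task to a thread pool.
In Notifier's run() method, an infinite for loop keeps taking tasks from the tasks blocking queue and handling them. After taking a task, if the action type is CHANGE, it first fetches the Instances object from the DataStore and then calls the listener's onChange() method to write the instance information into the registry.
II. About the infinite for loop
Is an infinite loop reasonable? Does it consume CPU? Will an exception end the loop?
The Nacos server has to keep handling the registration requests sent by Nacos clients, and it cannot know how many clients will register, so the only option is an infinite for loop that runs over and over.
With an infinite loop, CPU usage has to be considered. tasks is a BlockingQueue. First, a thread blocked on a blocking queue is suspended and consumes no CPU while it waits. Second, the take() method of tasks blocks until an element is available or the current thread is interrupted.
If an unexpected exception is thrown during handling, the try-catch inside the for loop catches it and the loop continues with the next task.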
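Both claims can be checked with a small experiment. The sketch below uses hypothetical names and an empty string as an artificial "bad task"; it runs the same kind of loop and exposes two observations: the consumer thread sits in the WAITING state while the queue is empty, and a failing task does not stop the loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

//Hypothetical sketch of an infinite consumer loop over a blocking queue.
final class BlockingLoopSketch {
    final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    final AtomicInteger handled = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    final Thread consumer;

    BlockingLoopSketch() {
        consumer = new Thread(this::loop, "notifier-sketch");
        consumer.setDaemon(true);
        consumer.start();
    }

    private void loop() {
        for (;;) {
            try {
                String task = tasks.take();   //parks the thread while the queue is empty
                if (task.isEmpty()) {
                    throw new IllegalStateException("bad task");
                }
                handled.incrementAndGet();
            } catch (Throwable e) {
                failed.incrementAndGet();     //swallow the error and keep looping
            }
        }
    }

    //Waits until the consumer is parked inside take(), i.e. idle without spinning.
    boolean awaitParked(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() < deadline) {
            if (tasks.isEmpty() && consumer.getState() == Thread.State.WAITING) {
                return true;
            }
            LockSupport.parkNanos(1_000_000L);
        }
        return false;
    }

    //Waits until at least the expected number of tasks was handled successfully.
    boolean awaitHandled(int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (handled.get() < expected && System.nanoTime() < deadline) {
            LockSupport.parkNanos(1_000_000L);
        }
        return handled.get() >= expected;
    }

    public static void main(String[] args) {
        BlockingLoopSketch sketch = new BlockingLoopSketch();
        System.out.println("parked while idle: " + sketch.awaitParked(5000));
        sketch.tasks.offer("");               //fails inside the loop
        sketch.tasks.offer("order-service");  //still gets handled afterwards
        System.out.println("handled after failure: " + sketch.awaitHandled(1, 5000));
    }
}
```

One consequence of catching Throwable inside the loop is that an interrupt does not stop it either, which is why the sketch runs the consumer as a daemon thread.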
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    private final GlobalConfig globalConfig;
    private final DistroProtocol distroProtocol;
    private final DataStore dataStore;//Stores the data of all registered service instances
    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
    private volatile Notifier notifier = new Notifier();
    ...
    @PostConstruct
    public void init() {
        //After initialization, submit the notifier task to GlobalExecutor
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    ...
    public class Notifier implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        //Add new notify task to queue.
        public void addTask(String datumKey, DataOperation action) {
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            //tasks is a blocking queue: wrap key and action into a Pair and put it into the queue
            tasks.offer(Pair.with(datumKey, action));
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            //Infinite loop
            for (; ;) {
                try {
                    //Take a task from the blocking queue
                    Pair<String, DataOperation> pair = tasks.take();
                    //Handle the task
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                //Take out the key and action created in DistroConsistencyServiceImpl.onPut()
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                services.remove(datumKey);
                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == DataOperation.CHANGE) {
                            //Write the Instances information into the registry
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
    ...
    @Override
    public void put(String key, Record value) throws NacosException {
        //Store the latest instance list, which includes the instance being registered, in the DataStore
        onPut(key, value);
        //In a cluster, DistroProtocol.sync() synchronizes the instance data across cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            //Create a Datum and fill in the service key and the Instances holding all instances of the service
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //Add it to the Map inside the DataStore
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        //Add a task to be handled
        notifier.addTask(key, DataOperation.CHANGE);
    }
}

public class GlobalExecutor {
    private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(
            ClassUtils.getCanonicalName(NamingApp.class),
            new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));
    ...
    public static void submitDistroNotifyTask(Runnable runnable) {
        //Submit the task to the thread pool for execution
        DISTRO_NOTIFY_EXECUTOR.submit(runnable);
    }
    ...
}

public class NameThreadFactory implements ThreadFactory {
    private final AtomicInteger id = new AtomicInteger(0);
    private String name;

    public NameThreadFactory(String name) {
        if (!name.endsWith(StringUtils.DOT)) {
            name += StringUtils.DOT;
        }
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = name + id.getAndDecrement();
        Thread thread = new Thread(r, threadName);
        thread.setDaemon(true);
        return thread;
    }
}

public final class ExecutorFactory {
    ...
    public static final class Managed {
        private static final String DEFAULT_NAMESPACE = "nacos";
        private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
        ...
        //Create a new single scheduled executor service with input thread factory and register to manager.
        public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, final ThreadFactory threadFactory) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
            //Registering with ThreadPoolManager makes the ScheduledExecutorService easy to manage, e.g. deregister or destroy
            THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
            return executorService;
        }
        ...
    }
    ...
}

public final class ThreadPoolManager {
    private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
    private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);
    ...
    //Register the thread pool resources with the resource manager.
    public void register(String namespace, String group, ExecutorService executor) {
        if (!resourcesManager.containsKey(namespace)) {
            synchronized (this) {
                lockers.put(namespace, new Object());
            }
        }
        final Object monitor = lockers.get(namespace);
        synchronized (monitor) {
            Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
            if (map == null) {
                map = new HashMap<String, Set<ExecutorService>>(8);
                map.put(group, new HashSet<ExecutorService>());
                map.get(group).add(executor);
                resourcesManager.put(namespace, map);
                return;
            }
            if (!map.containsKey(group)) {
                map.put(group, new HashSet<ExecutorService>());
            }
            map.get(group).add(executor);
        }
    }

    //Cancel the uniform lifecycle management for all threads under this resource.
    public void deregister(String namespace, String group) {
        if (resourcesManager.containsKey(namespace)) {
            final Object monitor = lockers.get(namespace);
            synchronized (monitor) {
                resourcesManager.get(namespace).remove(group);
            }
        }
    }
    ...
}
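Summary: asynchronous tasks are one way to improve performance. Many open-source frameworks combine asynchronous tasks with an in-memory queue to raise their own throughput.
4. In-memory registry: how concurrent read/write conflicts on the registry are handled
(1) Recap of the client and server source code for instance registration
(2) Structure of the Nacos registry
(3) Introduction to copy-on-write
(4) Source code analysis of writing a registration into the registry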
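(1) Recap of the client and server source code for instance registration
I. Recap of the client source code that initiates registration
Once projects such as an order service or an inventory service include the nacos-discovery registry dependency, the spring.factories file in that dependency is scanned at startup and the configuration classes declared in it are created.
The spring.factories file declares a configuration class named NacosServiceRegistryAutoConfiguration, which defines three beans: NacosServiceRegistry, NacosRegistration and NacosAutoServiceRegistration.
The parent class of NacosAutoServiceRegistration implements the ApplicationListener interface, that is, it implements the onApplicationEvent() listener method. When the Spring container starts, it publishes a WebServerInitializedEvent, which is picked up by the listener method of NacosAutoServiceRegistration on the Nacos client side.
This listener method calls the register() method of NacosServiceRegistry, which in turn calls the instance registration HTTP endpoint of the Nacos server to complete the registration.
Before calling the instance registration endpoint, the client also starts a BeatTask, which sends a heartbeat request to the Nacos server every 5 seconds.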
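A periodic heartbeat of this kind can be sketched with a ScheduledExecutorService. The sketch below is an illustration under stated assumptions, not the client's BeatTask: the class and method names are hypothetical, the beat is an arbitrary Runnable rather than an HTTP call, and it assumes a task that reschedules itself after each run instead of a fixed-rate schedule.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

//Hypothetical sketch of a self-rescheduling heartbeat task.
final class HeartbeatSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sender");
        t.setDaemon(true);
        return t;
    });
    private final long periodMillis;
    private final Runnable sendBeat;
    private volatile boolean stopped;

    HeartbeatSketch(long periodMillis, Runnable sendBeat) {
        this.periodMillis = periodMillis;
        this.sendBeat = sendBeat;
    }

    //Schedules the first beat one period from now.
    void start() {
        executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
    }

    //Sends one beat and schedules the next one, even if this beat failed.
    private void beatOnce() {
        if (stopped) {
            return;
        }
        try {
            sendBeat.run();
        } catch (RuntimeException e) {
            //A failed beat must not end the heartbeat cycle.
        }
        try {
            executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            //The executor was shut down concurrently by stop(); nothing left to do.
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    //Test helper: waits until the counter reaches the expected value or the timeout expires.
    static boolean awaitAtLeast(AtomicInteger counter, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (counter.get() < expected && System.nanoTime() < deadline) {
            LockSupport.parkNanos(1_000_000L);
        }
        return counter.get() >= expected;
    }

    public static void main(String[] args) {
        AtomicInteger beats = new AtomicInteger();
        HeartbeatSketch heartbeat = new HeartbeatSketch(50, beats::incrementAndGet);
        heartbeat.start();
        System.out.println("received 3 beats: " + awaitAtLeast(beats, 3, 5000));
        heartbeat.stop();
    }
}
```

With a period of 5000 ms and an HTTP call to "/nacos/v1/ns/instance/beat" as the Runnable, this would match the 5-second heartbeat described above.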
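II. Recap of the server source code that handles registration
The HTTP endpoint with which the Nacos server handles registration is /nacos/v1/ns/instance. Because the Nacos server is also a Spring Boot project, the architecture diagram leads to the naming module of the Nacos source code, and the request address then leads to the InstanceController class.
InstanceController has a register() method for this HTTP endpoint. In the end the instance sent by the client is wrapped into a Datum object and put into the DataStore, and then a Pair object is created and put into the in-memory blocking queue tasks of the Notifier.
DistroConsistencyServiceImpl has an init() method annotated with @PostConstruct. After the class is instantiated, init() submits a Notifier task to a thread pool.
Notifier's run() method keeps looping to take Pair objects from the tasks blocking queue, then calls Notifier's handle() method to fetch the Instances object from the DataStore, and then calls listener.onChange() to write the instance data into the registry.
(2) Structure of the Nacos registry
I. How the Nacos registry is used
ServiceManager has a serviceMap field, which is the in-memory registry of Nacos. The registry is where the registration information of microservice instances is kept. Before a client calls another microservice, it first calls the Nacos instance list query endpoint to find the currently available instances and then makes the call.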
//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {
    //Registry, Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    ...
    public Service getService(String namespaceId, String serviceName) {
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String, Service> chooseServiceMap(String namespaceId) {
        return serviceMap.get(namespaceId);
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
    ...
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    //Persistent instance list
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    //Ephemeral instance list
    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
    ...
}
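II. Analysis of the registry structure
The serviceMap field of ServiceManager, that is, the registry, is made of two nested Maps: Map(namespace, Map(group::serviceName, Service)).
Nacos lets services be categorized, and the top level is the Namespace. The default Namespace is public, and custom ones such as dev or test can be defined.
A namespace such as public can contain different Groups, for example two groups named DEFAULT_GROUP_1 and DEFAULT_GROUP_2. Namespace and Group thus map onto the two outer Maps of the registry: the namespace is the key of the outer Map, and the group is part of the group::serviceName key of the inner Map.
The value of the inner Map of ServiceManager.serviceMap is a Service object. The Service class has a clusterMap field. The key of clusterMap is the cluster name, such as a Beijing cluster or a Guangzhou cluster. The value of clusterMap is a Cluster object, which holds all instances in that cluster.
The Cluster class has two Sets, one per instance type. These two Sets store the actual Instance objects, and each Instance contains the IP, port and other information of the instance.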
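The nesting described above can be written down as a small stand-alone model. This is a simplified sketch with hypothetical names: instances are plain "ip:port" strings, the inner key uses the group::serviceName shape from the source comment, and the per-service collections are not thread-safe, so it is meant for single-threaded illustration only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

//Hypothetical model: namespace -> group::serviceName -> cluster -> instances.
final class RegistrySketch {
    static final class Cluster {
        final Set<String> persistentInstances = new HashSet<>();
        final Set<String> ephemeralInstances = new HashSet<>();
    }

    static final class Service {
        final Map<String, Cluster> clusterMap = new HashMap<>();
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    //Creates the missing levels on the way down and adds an ephemeral instance.
    void addEphemeral(String namespace, String groupedName, String clusterName, String ipPort) {
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentSkipListMap<>())
                .computeIfAbsent(groupedName, name -> new Service())
                .clusterMap.computeIfAbsent(clusterName, name -> new Cluster())
                .ephemeralInstances.add(ipPort);
    }

    //Walks the same levels for a lookup and returns an empty set if any level is missing.
    Set<String> ephemeralInstances(String namespace, String groupedName, String clusterName) {
        Map<String, Service> services = serviceMap.get(namespace);
        if (services == null) {
            return Collections.emptySet();
        }
        Service service = services.get(groupedName);
        if (service == null) {
            return Collections.emptySet();
        }
        Cluster cluster = service.clusterMap.get(clusterName);
        if (cluster == null) {
            return Collections.emptySet();
        }
        return cluster.ephemeralInstances;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.addEphemeral("public", "DEFAULT_GROUP::order-service", "beijing", "192.168.0.10:8080");
        registry.addEphemeral("public", "DEFAULT_GROUP::order-service", "guangzhou", "192.168.1.10:8080");
        System.out.println(registry.ephemeralInstances("public", "DEFAULT_GROUP::order-service", "beijing"));
    }
}
```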
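III. Why the registry is designed this way
Nacos designs the registry this way so that it can adapt to different usage scenarios. If the project is simple and the test, staging and production environments all share one Nacos server, namespaces can be used to tell the environments apart.
If the project is complex and each environment uses its own Nacos server, namespaces can instead be used to tell modules apart. An order module, for example, can be split into many microservices, and groups can then be used to tell different environments apart. Inside a Service object, the same service may also be deployed in several regions, for example 2 servers in Beijing and 2 servers in Guangzhou.
(3) Introduction to copy-on-write
The Nacos server uses copy-on-write when it writes a newly registered instance into the registry. Copy-on-write is a good way to avoid conflicts between concurrent reads and writes.
Copy-on-write (Copy On Write): when data is about to be written to some storage location, the existing content is first copied to another place, the write is applied to that copy, and then the original reference is changed to point to the new object.
The following example shows a concurrency conflict: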
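import java.util.HashSet;
import java.util.Set;

public class ConcurrentModificationDemo {
    public static void main(String[] args) {
        //Assume objectSet holds the instance information
        Set<Object> objectSet = new HashSet<>();
        //Simulate the asynchronous task that writes data
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //Sleep briefly first, otherwise writing finishes before reading starts
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //Write 100,000 entries
                for (int i = 0; i < 100000; i++) {
                    objectSet.add(i);
                }
            }
        }).start();
        //Read in an endless loop to simulate highly concurrent reads
        for (; ;) {
            for (Object o : objectSet) {
                System.out.println(o);
            }
        }
    }
}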
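Running the code above will almost certainly throw the following exception:
Exception in thread "main" java.util.ConcurrentModificationException
It means that a collection was modified while it was being iterated and read, which makes the iterator throw ConcurrentModificationException.
Copy-on-write avoids this problem. First create a copy and copy the original data into it. Then apply the additions and modifications to the copy, which does not affect the original data. Once the operations on the copy are complete, change the reference to the original object so that it points to the copy.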
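A minimal copy-on-write sketch of the same read/write scenario follows. The names are hypothetical and the approach is deliberately naive: it copies the whole set on every single add, whereas a real implementation would typically build the new collection once per update.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

//Hypothetical sketch: readers iterate an immutable snapshot, the writer swaps in a new set.
final class CopyOnWriteSketch {
    //Readers only ever see a fully built, unmodifiable set through this reference.
    private volatile Set<Integer> instances = Collections.emptySet();

    //Writer: copy the current set, modify the copy, then swap the reference.
    synchronized void add(int instance) {
        Set<Integer> copy = new HashSet<>(instances);
        copy.add(instance);
        instances = Collections.unmodifiableSet(copy);
    }

    Set<Integer> snapshot() {
        return instances;
    }

    //Runs one writer thread against a reading loop and returns the final size.
    static int runDemo(int count) {
        CopyOnWriteSketch registry = new CopyOnWriteSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                registry.add(i);
            }
        });
        writer.start();
        long seen = 0;
        //Keep iterating while the writer is running; each pass reads one stable snapshot.
        while (writer.isAlive()) {
            for (Integer ignored : registry.snapshot()) {
                seen++;
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("elements read while writing: " + seen);
        return registry.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println("final size = " + runDemo(2000));
    }
}
```

The reader never throws ConcurrentModificationException because the set it iterates is never modified after publication. The price is the copy made on each write, which is acceptable when reads far outnumber writes.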
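(4) Source code analysis of writing a registration into the registry
When Notifier's handle() method runs, the core code is:
//Write the Instances information into the registry
listener.onChange(datumKey, dataStore.get(datumKey).value);
dataStore.get(datumKey).value fetches the Instances object from the DataStore. listener.onChange() is in fact a call to Service's onChange() method, which updates the registry.
When the first instance of a service is registered, the newly created Service is added as a listener to the listeners of the ConsistencyService, and it has already been put into the registry of ServiceManager. So when the thread pool runs Notifier's handle() method, it can iterate over the Service listeners registered for the key and update them.
The registry serviceMap only holds references to Service objects, and the listeners of the ConsistencyService hold references to the same Service objects. When the listeners are iterated and Service.onChange() is called, what gets updated is the Service object in the JVM heap, which means the registry is updated as well. The registry is just a Map whose entries ultimately reference the Service objects in heap memory.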
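The shared-reference point can be shown with a few lines of plain Java. The sketch below uses hypothetical names and a stripped-down Service class; it only illustrates that an update made through the listeners collection is visible through the registry map, because both hold the same object.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

//Hypothetical sketch: registry and listeners reference the same Service object.
final class SharedReferenceSketch {
    static final class Service {
        volatile List<String> instances = List.of();

        //Replaces the instance list with an immutable copy of the latest data.
        void onChange(List<String> latest) {
            instances = List.copyOf(latest);
        }
    }

    static List<String> demo() {
        Service service = new Service();
        Map<String, Service> serviceMap = new HashMap<>();
        Map<String, ConcurrentLinkedQueue<Service>> listeners = new HashMap<>();

        //The same object is stored in both places.
        serviceMap.put("order-service", service);
        listeners.computeIfAbsent("iplist.order-service", k -> new ConcurrentLinkedQueue<>()).add(service);

        //Update through the listeners, as the notifier thread would.
        for (Service listener : listeners.get("iplist.order-service")) {
            listener.onChange(List.of("192.168.0.10:8080"));
        }

        //Read through the registry: the change is visible here too.
        return serviceMap.get("order-service").instances;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```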
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final DataStore dataStore;private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);...@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");//无限循环for (; ;) {try {//从阻塞队列中获取任务Pair<String, DataOperation> pair = tasks.take();//处理任务handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {//把在DistroConsistencyServiceImpl.onPut()方法创建的key和action取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//把Instances信息写到注册表里去listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}...@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, 
globalConfig.getTaskDispatchPeriod() / 2);}//Put a new record.public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);}if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}...
}//Store of data.
@Component
public class DataStore {private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);...public Datum get(String key) {return dataMap.get(key);}...
}public class Datum<T extends Record> implements Serializable {public String key;public T value;...
}//Package of instance list.
public class Instances implements Record {private List<Instance> instanceList = new ArrayList<>();...
}//服务管理者,拥有所有的服务列表,用于管理所有服务的注册、销毁、修改等
@Component
public class ServiceManager implements RecordListener<Service> {
    //The registry: Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
    ...

    //Register an instance to a service in AP mode.
    //This method creates service or cluster silently if they don't exist.
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //1. Create an empty service
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //2. Look up the service by namespace ID and service name; throw if the result is null
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //3. Add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    ...

    //1. Create an empty service
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    //Create service if not exist.
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            //now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    private void putServiceAndInit(Service service) throws NacosException {
        //Put the Service into the registry serviceMap
        putService(service);
        service.init();
        //Register the Service as a listener in consistencyService's listeners
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

    //Put service into manager.
    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }
    ...
}
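The Instances object that the Notifier reads back from the DataStore is produced as follows:
//Service manager: holds the list of all services and manages their registration, removal, and modification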
@Component
public class ServiceManager implements RecordListener<Service> {
    //Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
    ...

    //Add service instances
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        //Build the key of the service that the instance being registered belongs to
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //Look up that service by namespace and service name
        Service service = getService(namespaceId, serviceName);
        //Lock the service with synchronized
        synchronized (service) {
            //A service may have several instances, so compute the service's latest
            //instance list from the instances (ips) in the current registration request
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            //Instances implements Record, the interface used for network transfer within the Nacos cluster
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //Calls DelegateConsistencyServiceImpl.put()
            consistencyService.put(key, instances);
        }
    }

    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        //Update the instance list of the service
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    //Compare and get new instance list.
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        //First fetch all instances already registered in Nacos for the service that the incoming instance belongs to
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        Set<String> currentInstanceIds = Sets.newHashSet();
        for (Instance instance : currentIPs) {
            //Key: the address returned by instance.toIpAddr(); value: the instance itself
            currentInstances.put(instance.toIpAddr(), instance);
            //Collect the unique instance ID into currentInstanceIds
            currentInstanceIds.add(instance.getInstanceId());
        }
        //Holds all instances of the service that the incoming instance belongs to
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            instanceMap = new HashMap<>(ips.length);
        }
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                //The instanceMap key depends on the IP and port
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
        }
        //For the add action, instanceMap now always contains the newly registered Instance;
        //if this is not the first registration, it also contains the previously registered instances
        return new ArrayList<>(instanceMap.values());
    }
    ...
}

//Package of instance list.
public class Instances implements Record {
    private List<Instance> instanceList = new ArrayList<>();
    ...
}
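The merge performed by updateIpAddresses() can be hard to follow in the full listing, so here is a minimal sketch of the idea only: load the already registered entries into a map keyed by address, then apply the incoming entries as an add or a remove. The class name InstanceMergeSketch, the merge() method, and the use of plain "ip:port" strings in place of Instance objects are assumptions made for illustration; they are not part of Nacos.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ServiceManager.updateIpAddresses().
// Instances are modelled as "ip:port" strings, which also serve as the map key.
class InstanceMergeSketch {

    // Returns the new full instance list after applying `incoming` to `registered`.
    static List<String> merge(List<String> registered, String action, String... incoming) {
        // Start from everything that is already registered, keyed by address.
        Map<String, String> byKey = new LinkedHashMap<>();
        for (String addr : registered) {
            byKey.put(addr, addr);
        }
        // Apply the request: remove the given entries, or add/overwrite them.
        for (String addr : incoming) {
            if ("remove".equals(action)) {
                byKey.remove(addr);
            } else {
                byKey.put(addr, addr);
            }
        }
        // The result is a fresh list; the input list is never modified.
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<String> registered = new ArrayList<>();
        registered.add("10.0.0.1:8080");
        System.out.println(merge(registered, "add", "10.0.0.2:8080"));
        System.out.println(merge(registered, "remove", "10.0.0.1:8080"));
    }
}
```

Because the result is always a complete list rather than a delta, whoever consumes it can replace the old list wholesale.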
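Next, the details of Service's onChange() method:
Service.onChange() takes two parameters. The first is the key, which is built by KeyBuilder.buildInstanceListKey(). The second is an Instances object, whose instanceList property can hold multiple Instance objects. In practice, the Instances parameter may contain several previously registered instances, and for a registration it always contains the newly registered Instance.
Service.onChange() ends by calling Service.updateIPs(). Service.updateIPs() in turn calls Cluster.updateIps(), which applies the newly registered Instance to the Cluster object.
Cluster.updateIps() updates the instance Set with a copy-on-write approach. Without copy-on-write, readers and the writer would operate on the same Set object concurrently. With copy-on-write, reads and writes happening at the same time work on different Set objects. Even if some thread has not finished iterating over the old object at the moment it is replaced by the new one, nothing breaks: that thread simply keeps iterating over the old object, because only the reference was replaced. The ephemeralInstances variable merely holds the address of a Set object, and "replacing" here just means pointing ephemeralInstances at a different Set object.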
//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model,
//in which service stores a list of clusters, which contain a list of instances.
//This class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
    ...

    @Override
    public void onChange(String key, Instances value) throws Exception {
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        for (Instance instance : value.getInstanceList()) {
            if (instance == null) {
                //Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }

    //Update instances. The instances passed in already include the newly registered instance
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        //clusterMap holds the clusters of this service
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        //Iterate over all instances: both the previously registered ones and the newly registered one.
        //The purpose is to group the instances by cluster
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                //Check whether the instance sent by the client has a ClusterName set
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    //If not, set the instance's ClusterName to DEFAULT
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                //Check whether a Cluster with this ClusterName already exists; if not, create a new Cluster object
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
                    //Create the new Cluster object
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    //Put the new Cluster object into clusterMap
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                //Look up the instance list of this cluster in ipMap by cluster name
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                //Add the instance to clusterIPs, that is, into ipMap
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        //After all instances are grouped, update the registry cluster by cluster
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //entryIPs is the instance list already grouped by ClusterName
            List<Instance> entryIPs = entry.getValue();
            //Call Cluster.updateIps() to update each Cluster object in the registry via copy-on-write
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());
    }
    ...
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();

    @JsonIgnore
    private Service service;
    ...

    //Update instance list.
    public void updateIps(List<Instance> ips, boolean ephemeral) {
        //Pick the current instance set (ephemeral or persistent); at this point toUpdateInstances still refers to the old set
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        //Copy the old instances from toUpdateInstances into oldIpMap
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        ...
        //Finally, build a brand-new HashSet from the instance list passed in and assign it to toUpdateInstances
        toUpdateInstances = new HashSet<>(ips);
        //Depending on the ephemeral flag, replace ephemeralInstances or persistentInstances with toUpdateInstances
        if (ephemeral) {
            //Replace the previous instance set with the new one
            ephemeralInstances = toUpdateInstances;
        } else {
            //Replace the previous instance set with the new one
            persistentInstances = toUpdateInstances;
        }
    }
    ...
}
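This part of the source shows that the Set previously held in the registry is never modified in place. The old data is only read, and at the end the new Set is swapped in by reference, which completes the registry update. This avoids concurrent read/write conflicts on the Set.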
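The reference-swap idea can be shown in isolation with a minimal sketch. CopyOnWriteClusterSketch, snapshot(), and the use of plain strings for instances are hypothetical names chosen for this example, not Nacos code. The field is also declared volatile here, which is an assumption of the sketch so that readers are guaranteed to see the latest published set; the Cluster listing above does not show that modifier.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for Cluster; instances are plain strings.
class CopyOnWriteClusterSketch {

    // Assumption of this sketch: volatile, so a reader always sees the latest reference.
    private volatile Set<String> ephemeralInstances = new HashSet<>();

    // Writer: build a brand-new set, then publish it by swapping the reference.
    // The set that readers may currently be iterating over is never mutated.
    public void updateIps(List<String> ips) {
        Set<String> updated = new HashSet<>(ips);
        ephemeralInstances = updated;
    }

    // Reader: take the reference once; later swaps do not affect this set.
    public Set<String> snapshot() {
        return ephemeralInstances;
    }

    public static void main(String[] args) {
        CopyOnWriteClusterSketch cluster = new CopyOnWriteClusterSketch();
        cluster.updateIps(Arrays.asList("10.0.0.1:8080"));

        // A reader obtains the current set before the next write happens.
        Set<String> before = cluster.snapshot();

        cluster.updateIps(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));

        System.out.println(before.size());             // the old set is unchanged: 1
        System.out.println(cluster.snapshot().size()); // the new set is visible: 2
    }
}
```

The trade-off is that every update allocates a new set, which is acceptable when reads greatly outnumber writes, as is typically the case for a service registry.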