目录
1 服务发现
1.1 客户端
1.1.1 入口
1.1.2 定时拉取
1.1.3 总结
1.2 服务端
2 心跳检测
2.1 客户端
2.2 服务端
2.2.1 处理心跳请求
2.2.2 开启定时任务进行心跳检测
2.2.3 总结
1 服务发现
- 服务列表:Nacos 维护一个服务列表,记录所有已注册的服务实例。
- 动态更新:当服务实例发生变化(如新增、删除、健康状态变化)时,Nacos 会实时更新服务列表,并通知订阅了该服务的服务消费者。
1.1 客户端
1.1.1 入口
核心类:NacosNamingService.class 核心方法:getAllInstances
/**
 * Returns the instances of a service.
 *
 * <p>When {@code subscribe} is true the lookup goes through
 * {@code hostReactor.getServiceInfo}; otherwise the service info is fetched
 * via {@code hostReactor.getServiceInfoDirectlyFromServer}.
 *
 * @param serviceName name of the service
 * @param groupName   group the service belongs to
 * @param clusters    cluster names, joined with "," before the lookup
 * @param subscribe   whether to use the subscribing lookup path
 * @return the hosts of the service, or an empty list when no service info or no hosts are available
 * @throws NacosException propagated from the underlying lookup
 */
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    // subscribe is true by default (per the original note)
    if (subscribe) {
        // 1. Get the instance list
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(
                NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

/**
 * Returns the service info for the given service and clusters, and makes sure
 * a periodic update task exists for it.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 * @return the failover copy when the failover switch is on, otherwise the entry held in {@code serviceInfoMap}
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 1. Look up the instance info in local memory
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        // 2. Nothing cached: register a placeholder and fetch from the server
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // Call the server API to get the instance info:
        // /instance/list HttpMethod.GET
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 3. Start the scheduled task that keeps pulling instance info
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}

/**
 * Registers an {@code UpdateTask} for the service/clusters pair unless one is
 * already recorded in {@code futureMap}.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    // 1. A task already exists: do not create another one
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        // Re-check under the lock before adding the task
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // 2. Add the scheduled task that pulls instance info
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
1.1.2 定时拉取
定时任务拉取实例信息
核心类:HostReactor.class 核心方法:run
public void run() {long delayTime = DEFAULT_DELAY;try {//1 从本地缓存获取实例信息ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));//2 没有获取到就远程获取if (serviceObj == null) {updateService(serviceName, clusters);return;}//3 如果数据已经过期 也去重新获取if (serviceObj.getLastRefTime() <= lastRefTime) {updateService(serviceName, clusters);serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force pushrefreshOnly(serviceName, clusters);}lastRefTime = serviceObj.getLastRefTime();if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {// abort the update taskNAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}delayTime = serviceObj.getCacheMillis();resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);} finally {executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {//调用http请求获取服务列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);if (StringUtils.isNotEmpty(result)) {processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}
1.1.3 总结
- 首先从本地内存获取实例列表,如果不存在,就调用远程接口获取实例列表。
- 然后开启一个定时循环任务:先判断本地缓存是否已过期,没有过期就不拉取;如果已过期,则调用远程接口重新获取实例列表,如此循环往复。
1.2 服务端
核心类:InstanceController.java 核心方法:list
/**
 * Handles the instance-list request: reads the query parameters from the
 * request and delegates to {@code doSrvIpxt}.
 *
 * @param request incoming HTTP request
 * @return the JSON node produced by {@code doSrvIpxt}
 * @throws Exception propagated from parameter validation or {@code doSrvIpxt}
 */
public ObjectNode list(HttpServletRequest request) throws Exception {
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    // 1. Get the collection of service instances
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

/**
 * Builds the instance-list response for a service.
 *
 * <p>NOTE: this is an abbreviated excerpt. Only the first statements are
 * quoted; the part that fills and returns {@code result} is not shown.
 */
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 1. Get the service from memory
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // ... remainder of the method omitted in this excerpt
}
2 心跳检测
心跳检测是指服务提供者定期向 Nacos 服务器发送心跳包,以表明自身仍然处于健康状态。Nacos 服务器接收到心跳包后,会更新服务实例的健康状态。如果某个服务实例在一定时间内没有发送心跳包,Nacos 会认为该实例不健康,并将其从服务列表中移除。
心跳检测的主要作用
- 健康状态管理:
  - 检测不健康实例:通过心跳检测,Nacos 可以及时发现不健康的服务实例。
  - 移除不健康实例:不健康的服务实例会被从服务列表中移除,避免服务消费者的请求路由到这些实例。
- 动态更新服务列表:
  - 实时更新:当服务实例的健康状态发生变化时,Nacos 会实时更新服务列表,并通知订阅了该服务的服务消费者。
  - 高可用性:通过及时移除不健康实例,Nacos 确保服务消费者的请求总是路由到健康的实例,提高系统的高可用性。
2.1 客户端
核心类:NacosNamingService.class 核心方法:registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);if (instance.isEphemeral()) {//1 如果是临时实例 就开启心跳检测BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);beatReactor.addBeatInfo(groupedServiceName, beatInfo);}serverProxy.registerService(groupedServiceName, groupName, instance);
}public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);//1 创建唯一标识String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);//2 开启定时任务 每5秒执行一次executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}public void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {//1 发送http请求发送心跳JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());}executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
总结:
- 如果是临时实例,注册时会先构建心跳信息(BeatInfo)并交给 BeatReactor,然后再向服务端注册实例。
- 同时开启一个定时任务,定期向服务端发送心跳。
2.2 服务端
2.2.1 处理心跳请求
核心类:InstanceController.java 核心方法:beat
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);instance = new 
Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());serviceManager.registerInstance(namespaceId, serviceName, instance);}Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}//1 处理心跳请求service.processClientBeat(clientBeat);result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);//1 处理心跳请求HealthCheckReactor.scheduleNow(clientBeatProcessor);
}public void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();Cluster cluster = service.getClusterMap().get(clusterName);List<Instance> instances = cluster.allIPs(true);for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}//1 将上次心跳事件修改为现在instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {//2 如果服务之前状态为不健康 将服务状态设置为健康instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);//3 发布状态变更事件 getPushService().serviceChanged(service);}}}}
}
2.2.2 开启定时任务进行心跳检测
核心类:ServiceManager.java 核心方法:registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//1 创建一个空服务 如果不存在就直接创建 存在就什么都不做createEmptyService(namespaceId, serviceName, instance.isEphemeral());//2 获取内存中存储该服务的相关信息Service service = getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.INVALID_PARAM,"service not found, namespace: " + namespaceId + ", service: " + serviceName);}//3 添加服务addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {createServiceIfAbsent(namespaceId, serviceName, local, null);
}public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {//1 通过命名空间和服务名从内存中获取服务信息Service service = getService(namespaceId, serviceName);//2 获取失败if (service == null) {//3 组装服务信息Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrown//4 注册最新心跳时间service.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();//5 存储服务并初始化putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}
}private void putServiceAndInit(Service service) throws NacosException {//1 存储服务putService(service);//2 初始化服务service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}public void init() {//注册心跳任务HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}public void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {//1 如果15秒内没有心跳 状态设置为不健康instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));//2 30秒内没有心跳 删除实例 通过http请求调用自身删除接口 deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
2.2.3 总结
- 服务端接收到心跳请求之后,会更新实例的上一次心跳时间。
- 服务注册时就开启了一个定时任务来检测心跳:超过15秒没有心跳,将实例状态设置为不健康;超过30秒没有心跳,就删除实例。