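Since development of Eureka 2.x was discontinued, Nacos has become one of the mainstream choices for a Spring Cloud service registry. Without further ado, let's start reading the source (based on version 2.2.0).
First, we know that Nacos plugs into Spring Boot through auto-configuration. As usual, we begin with the META-INF/spring.factories file in the spring-cloud-alibaba-nacos-discovery package: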
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
We start with NacosDiscoveryAutoConfiguration:
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties) {
        return new NacosServiceDiscovery(discoveryProperties);
    }

}
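This auto-configuration class registers two beans. The first, NacosDiscoveryProperties, holds the configuration items read from the spring.cloud.nacos.discovery prefix. The second is NacosServiceDiscovery: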
public class NacosServiceDiscovery {

    private NacosDiscoveryProperties discoveryProperties;

    public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    /**
     * Return all instances for the given service.
     * @param serviceId id of service
     * @return list of instances
     * @throws NacosException nacosException
     */
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = discoveryProperties.getGroup();
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
        return hostToServiceInstanceList(instances, serviceId);
    }

    /**
     * Return the names of all services.
     * @return list of service names
     * @throws NacosException nacosException
     */
    public List<String> getServices() throws NacosException {
        String group = discoveryProperties.getGroup();
        ListView<String> services = discoveryProperties.namingServiceInstance()
                .getServicesOfServer(1, Integer.MAX_VALUE, group);
        return services.getData();
    }

    public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {
        List<ServiceInstance> result = new ArrayList<>(instances.size());
        for (Instance instance : instances) {
            ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
            if (serviceInstance != null) {
                result.add(serviceInstance);
            }
        }
        return result;
    }

    public static ServiceInstance hostToServiceInstance(Instance instance, String serviceId) {
        if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {
            return null;
        }
        NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();
        nacosServiceInstance.setHost(instance.getIp());
        nacosServiceInstance.setPort(instance.getPort());
        nacosServiceInstance.setServiceId(serviceId);

        Map<String, String> metadata = new HashMap<>();
        metadata.put("nacos.instanceId", instance.getInstanceId());
        metadata.put("nacos.weight", instance.getWeight() + "");
        metadata.put("nacos.healthy", instance.isHealthy() + "");
        metadata.put("nacos.cluster", instance.getClusterName() + "");
        metadata.putAll(instance.getMetadata());
        nacosServiceInstance.setMetadata(metadata);

        if (metadata.containsKey("secure")) {
            boolean secure = Boolean.parseBoolean(metadata.get("secure"));
            nacosServiceInstance.setSecure(secure);
        }
        return nacosServiceInstance;
    }

}
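As the code and comments show, this bean uses the Nacos settings from the configuration file to fetch service instances and service names, and converts Nacos Instance objects into Spring Cloud ServiceInstance objects, skipping instances that are null, disabled, or unhealthy. Next is NacosDiscoveryEndpointAutoConfiguration: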
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnEnabledEndpoint
    public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosDiscoveryEndpoint(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnEnabledHealthIndicator("nacos-discovery")
    public HealthIndicator nacosDiscoveryHealthIndicator(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosDiscoveryHealthIndicator(nacosDiscoveryProperties.namingServiceInstance());
    }

}
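This class registers two beans. The first, NacosDiscoveryEndpoint, acts as the Nacos service discovery endpoint and reports the services known to the current client.
The second, NacosDiscoveryHealthIndicator, determines the health status by calling the /operator/metrics endpoint of the Nacos server.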
The next auto-configuration class, NacosServiceRegistryAutoConfiguration, is the central one:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
/* Only active when Nacos service discovery is enabled */
@ConditionalOnNacosDiscoveryEnabled
/* Only active when auto-registration is enabled; defaults to true */
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
/* Configured only after the following configuration classes */
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    /* ServiceRegistry implementation, built from the discovery properties */
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    /* Nacos registration bean (describes the instance to register) */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    /* Nacos automatic service registration bean */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }

}
This configuration class registers three beans. We focus on the third, NacosAutoServiceRegistration, which implements automatic service registration:
public class NacosAutoServiceRegistration
        extends AbstractAutoServiceRegistration<Registration> {

    private static final Logger log = LoggerFactory.getLogger(NacosAutoServiceRegistration.class);

    private NacosRegistration registration;

    public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        super(serviceRegistry, autoServiceRegistrationProperties);
        this.registration = registration;
    }

    @Deprecated
    public void setPort(int port) {
        getPort().set(port);
    }

    @Override
    protected NacosRegistration getRegistration() {
        if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
            this.registration.setPort(this.getPort().get());
        }
        Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
        return this.registration;
    }

    @Override
    protected NacosRegistration getManagementRegistration() {
        return null;
    }

    @Override
    protected void register() {
        if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            log.debug("Registration disabled.");
            return;
        }
        if (this.registration.getPort() < 0) {
            this.registration.setPort(getPort().get());
        }
        super.register();
    }

    @Override
    protected void registerManagement() {
        if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            return;
        }
        super.registerManagement();
    }

    @Override
    protected Object getConfiguration() {
        return this.registration.getNacosDiscoveryProperties();
    }

    @Override
    protected boolean isEnabled() {
        return this.registration.getNacosDiscoveryProperties().isRegisterEnabled();
    }

    @Override
    @SuppressWarnings("deprecation")
    protected String getAppName() {
        String appName = registration.getNacosDiscoveryProperties().getService();
        return StringUtils.isEmpty(appName) ? super.getAppName() : appName;
    }

}
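As the name suggests, the core of this bean is the register method. It delegates to the implementation in the abstract parent class AbstractAutoServiceRegistration, which in turn calls the register method of the ServiceRegistry interface. The actual implementation is NacosServiceRegistry, so we step into its register method: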
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }

    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}
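The delegation chain described so far (the auto-registration bean overrides register, guards it, and then lets the parent class hand the registration to a ServiceRegistry) is a template-method arrangement. The following is a minimal sketch of that shape only. Every type in it (DemoServiceRegistry, DemoAbstractAutoRegistration, DemoNacosAutoRegistration) is a hypothetical stand-in written for illustration, not the Spring Cloud or Nacos class, and the registration is reduced to a plain string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ServiceRegistry interface.
interface DemoServiceRegistry<R> {
    void register(R registration);
}

// Hypothetical stand-in for the abstract parent class (template method).
abstract class DemoAbstractAutoRegistration<R> {
    private final DemoServiceRegistry<R> registry;

    protected DemoAbstractAutoRegistration(DemoServiceRegistry<R> registry) {
        this.registry = registry;
    }

    protected abstract R getRegistration();

    protected abstract boolean isEnabled();

    // Entry point; in this sketch the demo code calls it directly.
    public void start() {
        if (!isEnabled()) {
            return;
        }
        register();
    }

    // Default behavior: hand the registration to the registry implementation.
    protected void register() {
        registry.register(getRegistration());
    }
}

// Hypothetical stand-in for the Nacos-specific subclass.
class DemoNacosAutoRegistration extends DemoAbstractAutoRegistration<String> {
    private final String registration;
    private final boolean registerEnabled;

    DemoNacosAutoRegistration(DemoServiceRegistry<String> registry,
                              String registration, boolean registerEnabled) {
        super(registry);
        this.registration = registration;
        this.registerEnabled = registerEnabled;
    }

    @Override
    protected String getRegistration() {
        return registration;
    }

    @Override
    protected boolean isEnabled() {
        return registerEnabled;
    }

    // Guard first, then fall through to the parent implementation.
    @Override
    protected void register() {
        if (!registerEnabled) {
            return;
        }
        super.register();
    }
}

class AutoRegistrationSketch {
    // Returns everything the fake registry received.
    static List<String> run(boolean registerEnabled) {
        List<String> registered = new ArrayList<>();
        DemoServiceRegistry<String> registry = registered::add;
        new DemoNacosAutoRegistration(registry, "order-service@10.0.0.1:8080", registerEnabled).start();
        return registered;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

The subclass only decides whether registration should happen and with which data; the parent class owns the call into the registry, which is why the walkthrough has to jump from NacosAutoServiceRegistration to NacosServiceRegistry to find the real work.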
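In NacosServiceRegistry.register, the call to focus on is namingService.registerInstance. namingService is declared as an interface and is implemented by NacosNamingService, so we step into that implementation: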
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // Check whether the instance is ephemeral or persistent; ephemeral is the default
    if (instance.isEphemeral()) {
        // Create the heartbeat object
        BeatInfo beatInfo = new BeatInfo();
        // Set the service name
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        // Set the IP
        beatInfo.setIp(instance.getIp());
        // Set the port
        beatInfo.setPort(instance.getPort());
        // Set the cluster name
        beatInfo.setCluster(instance.getClusterName());
        // Set the instance weight
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        // Set the heartbeat interval; 5 seconds by default
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        // Hand it to the scheduled thread pool for execution
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // Register the service
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
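For an ephemeral instance, this method first builds a heartbeat object (BeatInfo) and hands it to BeatReactor, which schedules it on a ScheduledThreadPoolExecutor; by default the heartbeat runs every 5 seconds. It then calls serverProxy, the proxy for the registry server, which registers the service by sending a POST request to the server's /instance endpoint (UtilAndComs.NACOS_URL_INSTANCE):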
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
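Before moving to the server side, it may help to see how a periodic heartbeat can be driven by a ScheduledThreadPoolExecutor, as registerInstance does through BeatReactor. The sketch below is an assumption-laden illustration, not the BeatReactor source: the class HeartbeatSketch, its BeatTask, and the awaitBeats helper are hypothetical names, the "heartbeat" only increments a counter instead of calling the server, and the choice to have each task reschedule itself (rather than use a fixed-rate schedule) is one possible design.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final AtomicInteger beatsSent = new AtomicInteger();

    // Schedules the first beat; every beat then schedules the next one.
    void addBeat(long periodMillis, CountDownLatch latch) {
        executor.schedule(new BeatTask(periodMillis, latch), periodMillis, TimeUnit.MILLISECONDS);
    }

    int beatsSent() {
        return beatsSent.get();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    private final class BeatTask implements Runnable {
        private final long periodMillis;
        private final CountDownLatch latch;

        BeatTask(long periodMillis, CountDownLatch latch) {
            this.periodMillis = periodMillis;
            this.latch = latch;
        }

        @Override
        public void run() {
            // A real client would send the heartbeat request to the server here.
            beatsSent.incrementAndGet();
            latch.countDown();
            try {
                executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ignored) {
                // The executor has been shut down, so the heartbeat simply stops.
            }
        }
    }

    // Returns true if the expected number of beats arrived before the timeout.
    static boolean awaitBeats(int expectedBeats, long periodMillis, long timeoutMillis) {
        HeartbeatSketch reactor = new HeartbeatSketch();
        CountDownLatch latch = new CountDownLatch(expectedBeats);
        reactor.addBeat(periodMillis, latch);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            reactor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitBeats(3, 50, 5000));
    }
}
```

Rescheduling from inside the task means the next delay can be changed between beats, for example if the server were to suggest a different interval; a fixed-rate schedule would not allow that as easily.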
On the server side, we go to the /instance endpoint and focus on the register method of InstanceController:
@CanDistro
@PostMapping
@TpsControl(pointName = "NamingInstanceRegister", name = "HttpNamingInstanceRegister")
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral())
            .setRequest(request)
            .build();

    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "", false, namespaceId,
            NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName), instance.getIp(),
            instance.getPort()));
    return "ok";
}
The key call is getInstanceOperator().registerInstance(namespaceId, serviceName, instance):
/**
 * This method creates {@code IpPortBasedClient} if it doesn't exist.
 */
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);

    // Check whether the instance is ephemeral
    boolean ephemeral = instance.isEphemeral();
    // Derive the client ID
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    // Create the client if it does not exist yet
    createIpPortClientIfAbsent(clientId);
    // Build the Service object
    Service service = getService(namespaceId, serviceName, ephemeral);
    // Register the instance
    clientOperationService.registerInstance(service, instance, clientId);
}
clientOperationService is an interface with several implementations. Among them we look at registerInstance in EphemeralClientOperationServiceImpl:
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    // Check that the instance is legal
    NamingUtils.checkInstanceIsLegal(instance);

    // Get the singleton Service
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }
    // Look up the client
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // Build the instance publish info and store it on the client
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    // Publish the client service registration event through NotifyCenter
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
Next we look at publishEvent in NotifyCenter:
/**
 * Request publisher publish event Publishers load lazily, calling publisher.
 *
 * @param eventType class Instances type of the event type.
 * @param event     event instance.
 */
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }

    final String topic = ClassUtils.getCanonicalName(eventType);

    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    if (event.isPluginEvent()) {
        return true;
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}
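In DefaultPublisher, the publish method puts the event on a blocking queue to wait for processing. If the queue cannot accept it, the event is handled directly on the calling thread: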
@Override
public boolean publish(Event event) {
    checkIsStart();
    // Append to the tail of the blocking queue (non-blocking offer)
    boolean success = this.queue.offer(event);
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        // The queue rejected the event: process it synchronously
        receiveEvent(event);
        return true;
    }
    return true;
}

void checkIsStart() {
    if (!initialized) {
        throw new IllegalStateException("Publisher does not start");
    }
}

@Override
public void shutdown() {
    this.shutdown = true;
    this.queue.clear();
}

public boolean isInitialized() {
    return initialized;
}

/**
 * Receive and notifySubscriber to process the event.
 *
 * @param event {@link Event}.
 */
void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();

    // Check whether there are any subscribers
    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }

    // Notification single event listener
    for (Subscriber subscriber : subscribers) {
        if (!subscriber.scopeMatches(event)) {
            continue;
        }

        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }

        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        // Notify the subscriber
        notifySubscriber(subscriber, event);
    }
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

    final Runnable job = () -> subscriber.onEvent(event);
    final Executor executor = subscriber.executor();

    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            // Run the event callback on the current thread
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception: ", e);
        }
    }
}
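The publish logic above follows a simple pattern: try a non-blocking offer onto a bounded queue, and if the queue is full, deliver the event synchronously instead of dropping it. Here is a minimal sketch of that pattern. PublisherSketch and its methods are hypothetical names, events are plain strings, and the drain method stands in for whatever worker takes events off the queue; the sketch runs single-threaded so that the ordering is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class PublisherSketch {
    private final BlockingQueue<String> queue;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    PublisherSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void addSubscriber(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // offer() never blocks; a full queue falls back to synchronous delivery.
    boolean publish(String event) {
        if (!queue.offer(event)) {
            receiveEvent(event);
        }
        return true;
    }

    // Stand-in for the worker side: deliver everything currently queued.
    int drain() {
        int handled = 0;
        String event;
        while ((event = queue.poll()) != null) {
            receiveEvent(event);
            handled++;
        }
        return handled;
    }

    private void receiveEvent(String event) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Publishes two events into a queue of capacity 1 and returns the delivery order.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        PublisherSketch publisher = new PublisherSketch(1);
        publisher.addSubscriber(received::add);
        publisher.publish("a"); // fits in the queue, delivered later by drain()
        publisher.publish("b"); // queue is full, delivered immediately
        publisher.drain();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One consequence worth noticing is that the fallback can reorder events: the event that overflowed is seen by subscribers before the one still waiting in the queue. The real publisher's sequence check for subscribers that ignore expired events addresses a related concern.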
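Following the other references to ClientRegisterServiceEvent, we find the onEvent method in ClientServiceIndexesManager. There, a ClientRegisterServiceEvent updates the publisher index, and a ServiceChangedEvent is published as a result: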
@Override
public void onEvent(Event event) {
    if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
        handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
    } else if (event instanceof ClientOperationEvent) {
        handleClientOperation((ClientOperationEvent) event);
    }
}

private void handleClientDisconnect(ClientOperationEvent.ClientReleaseEvent event) {
    Client client = event.getClient();
    for (Service each : client.getAllSubscribeService()) {
        removeSubscriberIndexes(each, client.getClientId());
    }
    DeregisterInstanceReason reason = event.isNative()
            ? DeregisterInstanceReason.NATIVE_DISCONNECTED : DeregisterInstanceReason.SYNCED_DISCONNECTED;
    long currentTimeMillis = System.currentTimeMillis();
    for (Service each : client.getAllPublishedService()) {
        removePublisherIndexes(each, client.getClientId());
        InstancePublishInfo instance = client.getInstancePublishInfo(each);
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(currentTimeMillis,
                "", false, reason, each.getNamespace(), each.getGroup(), each.getName(),
                instance.getIp(), instance.getPort()));
    }
}

private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

private void addPublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
    publisherIndexes.get(service).add(clientId);
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
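The register branch of this handler does two things: it records which client publishes which service, and it emits a follow-up event so that other components can react. A reduced sketch of that "update index, then publish" step is shown below. IndexManagerSketch is a hypothetical class, services and client IDs are plain strings with made-up values, and a list of strings stands in for NotifyCenter; deregistration and subscriber indexes are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class IndexManagerSketch {
    // service -> IDs of the clients that publish an instance of it
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // Stand-in for an event bus: records the follow-up events in order.
    private final List<String> publishedEvents = new ArrayList<>();

    // Handles a "client registered a service" event.
    void onClientRegister(String service, String clientId) {
        publisherIndexes
                .computeIfAbsent(service, key -> ConcurrentHashMap.newKeySet())
                .add(clientId);
        publishedEvents.add("ServiceChangedEvent(" + service + ")");
    }

    Set<String> publishersOf(String service) {
        return publisherIndexes.getOrDefault(service, Collections.emptySet());
    }

    List<String> publishedEvents() {
        return publishedEvents;
    }

    public static void main(String[] args) {
        IndexManagerSketch manager = new IndexManagerSketch();
        manager.onClientRegister("order-service", "client-a");
        manager.onClientRegister("order-service", "client-b");
        System.out.println(manager.publishersOf("order-service").size());
        System.out.println(manager.publishedEvents());
    }
}
```

Keeping only client IDs in the index, rather than the instances themselves, means a lookup by service yields the set of clients to ask for their published instance data.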
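On the Nacos server, the NacosStateMachine of JRaftServer invokes the onApply method of a RequestProcessor object. RequestProcessor is an abstract class; the implementation relevant here is InstanceMetadataProcessor, so we look at its onApply method: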
@Override
public Response onApply(WriteRequest request) {
    readLock.lock();
    try {
        MetadataOperation<InstanceMetadata> op = serializer.deserialize(request.getData().toByteArray(), processType);
        switch (DataOperation.valueOf(request.getOperation())) {
            case ADD:
            case CHANGE:
                updateInstanceMetadata(op);
                break;
            case DELETE:
                deleteInstanceMetadata(op);
                break;
            default:
                return Response.newBuilder().setSuccess(false)
                        .setErrMsg("Unsupported operation " + request.getOperation()).build();
        }
        return Response.newBuilder().setSuccess(true).build();
    } catch (Exception e) {
        Loggers.RAFT.error("onApply {} instance metadata operation failed. ", request.getOperation(), e);
        String errorMessage = null == e.getMessage() ? e.getClass().getName() : e.getMessage();
        return Response.newBuilder().setSuccess(false).setErrMsg(errorMessage).build();
    } finally {
        readLock.unlock();
    }
}

private void updateInstanceMetadata(MetadataOperation<InstanceMetadata> op) {
    Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
    service = ServiceManager.getInstance().getSingleton(service);
    namingMetadataManager.updateInstanceMetadata(service, op.getTag(), op.getMetadata());
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

private void deleteInstanceMetadata(MetadataOperation<InstanceMetadata> op) {
    Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
    service = ServiceManager.getInstance().getSingleton(service);
    namingMetadataManager.removeInstanceMetadata(service, op.getTag());
}
The core is updateInstanceMetadata in NamingMetadataManager:
private ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> instanceMetadataMap;

/**
 * Update instance metadata.
 *
 * @param service          service
 * @param metadataId       instance metadata id
 * @param instanceMetadata new instance metadata
 */
public void updateInstanceMetadata(Service service, String metadataId, InstanceMetadata instanceMetadata) {
    if (!instanceMetadataMap.containsKey(service)) {
        instanceMetadataMap.putIfAbsent(service, new ConcurrentHashMap<>(INITIAL_CAPACITY));
    }
    instanceMetadataMap.get(service).put(metadataId, instanceMetadata);
}
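The data is held in a two-level ConcurrentHashMap. The key of the outer map is the Service object; the key of the inner map is the instance's metadata ID, and its value is the InstanceMetadata object. Note that this map stores instance metadata, while the instance publish info itself was attached to the Client earlier through client.addServiceInstance.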
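To make the two-level layout concrete, here is a small sketch of a map of maps with update, lookup, and removal. MetadataStoreSketch is a hypothetical class; service names, metadata IDs, and metadata values are plain strings chosen for illustration. It uses computeIfAbsent to create the inner map, which is an alternative to the containsKey plus putIfAbsent sequence in the code above, not a claim about how Nacos does it.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MetadataStoreSketch {
    // outer key: service, inner key: metadata ID, value: metadata
    private final ConcurrentMap<String, ConcurrentMap<String, String>> store = new ConcurrentHashMap<>();

    // Creates the inner map on first use, then stores or overwrites the entry.
    void update(String service, String metadataId, String metadata) {
        store.computeIfAbsent(service, key -> new ConcurrentHashMap<>()).put(metadataId, metadata);
    }

    Optional<String> find(String service, String metadataId) {
        ConcurrentMap<String, String> byId = store.get(service);
        return byId == null ? Optional.empty() : Optional.ofNullable(byId.get(metadataId));
    }

    void remove(String service, String metadataId) {
        ConcurrentMap<String, String> byId = store.get(service);
        if (byId != null) {
            byId.remove(metadataId);
        }
    }

    public static void main(String[] args) {
        MetadataStoreSketch store = new MetadataStoreSketch();
        store.update("order-service", "10.0.0.1:8080:DEFAULT", "weight=1.0");
        System.out.println(store.find("order-service", "10.0.0.1:8080:DEFAULT").isPresent());
        store.remove("order-service", "10.0.0.1:8080:DEFAULT");
        System.out.println(store.find("order-service", "10.0.0.1:8080:DEFAULT").isPresent());
    }
}
```

Grouping by service first keeps every lookup for one service inside a single inner map, so metadata for all instances of a service can be read or cleared without scanning unrelated services.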