基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法,当时还有最为关键的notify服务通知更新的部分源码没有学习,本次我们来学习notify通知本地服务更新的源码。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
1 notify服务通知更新
当第一次订阅服务节点,或者服务节点目录的子节点更新时,例如新的producer上下线,将会调用notify服务通知更新的方法,会更新本地缓存的数据。
notify方法的入口是FailbackRegistry的notify方法。
/*** FailbackRegistry的方法* <p>* 服务通知** @param url consumer side url* @param listener listener* @param urls provider latest urls*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {/** 调用doNotify方法更新*/doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}
/*** FailbackRegistry的方法* <p>* 服务通知*/
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {//调用父类AbstractRegistry的方法super.notify(url, listener, urls);
}
2 AbstractRegistry#notify通知更新
该方法涉及两个重要知识点:
- 一是对于拉取到的服务节点url按照类别providers、configurators 、routers进行分类,然后遍历每个类别,依次调用RegistryDirectory#notify方法触发监听回调,进行服务数据的更新。
- 二是RegistryDirectory#notify方法通知执行完毕之后,调用saveProperties方法更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
/*** AbstractRegistry的方法* <p>* 通知更新** @param url consumer side url* @param listener listener* @param urls provider latest urls*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {// 1-4 Empty address.logger.warn("1-4", "", "", "Ignore empty notify urls for subscribe url " + url);return;}if (logger.isInfoEnabled()) {logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());}//根据节点类别对url进行分类Map<String, List<URL>> result = new HashMap<>();//遍历url,进行分类for (URL u : urls) {//服务消费者和服务提供者的服务接口名匹配if (UrlUtils.isMatch(url, u)) {//获取url的category类别,默认providers,同时服务提供者urlServiceAddressURL固定返回providersString category = u.getCategory(DEFAULT_CATEGORY);//将url加入到对应类别的categoryList中List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());categoryList.add(u);}}//result,一般有三个元素,即三个类别,providers、configurators 、routersif (result.size() == 0) {return;}Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());//遍历每一个类别for (Map.Entry<String, List<URL>> entry : result.entrySet()) {//获取类别String category = entry.getKey();List<URL> categoryList = entry.getValue();//存入categoryNotifiedcategoryNotified.put(category, categoryList);//执行leitener的notify方法进行通知,listener可以是RegistryDirectory/** RegistryDirectory#notify通知*/listener.notify(categoryList);/** 本地缓存*/// We will update our cache file after each notification.// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.//将在每次通知后更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。//本地缓存,默认支持if (localCacheEnabled) {saveProperties(url);}}
}
3 RegistryDirectory#notify更新本地内存信息
该方法根据url更新RegistryDirectory对象的内存信息,将可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存。
在最后,会专门调用refreshOverrideAndInvoker方法,将服务提供者url转换为invoker,进行服务提供者的更新。
/*** RegistryDirectory的方法* * 服务变更通知* @param urls 服务提供者注册信息列表*/
@Override
public synchronized void notify(List<URL> urls) {if (isDestroyed()) {return;}Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull)//类别合法性过滤.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)//根据类别分组.collect(Collectors.groupingBy(this::judgeCategory));//获取配置信息url集合,可以为空List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());//将配置信息url转换为Configurator集合,并赋值给configurators属性,可以为空this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);//获取路由信息url集合,可以为空List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());//将配置信息url转换为Router集合,并加入routerChain路由链,可以为空toRouters(routerURLs).ifPresent(this::addRouters);// providers//获取服务提供者url集合,可以为空List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL address//添加扩展URL地址 3.x的特性ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);//获取AddressListener,默认空集合List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);}}/** 将服务提供者url转换为invoker,进行服务提供者的更新*/refreshOverrideAndInvoker(providerURLs);
}
3.1 refreshOverrideAndInvoker刷新invoker
该方法将服务提供者url转换为invoker,进行服务提供者的更新,这在consumer对producer的信息更新部分是非常重要的一个方法。
url转换规则为:
- 如果URL已转换为invoker,则不再重新引用它并直接从缓存获取它,请注意,URL中的任何参数更改都将被重新引用。
- 如果传入invoker列表不为空,则表示它是最新的invoker列表。
- 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。
/*** RegistryDirectory的方法* <p>* 将服务提供者url转换为invoker,进行服务提供者的更新** @param urls 服务提供者url*/
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullrefreshInvoker(urls);
}/*** 将invokerURL列表转换为Invoker Map** @param invokerUrls this parameter can't be null*/
private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");//如果只有一个协议为empty的url,表示最新注册中心没有任何该服务提供者url信息if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//设置为禁止访问this.forbidden = true; // Forbid to access//设置routerChain的服务提供者invoker集合为一个空集合routerChain.setInvokers(BitList.emptyList());//关闭urlInvokerMap中的所有服务提供者invokerdestroyAllInvokers(); // Close all invokers}//表明可能存在服务提供者urlelse {//允许访问this.forbidden = false; // Allow to accessif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().//使用本地引用来避免NPE。cachedInvokerUrls将被destroyAllInvokers()方法设置为空。Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;//空的服务提供者url集合if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {// 1-4 Empty address.logger.warn("1-4", "configuration ", "","Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");invokerUrls.addAll(localCachedInvokerUrls);} else {//缓存的invoker url,便于比较localCachedInvokerUrls = new HashSet<>();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls = localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().//使用本地引用来避免NPE。urlInvokerMap将在destroyAllInvokers()方法设置为空。Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().//不能使用本地引用,因为oldUrlInvokerMap的映射可能会直接在toInvokers()中删除。Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}/** 将URL转换为Invoker*/Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.* eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error("3-1", "inconsistency between the client protocol and the protocol of the server","", "urls to invokers error",new IllegalStateException("urls to invokers error. invokerUrls.size :" +invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));// pre-route and build cache//invoker集合存入routerChain的invokers属性routerChain.setInvokers(this.getInvokers());//设置urlInvokerMap为新的urlInvokerMapthis.urlInvokerMap = newUrlInvokerMap;try {//销毁无用 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}// 通知invoker刷新this.invokersChanged();}
}
3.2 toInvokers将URL转换为Invoker
将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。
该方法的大概逻辑为:
- 获取获取消费者需要查询过滤的协议,遍历全部最新服务提供者url,依次进行如下操作:
- 调用checkProtocolValid方法,校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者。服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃。
- 调用mergeUrl方法,合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
- 从原来的缓存中获取该url对应的invoker:
- 如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用。
- 如果缓存没有该url对应的invoker,那么将会重新引用该invoker,并将新引入的invoker加入到新的invoker map缓存中。
- 返回最新的url到invoker的缓存map。
/*** RegistryDirectory的的方法** 将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。** @param oldUrlInvokerMap 此前的url到invoker的映射* @param urls 最新服务提供者url集合* @return invokers 最新的url到invoker的映射*/
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {//新的映射mapMap<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}//获取消费者需要查询过滤的协议String queryProtocols = this.queryMap.get(PROTOCOL_KEY);//遍历最新服务提供者url集合for (URL providerUrl : urls) {//校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者//服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}//合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置//从这里可以知道消费者的配置优先级大于提供者的配置URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer again//从原来的缓存中获取该url对应的invokerInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);//如果缓存没有该url对应的invoker,那么将会重新引用该invokerif (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}//如果启用服务if (enabled) {//再次通过Protocol$Adaptive的refer方法引用该服务提供者//在最开始我们就是通过refer方法引用服务的,在再次见到这个方法,只不过这里的url已经变成了某个服务提供者的url了invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error("4-2", "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error("4-3", "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}//加入到新的invoker map缓存中if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {//如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用newUrlInvokerMap.put(url, invoker);}}//返回新的invoker mapreturn newUrlInvokerMap;
}
在上面的步骤中,如果是首次启动消费者,将会统一走Protocol$Adaptive的refer方法引用该服务提供者的逻辑。还记得在最开始讲consumer服务引入的时候吗,那时候我们就是通过这个refer方法引用服务的,现在再次见到这个方法,只不过此前的url则是注册中心协议url,对应着RegistryProtocol,而这里的url已经变成了某个服务提供者的url了,对应着具体的协议实现,例如DubboProtocol、RestProtocol。
我们此前就讲过了Protocol$Adaptive的refer方法实际上返回的是被wrapper包装的Protocol,这里我们直接看最底层的Protocol的refer方法,以默认协议dubbo协议的Protocol实现DubboProtocol为例子!
4 DubboProtocol#refer dubbo协议服务引入
该方法执行基于dubbo序列化协议的服务引入,最终会创建一个DubboInvoker,内部包含一个nettyClient,已经与对应的服务提供者的nettyServer建立了连接,可用于发起rpc远程调用请求。
/*** DubboProtocol的方法** @param type 服务类型* @param url 远程服务提供者url* @return* @param <T>* @throws RpcException*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//销毁检测checkDestroyed();//协议绑定引用return protocolBindingRefer(type, url);
}@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {//销毁检测checkDestroyed();//序列化优化optimizeSerialization(url);// create rpc invoker.//创建一个DubboInvoker,可用于发起rpc远程调用DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);//加入协议缓存invokersinvokers.add(invoker);return invoker;
}
4.1 getClients获取服务客户端
该方法获取服务提供者网络调用客户端。这里会判断是否使用共享连接,因为一个服务提供者根提供了很多的服务接口,这个的是否共享连接,实际上就是指的消费者引入时候,是这些服务接口是否共用一些客户端连接(默认一个),或者说不同的服务接口使用独立的客户端连接(默认一个服务一个连接)。默认是共享连接。
/*** DubboProtocol的方法* 获取服务客户端** @param url 服务提供者url* @return ExchangeClient数组*/
private ExchangeClient[] getClients(URL url) {//获取配置的连接数,默认为0int connections = url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service//是否共享连接,如果没有配置connections,那么连接是共享的,否则,一个服务连接一个服务if (connections == 0) {/** The xml configuration should have a higher priority than properties.* 共享连接配置,xml配置的优先级应该高于属性*/String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(shareConnectionsStr);//获取共享客户端List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);//设置到ExchangeClient数组中ExchangeClient[] clients = new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}//非共享连接,表示当前服务接口使用单独的连接ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {//初始化新的客户端clients[i] = initClient(url);}return clients;
}
4.2 getSharedClient获取共享客户端连接
如果是共享连接配置,那么调用getSharedClient方法获取共享客户端连接,默认连接数为1。该方法的大概步骤为:
- 首先获取服务提供者ip:port 作为共享连接的key,即共享连接情况下,同一个服务提供者实例下的所有服务接口共享某些连接。
- 从缓存referenceClientMap获取key对应的共享客户端连接。
- 如果存在缓存,并且客户端连接全部可用,那么增加连接技术,然后返回即可。否则,只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。
- 如果此前没有该key的客户端连接缓存或者连接不是全部可用,都要走下面的步骤,尝试新创建连接。
- 加synchronized锁,在锁代码中再次双重检测,注意这里还有线程等待唤醒机制。
- 最后判断如果客户端连接为空,那么调用buildReferenceCountExchangeClientList方法构建指定数量的客户端连接。如果连接不为空,那么遍历连接,判断如果该连接不可用,那么新创建一个连接补充进来。
- 最后的处理仍需要加synchronized锁,判断如果最终没建立连接,那么移除无效缓存,否则将最终的客户端连接存入缓存,最后唤醒其他等待的线程。
该方法的核心知识点有两个,一个是buildReferenceCountExchangeClientList方法构建指定数量的客户端连接,另一个就是方法中的synchronized锁以及等待唤醒机制。
为什么需要等待唤醒呢?因为这是共享客户端,那么可能有多个线程都在初始化同一个ip:port的多个客户端,为了避免冲突,需要加锁。
/*** DubboProtocol的方法* <p>* 获取共享客户端连接** @param url 服务提供者url* @param connectNum 共享连接数量,默认1*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {//获取 服务提供者ip:port 作为共享连接的keyString key = url.getAddress();//从缓存获取key对应的共享客户端连接Object clients = referenceClientMap.get(key);if (clients instanceof List) {//转换为ReferenceCountExchangeClient集合,带有引用计数的功能List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;//检测客户端连接是否全部可用//只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。if (checkClientCanUse(typedClients)) {//如果可用//增加连接的引用计数,如果我们创建新的调用者共享相同的连接,连接将关闭,没有任何引用batchClientRefIncr(typedClients);return typedClients;}}//如果此前没有该key的连接缓存,那么新创建List<ReferenceCountExchangeClient> typedClients = null;synchronized (referenceClientMap) {//死循环for (; ; ) {// guarantee just one thread in loading condition. And Other is waiting It had finished.//双重检测锁clients = referenceClientMap.get(key);if (clients instanceof List) {typedClients = (List<ReferenceCountExchangeClient>) clients;if (checkClientCanUse(typedClients)) {batchClientRefIncr(typedClients);return typedClients;} else {//如果共享连接不是全部可用,那么缓存值先设置为为一个object对象,跳出循环referenceClientMap.put(key, PENDING_OBJECT);break;}}//如果客户端连接PENDING_OBJECT,那么表示有其他线程正在初始化当前客户端连接,那么当前线程等待直到被通知else if (clients == PENDING_OBJECT) {try {referenceClientMap.wait();} catch (InterruptedException ignored) {}}//如果没有共享连接,那么缓存值先设置为为一个object对象,跳出循环else {referenceClientMap.put(key, PENDING_OBJECT);break;}}}try {//连接数量必须大于等于1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//如果客户端连接为空if (CollectionUtils.isEmpty(typedClients)) {/** 构建客户端连接*/typedClients = buildReferenceCountExchangeClientList(url, connectNum);}//如果连接不为空else {//遍历连接for (int i = 0; i < typedClients.size(); i++) {//如果该连接不可用,那么新创建一个连接补充进来ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {typedClients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}} finally {synchronized (referenceClientMap) {//如果最终没建立连接,那么移除无效缓存if (typedClients == null) {referenceClientMap.remove(key);} else {//将最终的客户端连接存入缓存referenceClientMap.put(key, typedClients);}//唤醒其他线程referenceClientMap.notifyAll();}}return typedClients;
}
4.3 buildReferenceCountExchangeClientList构建客户端连接
该方法构建指定数量的引用计数交换器客户端,内部循环调用buildReferenceCountExchangeClient方法构建耽单个客户端连接,内部调用initClient方法,初始化交换器客户端,启动一个nettyClient并与服务端建立了连接。
/*** DubboProtocol的方法* 构建指定数量的引用计数交换器客户端** @param url 服务提供者url* @param connectNum 客户端数量* @return*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();//循环调用buildReferenceCountExchangeClient方法for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}return clients;
}/*** 构建一个引用计数交换器客户端** @param url* @return*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//初始化交换器客户端,启动一个nettyClient并与服务端建立了连接ExchangeClient exchangeClient = initClient(url);//创建ReferenceCountExchangeClientReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);// read configs//获取服务器关闭等待超时时间,默认10000msint shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());client.setShutdownWaitTime(shutdownTimeout);return client;
}
4.4 initClient建立客户端连接
该方法创建客户端连接,大概步骤为:
- 首先获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认netty。
- 用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议。
- 获取lazy参数,判断连接是否懒加载,默认false,即饿加载。如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接,否则立即调用Exchangers.connect(url, requestHandler)方法与服务端建立底层通信客户端连接。
默认情况下,客户端为饿加载,客户端与服务端的连接,在消费者客户端启动引用服务的时候就已经建立了,即服务提供者url转换为invoker的时候,就已经建立了连接。
/*** DubboProtocol的方法* 创建一个新的连接** @param url 服务提供者url*/
private ExchangeClient initClient(URL url) {/** Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,* which means params are shared among different services. Since client is shared among services this is currently not a problem.*///获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));// BIO is not allowed since it has severe performance issue.//不允许使用BIO,因为它有严重的性能问题,目前都是使用netty4if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}try {// Replace InstanceAddressURL with ServiceConfigURL.//用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());url = url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default//默认启用心跳url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));//连接是否懒加载,默认false,即饿加载return url.getParameter(LAZY_CONNECT_KEY, false)//如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接? new LazyConnectExchangeClient(url, requestHandler)//饿加载,与服务端建立底层通信客户端连接: Exchangers.connect(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}
}
4.5 Exchangers#connect建立连接
该方法和我们此前学习的服务提供者的Exchangers#bind方法类型,只不过bind方法创建服务端,该方法创建客户端。
该方法内部基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法。
/*** Exchangers的方法** 客户端建立与服务端的连接** @param url 服务提供者url* @param handler 请求处理器* @return 客户端连接*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法return getExchanger(url).connect(url, handler);
}
HeaderExchanger#connect方法中,首先对handler进行包装:DecodeHandler -> HeaderExchangeHandler -> requestHandler。
- DecodeHandler用于负责内部的dubbo协议的请求解码。
- HeaderExchangeHandler用于完成请求响应的映射。
- requestHandler用于nettyHandler真正处理请求。
随后调用Transporters#connect方法启动底层远程网络通信客户端,返回Client。Transporter是Dubbo对网络传输层的抽象接口,Exchanger依赖于Transporter。
最后基于Client构建HeaderExchangeClient返回。
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//包装handler:DecodeHandler -> HeaderExchangeHandler -> handler//调用Transporters#connect方法启动底层远程网络通信客户端,返回Client//基于Client构建HeaderExchangeClient返回return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定,目前仅NettyTransporter,基于netty4。
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}//继续包装一层ChannelHandlerDispatcherChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定return getTransporter(url).connect(url, handler);
}
4.6 NettyTransporter#connect创建NettyClient
该方法很简单,就是根据url和handler创建一个NettyClient实例,在NettyClient的构造器中,会调用doOpen()开启客户端,创建Bootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,随后调用connect方法连接服务提供者所在服务端。
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler创建NettyClientreturn new NettyClient(url, handler);
}
NettyClient的构造器如下,将会调用父类构造器启动客户端。
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型//继续包装handler: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}
AbstractClient的构造器如下,将会获取绑定的ip和端口以及其他参数,然后调用doOpen方法真正的开启netty客户端,最后调用connect方法连接服务提供者所在服务端。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// set default needReconnect true when channel is not connected//当通道未连接时设置默认needReconnect为trueneedReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);//初始化执行器,消费者的执行程序是全局共享的,提供者ip不需要是线程名的一部分。initExecutor(url);try {/** 创建netty客户端*/doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect./** 连接服务提供者所在服务端*/connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {// If lazy connect client fails to establish a connection, the client instance will still be created,// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exceptionif (url.getParameter(LAZY_CONNECT_KEY, false)) {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +" connect to the server " + getRemoteAddress() +" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +t.getMessage(), t);return;}if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}
}
4.7 doOpen初始化NettyClient
该方法用于初始化并启动netty客户端,是非常标准的netty客户端启动代码,如果你们使用过Netty,看过Netty源码,一定就会感到非常熟悉。
创建Bootstrap,设置eventGroup,编配ChannelHandler。至此成功初始化了Bootstrap,但是并没有连接服务端。
/*** NettyClient的方法** 初始化 bootstrap*/
@Override
protected void doOpen() throws Throwable {//创建NettyClientHandlerfinal NettyClientHandler nettyClientHandler = createNettyClientHandler();//创建Bootstrap,说明这是一个netty客户端bootstrap = new Bootstrap();//初始化NettyClientinitBootstrap(nettyClientHandler);
}protected NettyClientHandler createNettyClientHandler() {//创建NettyClientHandler,当前NettyClient对象本身也是一个ChannelHandler实例,其received方法委托给创建实例时传递的内部的handler处理return new NettyClientHandler(getUrl(), this);
}protected void initBootstrap(NettyClientHandler nettyClientHandler) {//配置线程组bootstrap.group(EVENT_LOOP_GROUP.get())//设置Socket 参数.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())//IO模型.channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));//设置处理器bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl()));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);//自定义客户端消息的业务处理逻辑Handlerch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast("handler", nettyClientHandler);String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});
}
4.8 connect连接服务端
在初始化Bootstrap之后,将调用connect方法真正的连接服务提供者所在的服务端,内部调用doConnect方法执行连接,该方法由子类实现。
/*** AbstractClient的方法* <p>* 连接服务提供者所在服务端*/
protected void connect() throws RemotingException {//加锁connectLock.lock();try {//如果已连接则返回if (isConnected()) {return;}//如果已关闭则返回if (isClosed() || isClosing()) {logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");return;}/** 执行连接*/doConnect();if (!isConnected()) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");} else {if (logger.isInfoEnabled()) {logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", channel is " + this.getChannel());}}} catch (RemotingException e) {throw e;} catch (Throwable e) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: " + e.getMessage(), e);} finally {connectLock.unlock();}
}
NettyClient的doConnect方法如下,主要逻辑就是调用bootstrap.connect方法连接服务端:
/*** NettyClient的方法* 连接服务端*/
@Override
protected void doConnect() throws Throwable {long start = System.currentTimeMillis();//通过bootstrap连接服务端ChannelFuture future = bootstrap.connect(getConnectAddress());try {//等待连接超时事件boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);//如果连接成功if (ret && future.isSuccess()) {//获取通道Channel newChannel = future.channel();try {// Close old channel// copy reference//关闭旧的ChannelChannel oldChannel = NettyClient.this.channel;if (oldChannel != null) {try {if (logger.isInfoEnabled()) {logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info("Close new netty channel " + newChannel + ", because the client closed.");}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} else if (future.cause() != null) {Throwable cause = future.cause();// 6-1 Failed to connect to provider server by other reason.RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);logger.error("6-1", "network disconnected", "","Failed to connect to provider server by other reason.", cause);throw remotingException;} else {// 6-2 Client-side timeoutRemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + " client-side timeout "+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());logger.error("6-2", "provider crash", "","Client-side timeout.", remotingException);throw remotingException;}} finally {// just add new valid channel to NettyChannel's cacheif (!isConnected()) {//future.cancel(true);}}
}
5 saveProperties更新本地文件信息
在每次通知内存数据更新之后,更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
/*** AbstractRegistry的方法** @param url 服务消费者url*/
private void saveProperties(URL url) {//服务缓存文件路径为 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cacheif (file == null) {return;}try {//需要存储的url字符串StringBuilder buf = new StringBuilder();//获取该url的不同类别节点到对应url列表的mapMap<String, List<URL>> categoryNotified = notified.get(url);//遍历所有的节点urlif (categoryNotified != null) {for (List<URL> us : categoryNotified.values()) {for (URL u : us) {if (buf.length() > 0) {//追加空格buf.append(URL_SEPARATOR);}//追加url字符串buf.append(u.toFullString());}}}//消费者url key以及对应的节点url字符串存入propertiesproperties.setProperty(url.getServiceKey(), buf.toString());//版本自增long version = lastCacheChanged.incrementAndGet();//保存properties到本地文件if (syncSaveFile) {doSaveProperties(version);} else {registryCacheExecutor.schedule(() -> doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);}} catch (Throwable t) {logger.warn(t.getMessage(), t);}
}
本地缓存文件路径为:{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache,里面缓存的内容如下,每一个服务接口占据一行,它的所有url字符串都追加在后面,通过空格分隔。
6 总结
本次我们学习了接口级别服务发现订阅refreshInterfaceInvoker方法的具体实现,大概步骤为:
- 第一次调用refreshInterfaceInvoker方法的时候,由于MigrationInvoker内部的真实消费者Invoker为null,那么需要创建一个消费者Invoker。
- 首先创建动态注册心中目录DynamicDirectory,随后调用doCreateInvoker方法创建服务消费者Invoker。
- 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
- 调用registry.register方法将消费者注册信息url注册到注册中心。
- 调用directory.buildRouterChain方法构建服务调用路由链RouterChain,赋给directory的routerChain属性。
- 调用directory.subscribe方法进行服务发现、引入并订阅服务。
- directory本身是一个监听器,directory将会订阅zookeeper对应的服务接口节点下的dubbo/[service name]/providers,服务提供者目录,以及dubbo/[service name]/configurators,即配置目录,以及dubbo/[service name]/routers,即服务路由目录。
- 依靠着zookeeper的watch监听回调机制,当这些节点下的子节点发生变化时会触发回调通知RegistryDirectory执行notify方法,进而完成本地服务列表的动态更新功能。实际上服务提供者也会订阅,只不过只会订阅configurators节点。
- 在执行订阅的时候,将会进行一次providers,configurators,routers节点目录下字节点的获取,这样就获取到了当前的服务提供者url、配置信息url、服务路由url。
- 在subscribe方法的最后,也是最关键的一步,主动调用notify方法通知数据变更。这里实际上会动态更新本地内存和文件中的服务提供者缓存,可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存,这里面存放着服务提供者url到对应的Invoker的映射。
- 如果没有在本地缓存中找到某个服务提供者url的缓存,那么会将url转换为对应协议的Invoker,默认DubboInvoker,DubboInvoker的内部还会创建NettyClient客户端,并与服务提供者所在的服务端建立连接。
- 将url转换为Invoker之前,将会进行配置的合并,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
- 调用cluster.join方法传入directory进行集群容错能力包装,最终返回一个ClusterInvoker作为消费者Invoker,即MockClusterInvoker,这是一个包装类,内部包含真正的集群容错Invoker,默认是FailoverClusterInvoker。
到此我们可以知道上面的各种对象的关系(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来):
到此接口级服务引入学习完毕,实际上Dubbo2就是采用的接口级别服务注册和引入。后面我们将继续学习应用级服务引入,实际上这才是Dubbo3升级的一个重点,非常值得学习!