Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新

基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。

此前我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法,当时还有最为关键的notify服务通知更新的部分源码没有学习,本次我们来学习notify通知本地服务更新的源码。

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)

1 notify服务通知更新

当第一次订阅服务节点,或者服务节点目录的子节点更新时,例如新的producer上下线,将会调用notify服务通知更新的方法,会更新本地缓存的数据。

notify方法的入口是FailbackRegistry的notify方法。

/*** FailbackRegistry的方法* <p>* 服务通知** @param url      consumer side url* @param listener listener* @param urls     provider latest urls*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {/** 调用doNotify方法更新*/doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}
/*** FailbackRegistry的方法* <p>* 服务通知*/
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {//调用父类AbstractRegistry的方法super.notify(url, listener, urls);
}

2 AbstractRegistry#notify通知更新

该方法涉及两个重要知识点:

  1. 一是对于拉取到的服务节点url按照类别providers、configurators 、routers进行分类,然后遍历每个类别,依次调用RegistryDirectory#notify方法触发监听回调,进行服务数据的更新。
  2. 二是RegistryDirectory#notify方法通知执行完毕之后,调用saveProperties方法更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。
/*** AbstractRegistry的方法* <p>* 通知更新** @param url      consumer side url* @param listener listener* @param urls     provider latest urls*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {// 1-4 Empty address.logger.warn("1-4", "", "", "Ignore empty notify urls for subscribe url " + url);return;}if (logger.isInfoEnabled()) {logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());}//根据节点类别对url进行分类Map<String, List<URL>> result = new HashMap<>();//遍历url,进行分类for (URL u : urls) {//服务消费者和服务提供者的服务接口名匹配if (UrlUtils.isMatch(url, u)) {//获取url的category类别,默认providers,同时服务提供者urlServiceAddressURL固定返回providersString category = u.getCategory(DEFAULT_CATEGORY);//将url加入到对应类别的categoryList中List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());categoryList.add(u);}}//result,一般有三个元素,即三个类别,providers、configurators 、routersif (result.size() == 0) {return;}Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());//遍历每一个类别for (Map.Entry<String, List<URL>> entry : result.entrySet()) {//获取类别String category = entry.getKey();List<URL> categoryList = entry.getValue();//存入categoryNotifiedcategoryNotified.put(category, categoryList);//执行leitener的notify方法进行通知,listener可以是RegistryDirectory/** RegistryDirectory#notify通知*/listener.notify(categoryList);/** 本地缓存*/// We will update our cache file after each notification.// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.//将在每次通知后更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。//本地缓存,默认支持if (localCacheEnabled) {saveProperties(url);}}
}

3 RegistryDirectory#notify更新本地内存信息

该方法根据url更新RegistryDirectory对象的内存信息,将可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存。

在最后,会专门调用refreshOverrideAndInvoker方法,将服务提供者url转换为invoker,进行服务提供者的更新。

/*** RegistryDirectory的方法* * 服务变更通知* @param urls 服务提供者注册信息列表*/
@Override
public synchronized void notify(List<URL> urls) {if (isDestroyed()) {return;}Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull)//类别合法性过滤.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)//根据类别分组.collect(Collectors.groupingBy(this::judgeCategory));//获取配置信息url集合,可以为空List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());//将配置信息url转换为Configurator集合,并赋值给configurators属性,可以为空this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);//获取路由信息url集合,可以为空List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());//将配置信息url转换为Router集合,并加入routerChain路由链,可以为空toRouters(routerURLs).ifPresent(this::addRouters);// providers//获取服务提供者url集合,可以为空List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL address//添加扩展URL地址 3.x的特性ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);//获取AddressListener,默认空集合List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);}}/** 将服务提供者url转换为invoker,进行服务提供者的更新*/refreshOverrideAndInvoker(providerURLs);
}

3.1 refreshOverrideAndInvoker刷新invoker

该方法将服务提供者url转换为invoker,进行服务提供者的更新,这在consumer对producer的信息更新部分是非常重要的一个方法。

url转换规则为:

  1. 如果URL已转换为invoker,则不再重新引用它并直接从缓存获取它,请注意,URL中的任何参数更改都将被重新引用。
  2. 如果传入invoker列表不为空,则表示它是最新的invoker列表。
  3. 如果传入invokerUrl的列表为空,则意味着该规则只是一个覆盖规则或路由规则,需要重新对比以决定是否重新引用。
/*** RegistryDirectory的方法* <p>* 将服务提供者url转换为invoker,进行服务提供者的更新** @param urls 服务提供者url*/
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullrefreshInvoker(urls);
}/*** 将invokerURL列表转换为Invoker Map** @param invokerUrls this parameter can't be null*/
private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");//如果只有一个协议为empty的url,表示最新注册中心没有任何该服务提供者url信息if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//设置为禁止访问this.forbidden = true; // Forbid to access//设置routerChain的服务提供者invoker集合为一个空集合routerChain.setInvokers(BitList.emptyList());//关闭urlInvokerMap中的所有服务提供者invokerdestroyAllInvokers(); // Close all invokers}//表明可能存在服务提供者urlelse {//允许访问this.forbidden = false; // Allow to accessif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().//使用本地引用来避免NPE。cachedInvokerUrls将被destroyAllInvokers()方法设置为空。Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;//空的服务提供者url集合if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {// 1-4 Empty address.logger.warn("1-4", "configuration ", "","Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");invokerUrls.addAll(localCachedInvokerUrls);} else {//缓存的invoker url,便于比较localCachedInvokerUrls = new HashSet<>();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls = localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().//使用本地引用来避免NPE。urlInvokerMap将在destroyAllInvokers()方法设置为空。Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().//不能使用本地引用,因为oldUrlInvokerMap的映射可能会直接在toInvokers()中删除。Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}/** 将URL转换为Invoker*/Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.*    eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error("3-1", "inconsistency between the client protocol and the protocol of the server","", "urls to invokers error",new IllegalStateException("urls to invokers error. invokerUrls.size :" +invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));// pre-route and build cache//invoker集合存入routerChain的invokers属性routerChain.setInvokers(this.getInvokers());//设置urlInvokerMap为新的urlInvokerMapthis.urlInvokerMap = newUrlInvokerMap;try {//销毁无用 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}// 通知invoker刷新this.invokersChanged();}
}

3.2 toInvokers将URL转换为Invoker

将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。

该方法的大概逻辑为:

  1. 获取获取消费者需要查询过滤的协议,遍历全部最新服务提供者url,依次进行如下操作:
  2. 调用checkProtocolValid方法,校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者。服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃。
  3. 调用mergeUrl方法,合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
  4. 从原来的缓存中获取该url对应的invoker:
    1. 如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用。
    2. 如果缓存没有该url对应的invoker,那么将会重新引用该invoker,并将新引入的invoker加入到新的invoker map缓存中。
  5. 返回最新的url到invoker的缓存map。
/*** RegistryDirectory的的方法** 将url转换为Invoker,如果url已被引用,将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。** @param oldUrlInvokerMap 此前的url到invoker的映射* @param urls 最新服务提供者url集合* @return invokers 最新的url到invoker的映射*/
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {//新的映射mapMap<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}//获取消费者需要查询过滤的协议String queryProtocols = this.queryMap.get(PROTOCOL_KEY);//遍历最新服务提供者url集合for (URL providerUrl : urls) {//校验当前提供者url协议是否支持当前服务消费者调用,如果不支持则跳过该提供者//服务消费者可以手动指定消费某些协议的服务提供者,其他的服务提供者将被丢弃if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}//合并服务提供者url的配置,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置//从这里可以知道消费者的配置优先级大于提供者的配置URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer again//从原来的缓存中获取该url对应的invokerInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);//如果缓存没有该url对应的invoker,那么将会重新引用该invokerif (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}//如果启用服务if (enabled) {//再次通过Protocol$Adaptive的refer方法引用该服务提供者//在最开始我们就是通过refer方法引用服务的,在再次见到这个方法,只不过这里的url已经变成了某个服务提供者的url了invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error("4-2", "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error("4-3", "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}//加入到新的invoker map缓存中if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {//如果已经存在该缓存,那么直接将缓存的invoker加入到新的invoker map缓存中,不再从新引用newUrlInvokerMap.put(url, invoker);}}//返回新的invoker mapreturn newUrlInvokerMap;
}

在上面的步骤中,如果是首次启动消费者,将会统一走Protocol$Adaptive的refer方法引用该服务提供者的逻辑。还记得在最开始讲consumer服务引入的时候吗,那时候我们就是通过这个refer方法引用服务的,现在再次见到这个方法,只不过此前的url则是注册中心协议url,对应着RegistryProtocol,而这里的url已经变成了某个服务提供者的url了,对应着具体的协议实现,例如DubboProtocol、RestProtocol。

我们此前就讲过了Protocol$Adaptive的refer方法实际上返回的是被wrapper包装的Protocol,这里我们直接看最底层的Protocol的refer方法,以默认协议dubbo协议的Protocol实现DubboProtocol为例子!

4 DubboProtocol#refer dubbo协议服务引入

该方法执行基于dubbo序列化协议的服务引入,最终会创建一个DubboInvoker,内部包含一个nettyClient,已经与对应的服务提供者的nettyServer建立了连接,可用于发起rpc远程调用请求。

/*** DubboProtocol的方法** @param type 服务类型* @param url  远程服务提供者url* @return* @param <T>* @throws RpcException*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//销毁检测checkDestroyed();//协议绑定引用return protocolBindingRefer(type, url);
}@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {//销毁检测checkDestroyed();//序列化优化optimizeSerialization(url);// create rpc invoker.//创建一个DubboInvoker,可用于发起rpc远程调用DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);//加入协议缓存invokersinvokers.add(invoker);return invoker;
}

4.1 getClients获取服务客户端

该方法获取服务提供者网络调用客户端。这里会判断是否使用共享连接,因为一个服务提供者根提供了很多的服务接口,这个的是否共享连接,实际上就是指的消费者引入时候,是这些服务接口是否共用一些客户端连接(默认一个),或者说不同的服务接口使用独立的客户端连接(默认一个服务一个连接)。默认是共享连接

/*** DubboProtocol的方法* 获取服务客户端** @param url 服务提供者url* @return ExchangeClient数组*/
private ExchangeClient[] getClients(URL url) {//获取配置的连接数,默认为0int connections = url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service//是否共享连接,如果没有配置connections,那么连接是共享的,否则,一个服务连接一个服务if (connections == 0) {/** The xml configuration should have a higher priority than properties.* 共享连接配置,xml配置的优先级应该高于属性*/String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(shareConnectionsStr);//获取共享客户端List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);//设置到ExchangeClient数组中ExchangeClient[] clients = new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}//非共享连接,表示当前服务接口使用单独的连接ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {//初始化新的客户端clients[i] = initClient(url);}return clients;
}

4.2 getSharedClient获取共享客户端连接

如果是共享连接配置,那么调用getSharedClient方法获取共享客户端连接,默认连接数为1。该方法的大概步骤为:

  1. 首先获取服务提供者ip:port 作为共享连接的key,即共享连接情况下,同一个服务提供者实例下的所有服务接口共享某些连接。
  2. 从缓存referenceClientMap获取key对应的共享客户端连接。
  3. 如果存在缓存,并且客户端连接全部可用,那么增加连接技术,然后返回即可。否则,只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。
  4. 如果此前没有该key的客户端连接缓存或者连接不是全部可用,都要走下面的步骤,尝试新创建连接。
  5. synchronized锁,在锁代码中再次双重检测,注意这里还有线程等待唤醒机制。
  6. 最后判断如果客户端连接为空,那么调用buildReferenceCountExchangeClientList方法构建指定数量的客户端连接。如果连接不为空,那么遍历连接,判断如果该连接不可用,那么新创建一个连接补充进来。
  7. 最后的处理仍需要加synchronized锁,判断如果最终没建立连接,那么移除无效缓存,否则将最终的客户端连接存入缓存,最后唤醒其他等待的线程。

该方法的核心知识点有两个,一个是buildReferenceCountExchangeClientList方法构建指定数量的客户端连接,另一个就是方法中的synchronized锁以及等待唤醒机制。

为什么需要等待唤醒呢?因为这是共享客户端,那么可能有多个线程都在初始化同一个ip:port的多个客户端,为了避免冲突,需要加锁。

/*** DubboProtocol的方法* <p>* 获取共享客户端连接** @param url        服务提供者url* @param connectNum 共享连接数量,默认1*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {//获取 服务提供者ip:port 作为共享连接的keyString key = url.getAddress();//从缓存获取key对应的共享客户端连接Object clients = referenceClientMap.get(key);if (clients instanceof List) {//转换为ReferenceCountExchangeClient集合,带有引用计数的功能List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;//检测客户端连接是否全部可用//只要有一个客户端不可用,就需要用可用的客户端替换不可用的客户端。if (checkClientCanUse(typedClients)) {//如果可用//增加连接的引用计数,如果我们创建新的调用者共享相同的连接,连接将关闭,没有任何引用batchClientRefIncr(typedClients);return typedClients;}}//如果此前没有该key的连接缓存,那么新创建List<ReferenceCountExchangeClient> typedClients = null;synchronized (referenceClientMap) {//死循环for (; ; ) {// guarantee just one thread in loading condition. And Other is waiting It had finished.//双重检测锁clients = referenceClientMap.get(key);if (clients instanceof List) {typedClients = (List<ReferenceCountExchangeClient>) clients;if (checkClientCanUse(typedClients)) {batchClientRefIncr(typedClients);return typedClients;} else {//如果共享连接不是全部可用,那么缓存值先设置为为一个object对象,跳出循环referenceClientMap.put(key, PENDING_OBJECT);break;}}//如果客户端连接PENDING_OBJECT,那么表示有其他线程正在初始化当前客户端连接,那么当前线程等待直到被通知else if (clients == PENDING_OBJECT) {try {referenceClientMap.wait();} catch (InterruptedException ignored) {}}//如果没有共享连接,那么缓存值先设置为为一个object对象,跳出循环else {referenceClientMap.put(key, PENDING_OBJECT);break;}}}try {//连接数量必须大于等于1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//如果客户端连接为空if (CollectionUtils.isEmpty(typedClients)) {/** 构建客户端连接*/typedClients = buildReferenceCountExchangeClientList(url, connectNum);}//如果连接不为空else {//遍历连接for (int i = 0; i < typedClients.size(); i++) {//如果该连接不可用,那么新创建一个连接补充进来ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {typedClients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}} finally {synchronized (referenceClientMap) {//如果最终没建立连接,那么移除无效缓存if (typedClients == null) {referenceClientMap.remove(key);} else {//将最终的客户端连接存入缓存referenceClientMap.put(key, typedClients);}//唤醒其他线程referenceClientMap.notifyAll();}}return typedClients;
}

4.3 buildReferenceCountExchangeClientList构建客户端连接

该方法构建指定数量的引用计数交换器客户端,内部循环调用buildReferenceCountExchangeClient方法构建耽单个客户端连接,内部调用initClient方法,初始化交换器客户端,启动一个nettyClient并与服务端建立了连接。

/*** DubboProtocol的方法* 构建指定数量的引用计数交换器客户端** @param url 服务提供者url* @param connectNum 客户端数量* @return*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();//循环调用buildReferenceCountExchangeClient方法for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}return clients;
}/*** 构建一个引用计数交换器客户端** @param url* @return*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//初始化交换器客户端,启动一个nettyClient并与服务端建立了连接ExchangeClient exchangeClient = initClient(url);//创建ReferenceCountExchangeClientReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);// read configs//获取服务器关闭等待超时时间,默认10000msint shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());client.setShutdownWaitTime(shutdownTimeout);return client;
}

4.4 initClient建立客户端连接

该方法创建客户端连接,大概步骤为:

  1. 首先获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认netty。
  2. 用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议。
  3. 获取lazy参数,判断连接是否懒加载,默认false,即饿加载。如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接,否则立即调用Exchangers.connect(url, requestHandler)方法与服务端建立底层通信客户端连接。

默认情况下,客户端为饿加载,客户端与服务端的连接,在消费者客户端启动引用服务的时候就已经建立了,即服务提供者url转换为invoker的时候,就已经建立了连接。

/*** DubboProtocol的方法* 创建一个新的连接** @param url 服务提供者url*/
private ExchangeClient initClient(URL url) {/** Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,* which means params are shared among different services. Since client is shared among services this is currently not a problem.*///获取客户端底层通信框架类型,应该和服务端的底层通信框统一,默认nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));// BIO is not allowed since it has severe performance issue.//不允许使用BIO,因为它有严重的性能问题,目前都是使用netty4if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}try {// Replace InstanceAddressURL with ServiceConfigURL.//用ServiceConfigURL替换InstanceAddressURL,协议为dubbo协议url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());url = url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default//默认启用心跳url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));//连接是否懒加载,默认false,即饿加载return url.getParameter(LAZY_CONNECT_KEY, false)//如果懒加载,那么只有在第一次调用服务时才会创建与服务端的连接? new LazyConnectExchangeClient(url, requestHandler)//饿加载,与服务端建立底层通信客户端连接: Exchangers.connect(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}
}

4.5 Exchangers#connect建立连接

该方法和我们此前学习的服务提供者的Exchangers#bind方法类型,只不过bind方法创建服务端,该方法创建客户端。

该方法内部基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法。

/*** Exchangers的方法** 客户端建立与服务端的连接** @param url 服务提供者url* @param handler 请求处理器* @return 客户端连接*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#connect方法return getExchanger(url).connect(url, handler);
}

HeaderExchanger#connect方法中,首先对handler进行包装:DecodeHandler -> HeaderExchangeHandler -> requestHandler。

  1. DecodeHandler用于负责内部的dubbo协议的请求解码。
  2. HeaderExchangeHandler用于完成请求响应的映射。
  3. requestHandler用于nettyHandler真正处理请求。

随后调用Transporters#connect方法启动底层远程网络通信客户端,返回Client。Transporter是Dubbo对网络传输层的抽象接口,Exchanger依赖于Transporter。

最后基于Client构建HeaderExchangeClient返回。


@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//包装handler:DecodeHandler -> HeaderExchangeHandler -> handler//调用Transporters#connect方法启动底层远程网络通信客户端,返回Client//基于Client构建HeaderExchangeClient返回return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定,目前仅NettyTransporter,基于netty4。

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}//继续包装一层ChannelHandlerDispatcherChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI机制获取Transporter的实现,并调用connect方法完成绑定return getTransporter(url).connect(url, handler);
}

4.6 NettyTransporter#connect创建NettyClient

该方法很简单,就是根据url和handler创建一个NettyClient实例,在NettyClient的构造器中,会调用doOpen()开启客户端,创建Bootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,随后调用connect方法连接服务提供者所在服务端。

@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler创建NettyClientreturn new NettyClient(url, handler);
}

NettyClient的构造器如下,将会调用父类构造器启动客户端。

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型//继续包装handler: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}

AbstractClient的构造器如下,将会获取绑定的ip和端口以及其他参数,然后调用doOpen方法真正的开启netty客户端,最后调用connect方法连接服务提供者所在服务端。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// set default needReconnect true when channel is not connected//当通道未连接时设置默认needReconnect为trueneedReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);//初始化执行器,消费者的执行程序是全局共享的,提供者ip不需要是线程名的一部分。initExecutor(url);try {/** 创建netty客户端*/doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect./** 连接服务提供者所在服务端*/connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {// If lazy connect client fails to establish a connection, the client instance will still be created,// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exceptionif (url.getParameter(LAZY_CONNECT_KEY, false)) {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +" connect to the server " + getRemoteAddress() +" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +t.getMessage(), t);return;}if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}
}

4.7 doOpen初始化NettyClient

该方法用于初始化并启动netty客户端,是非常标准的netty客户端启动代码,如果你们使用过Netty,看过Netty源码,一定就会感到非常熟悉。

创建Bootstrap,设置eventGroup,编配ChannelHandler。至此成功初始化了Bootstrap,但是并没有连接服务端。

/*** NettyClient的方法** 初始化 bootstrap*/
@Override
protected void doOpen() throws Throwable {//创建NettyClientHandlerfinal NettyClientHandler nettyClientHandler = createNettyClientHandler();//创建Bootstrap,说明这是一个netty客户端bootstrap = new Bootstrap();//初始化NettyClientinitBootstrap(nettyClientHandler);
}protected NettyClientHandler createNettyClientHandler() {//创建NettyClientHandler,当前NettyClient对象本身也是一个ChannelHandler实例,其received方法委托给创建实例时传递的内部的handler处理return new NettyClientHandler(getUrl(), this);
}protected void initBootstrap(NettyClientHandler nettyClientHandler) {//配置线程组bootstrap.group(EVENT_LOOP_GROUP.get())//设置Socket 参数.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())//IO模型.channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));//设置处理器bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl()));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);//自定义客户端消息的业务处理逻辑Handlerch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast("handler", nettyClientHandler);String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});
}

4.8 connect连接服务端

在初始化Bootstrap之后,将调用connect方法真正的连接服务提供者所在的服务端,内部调用doConnect方法执行连接,该方法由子类实现。

 /*** AbstractClient的方法* <p>* 连接服务提供者所在服务端*/
protected void connect() throws RemotingException {//加锁connectLock.lock();try {//如果已连接则返回if (isConnected()) {return;}//如果已关闭则返回if (isClosed() || isClosing()) {logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");return;}/** 执行连接*/doConnect();if (!isConnected()) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");} else {if (logger.isInfoEnabled()) {logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", channel is " + this.getChannel());}}} catch (RemotingException e) {throw e;} catch (Throwable e) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: " + e.getMessage(), e);} finally {connectLock.unlock();}
}

NettyClient的doConnect方法如下,主要逻辑就是调用bootstrap.connect方法连接服务端:

/*** NettyClient的方法* 连接服务端*/
@Override
protected void doConnect() throws Throwable {long start = System.currentTimeMillis();//通过bootstrap连接服务端ChannelFuture future = bootstrap.connect(getConnectAddress());try {//等待连接超时事件boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);//如果连接成功if (ret && future.isSuccess()) {//获取通道Channel newChannel = future.channel();try {// Close old channel// copy reference//关闭旧的ChannelChannel oldChannel = NettyClient.this.channel;if (oldChannel != null) {try {if (logger.isInfoEnabled()) {logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info("Close new netty channel " + newChannel + ", because the client closed.");}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} else if (future.cause() != null) {Throwable cause = future.cause();// 6-1 Failed to connect to provider server by other reason.RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);logger.error("6-1", "network disconnected", "","Failed to connect to provider server by other reason.", cause);throw remotingException;} else {// 6-2 Client-side timeoutRemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + " client-side timeout "+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());logger.error("6-2", "provider crash", "","Client-side timeout.", remotingException);throw remotingException;}} finally {// just add new valid channel to NettyChannel's cacheif (!isConnected()) {//future.cancel(true);}}
}

5 saveProperties更新本地文件信息

在每次通知内存数据更新之后,更新缓存文件。当注册中心由于网络抖动而订阅失败时,至少可以返回现有的缓存的URL。

/*** AbstractRegistry的方法** @param url 服务消费者url*/
private void saveProperties(URL url) {//服务缓存文件路径为 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cacheif (file == null) {return;}try {//需要存储的url字符串StringBuilder buf = new StringBuilder();//获取该url的不同类别节点到对应url列表的mapMap<String, List<URL>> categoryNotified = notified.get(url);//遍历所有的节点urlif (categoryNotified != null) {for (List<URL> us : categoryNotified.values()) {for (URL u : us) {if (buf.length() > 0) {//追加空格buf.append(URL_SEPARATOR);}//追加url字符串buf.append(u.toFullString());}}}//消费者url key以及对应的节点url字符串存入propertiesproperties.setProperty(url.getServiceKey(), buf.toString());//版本自增long version = lastCacheChanged.incrementAndGet();//保存properties到本地文件if (syncSaveFile) {doSaveProperties(version);} else {registryCacheExecutor.schedule(() -> doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);}} catch (Throwable t) {logger.warn(t.getMessage(), t);}
}

本地缓存文件路径为:{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache,里面缓存的内容如下,每一个服务接口占据一行,它的所有url字符串都追加在后面,通过空格分隔。
在这里插入图片描述

6 总结

本次我们学习了接口级别服务发现订阅refreshInterfaceInvoker方法的具体实现,大概步骤为:

  1. 第一次调用refreshInterfaceInvoker方法的时候,由于MigrationInvoker内部的真实消费者Invoker为null,那么需要创建一个消费者Invoker。
  2. 首先创建动态注册心中目录DynamicDirectory,随后调用doCreateInvoker方法创建服务消费者Invoker。
    1. 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
    2. 调用registry.register方法将消费者注册信息url注册到注册中心。
    3. 调用directory.buildRouterChain方法构建服务调用路由链RouterChain,赋给directory的routerChain属性。
    4. 调用directory.subscribe方法进行服务发现、引入并订阅服务。
      1. directory本身是一个监听器,directory将会订阅zookeeper对应的服务接口节点下的dubbo/[service name]/providers,服务提供者目录,以及dubbo/[service name]/configurators,即配置目录,以及dubbo/[service name]/routers,即服务路由目录。
      2. 依靠着zookeeper的watch监听回调机制,当这些节点下的子节点发生变化时会触发回调通知RegistryDirectory执行notify方法,进而完成本地服务列表的动态更新功能。实际上服务提供者也会订阅,只不过只会订阅configurators节点。
      3. 在执行订阅的时候,将会进行一次providers,configurators,routers节点目录下字节点的获取,这样就获取到了当前的服务提供者url、配置信息url、服务路由url。
      4. 在subscribe方法的最后,也是最关键的一步,主动调用notify方法通知数据变更。这里实际上会动态更新本地内存和文件中的服务提供者缓存,可能会更新RegistryDirectory 内部的configurators配置信息集合,routerChain路由链以及urlInvokerMap缓存,这里面存放着服务提供者url到对应的Invoker的映射。
        1. 如果没有在本地缓存中找到某个服务提供者url的缓存,那么会将url转换为对应协议的Invoker,默认DubboInvoker,DubboInvoker的内部还会创建NettyClient客户端,并与服务提供者所在的服务端建立连接。
        2. 将url转换为Invoker之前,将会进行配置的合并,合并覆盖顺序是:override > -D参数 >Consumer配置 > Provider配置,从这里可以知道消费者的配置优先级大于提供者的配置。
    5. 调用cluster.join方法传入directory进行集群容错能力包装,最终返回一个ClusterInvoker作为消费者Invoker,即MockClusterInvoker,这是一个包装类,内部包含真正的集群容错Invoker,默认是FailoverClusterInvoker。

到此我们可以知道上面的各种对象的关系(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来):

在这里插入图片描述

到此接口级服务引入学习完毕,实际上Dubbo2就是采用的接口级别服务注册和引入。后面我们将继续学习应用级服务引入,实际上这才是Dubbo3升级的一个重点,非常值得学习!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/15669.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux基础1

Linux基础1 Linux基础1学习笔记 ‍ 声明&#xff01; ​​​学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章 笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他…

WSL2安装Ubuntu22.04并开启GPU进行ML学习教程

文章目录 一 启用 WSL2二、安装 Ubuntu三 安装 NVIDIA GPU 驱动和 CUDA 工具四、安装pytouch运行环境 这几天一直在研究下&#xff0c;怎么在笔记本win11电脑上安装linux系统用于机器学习、深度学习、大模型等相关的研究&#xff0c;前面试了VMWARE、HYPER-V等方式&#xff0c;…

用 Python 从零开始创建神经网络(七):梯度下降(Gradient Descent)/导数(Derivatives)

梯度下降&#xff08;Gradient Descent&#xff09;/导数&#xff08;Derivatives&#xff09; 引言1. 参数对输出的影响2. 斜率&#xff08;The Slope&#xff09;3. 数值导数&#xff08;The Numerical Derivative&#xff09;4. 解析导数&#xff08;The Analytical Derivat…

Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件

本文将介绍一种手动的轻量级的方式&#xff0c;还原HTTP/TLS协议中传输的文件&#xff0c;为流量数据包中的文件分析提供帮助。 如果捕获的数据包中存在非文本类文件&#xff0c;例如png,jpg等图片文件&#xff0c;或者word&#xff0c;Excel等office文件异或是其他类型的二进…

Java结合ElasticSearch根据查询关键字,高亮显示全文数据。

由于es高亮显示机制的问题。当全文内容过多&#xff0c;且搜索中标又少时&#xff0c;就会出现高亮结果无法覆盖全文。因此需要根据需求手动替换。 1.根据es的ik分词器获取搜索词的分词结果。 es部分&#xff1a; //中文分词解析 post /_analyze {"analyzer":"…

一觉睡醒,全世界计算机水平下降100倍,而我却精通C语言——scanf函数

大家好啊&#xff0c;我是小象٩(๑ω๑)۶ 我的博客&#xff1a;Xiao Fei Xiangζั͡ޓއއ 很高兴见到大家&#xff0c;希望能够和大家一起交流学习&#xff0c;共同进步。* 这一节我们主要来学习scanf的基本用法&#xff0c;了解scanf返回值&#xff0c;懂得scanf占位符和…

Kafka一些常用的命令行操作【包含主题命令、生产者和消费者命令】

文章目录 1、主题命令2、生产者命令行操作3、消费者命令行操作 1、主题命令 查看当前服务器中的所有 topic&#xff1a; kafka-topics.sh --bootstrap-server node01:9092 --list 创建topic&#xff1a; kafka-topics.sh --bootstrap-server node01:9092 --create --topic to…

基于物联网的温室大棚控制系统

本设计采用物联网方案&#xff0c;用STM32f103c8t6作为主控芯片&#xff0c;采用DHT11作为温湿度传感器&#xff0c;采集CO2使用JW01-CO2-V2.2传感器模块&#xff0c;并且通过BH1750传感器模块采集光照&#xff0c;通过土壤湿度传感器来获取大棚内部土壤湿度&#xff0c;ESP-01…

Ubuntu24安装配置NDK

1、下载NDK 下载压缩包&#xff0c;下载地址如下&#xff0c;建议下载LTS支持版本。 https://developer.android.google.cn/ndk/downloads?hlcs 2、解压缩 将NDK解压到指定文件夹。如&#xff1a;/opt 或者先解压&#xff0c;再移动到指定目录下。 3、配置环境变量 找到…

Python中的HTML

文章目录 一. HTML1. html的定义2. html的作用3. 基本结构4. 常用的html标签5. 列表标签① 无序列表② 有序列表 6. 表格标签7. 表单标签8. 表单提交① 表单属性设置② 表单元素属性设置 一. HTML 1. html的定义 HTML 的全称为&#xff1a;HyperText Mark-up Language, 指的是…

使用etl工具kettle的日常踩坑梳理之二、从Hadoop中导出数据

想操作MySQL等关系型数据库的可以访问我上一篇文章&#xff0c;本章主要介绍操作Hadoop等大数据组件。 根据2024年11月份测试了kettle工具在9.3及以上版本已经没有内置连接大数据(如Hadoop)组件了。 建议安装9.2及以下的&#xff0c;我这里送上8.3.0版本的请用百度网盘下载链…

NVR录像机汇聚管理EasyNVR多品牌NVR管理工具视频汇聚技术在智慧安防监控中的应用与优势

随着信息技术的快速发展和数字化时代的到来&#xff0c;安防监控领域也在不断进行技术创新和突破。NVR管理平台EasyNVR作为视频汇聚技术的领先者&#xff0c;凭借其强大的视频处理、汇聚与融合能力&#xff0c;展现出了在安防监控领域巨大的应用潜力和价值。本文将详细介绍Easy…

智慧安防丨以科技之力,筑起防范人贩的铜墙铁壁

近日&#xff0c;贵州省贵阳市中级人民法院对余华英拐卖儿童案做出了一审宣判&#xff0c;判处其死刑&#xff0c;剥夺政治权利终身&#xff0c;并处没收个人全部财产。这一判决不仅彰显了法律的威严&#xff0c;也再次唤起了社会对拐卖儿童犯罪的深切关注。 余华英自1993年至2…

游戏引擎学习第10天

视频参考:https://www.bilibili.com/video/BV1LyU3YpEam/ 介绍intel architecture reference manual 地址:https://www.intel.com/content/www/us/en/developer/articles/technical/intel-sdm.html RDTS&#xff08;读取时间戳计数器&#xff09;指令是 x86/x86_64 架构中的…

Asp.net Mvc 电脑销售系统

2 系统实现 2.1 系统框架 该服务平台采用三层架构&#xff0c;以ASP.NET MVC框架为基础 [6]。采用仓库设计的方法&#xff0c;布署灵便。.NET Framework是一个开源框架&#xff0c;适用混合开发。系统软件的集成开发工具是Visual Studio 2019。在数据库的开发中&#xff0c;利…

A3超级计算机虚拟机,为大型语言模型LLM和AIGC提供强大算力支持

热门大语言模型项目地址&#xff1a;www.suanjiayun.com/mirrorDetails?id66ac7d478099315577961758 近几个月来&#xff0c;我们目睹了大型语言模型&#xff08;LLMs&#xff09;和生成式人工智能强势闯入我们的视野&#xff0c;显然&#xff0c;这些模型在训练和运行时需要…

无人机飞手在保家卫国上重要性技术详解

无人机飞手在保家卫国方面发挥着越来越重要的作用&#xff0c;其重要性技术主要体现在以下几个方面&#xff1a; 一、无人机操作与维护技能 无人机飞手在入伍前通常已接受了系统的无人机操作培训&#xff0c;掌握了无人机的飞行原理、构造、维护保养以及多种飞行技巧。这种专…

数据结构--java对象的比较

目录 三种方法的比较 equals方法 Comparable.compareTo方法 执行比较 Comparator.compare方法 连接比较 Comparator和Comparable的区别 三种方法的比较 equals方法 将基本数据类型包装成对应的包装类&#xff08;如Integer、Double)进行比较 Comparable.compareTo方法 执…

yakit远程连接(引擎部署在vps上)

yakit的框架其实是类似于cs的主体是服务端&#xff0c;客户端只是一个简单的ui方便操作&#xff0c;同时他也提供了本地连接&#xff0c;平时使用还是本地连接的使用方式多&#xff0c;但在实际场景中服务端部署在vps上面会更加的方便&#xff0c;比如启动监听&#xff0c;使用…

实验二:Docker存储配置与管理

容器与非持久化数据 非持久化数据是不需要保存的那些数据&#xff0c;容器本地存储中的数据就属于这种类型。容器创建时会创 建非持久化存储&#xff0c;这是容器全部文件和文件系统保存的地方。 默认情况下&#xff0c;在容器内创建的所有文件都存储在可写容器层&#xff0c…