当前位置: 首页 > news >正文

聊城建网站服务黄浦品牌网站建设

聊城建网站服务,黄浦品牌网站建设,核工业西南建设集团有限公司网站,机关网站建设建议基于Dubbo 3.1#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法#xff0c;当时还有最为关键的notify服务通知更新的部分源码没有学习#xff0c;本次我们来学习notify通知本地服务更新的源码。 Dubb… 基于Dubbo 3.1详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法当时还有最为关键的notify服务通知更新的部分源码没有学习本次我们来学习notify通知本地服务更新的源码。 Dubbo 3.x服务引用源码 Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口Dubbo 3.x源码(18)—Dubbo服务引用源码(1)Dubbo 3.x源码(19)—Dubbo服务引用源码(2)Dubbo 3.x源码(20)—Dubbo服务引用源码(3)Dubbo 3.x源码(21)—Dubbo服务引用源码(4)Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvokerDubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新 Dubbo 3.x服务发布源码 Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6) 1 notify服务通知更新 当第一次订阅服务节点或者服务节点目录的子节点更新时例如新的producer上下线将会调用notify服务通知更新的方法会更新本地缓存的数据。 notify方法的入口是FailbackRegistry的notify方法。 /*** FailbackRegistry的方法* p* 服务通知** param url consumer side url* param listener listener* param urls provider latest urls*/ Override protected void notify(URL url, NotifyListener listener, ListURL urls) {if (url null) {throw new IllegalArgumentException(notify url null);}if (listener null) {throw new IllegalArgumentException(notify listener null);}try {/** 调用doNotify方法更新*/doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error(Failed to notify addresses for subscribe url , cause: t.getMessage(), t);} } /*** FailbackRegistry的方法* p* 服务通知*/ protected void doNotify(URL url, NotifyListener listener, ListURL urls) {//调用父类AbstractRegistry的方法super.notify(url, listener, urls); }2 AbstractRegistry#notify通知更新 该方法涉及两个重要知识点 一是对于拉取到的服务节点url按照类别providers、configurators 、routers进行分类然后遍历每个类别依次调用RegistryDirectory#notify方法触发监听回调进行服务数据的更新。二是RegistryDirectory#notify方法通知执行完毕之后调用saveProperties方法更新缓存文件。当注册中心由于网络抖动而订阅失败时至少可以返回现有的缓存的URL。 /*** AbstractRegistry的方法* p* 通知更新** param url consumer side url* param listener listener* param urls provider latest urls*/ protected void notify(URL url, NotifyListener listener, ListURL urls) {if (url null) {throw new IllegalArgumentException(notify url null);}if (listener null) {throw new IllegalArgumentException(notify listener null);}if ((CollectionUtils.isEmpty(urls)) !ANY_VALUE.equals(url.getServiceInterface())) {// 1-4 Empty address.logger.warn(1-4, , , Ignore empty notify urls for subscribe url url);return;}if (logger.isInfoEnabled()) {logger.info(Notify urls for subscribe url url , url size: urls.size());}//根据节点类别对url进行分类MapString, ListURL result new HashMap();//遍历url进行分类for (URL u : urls) {//服务消费者和服务提供者的服务接口名匹配if (UrlUtils.isMatch(url, u)) {//获取url的category类别默认providers同时服务提供者urlServiceAddressURL固定返回providersString category u.getCategory(DEFAULT_CATEGORY);//将url加入到对应类别的categoryList中ListURL categoryList result.computeIfAbsent(category, k - new ArrayList());categoryList.add(u);}}//result一般有三个元素即三个类别providers、configurators 、routersif (result.size() 0) {return;}MapString, ListURL categoryNotified notified.computeIfAbsent(url, u - new ConcurrentHashMap());//遍历每一个类别for (Map.EntryString, ListURL entry : result.entrySet()) {//获取类别String category entry.getKey();ListURL categoryList entry.getValue();//存入categoryNotifiedcategoryNotified.put(category, categoryList);//执行leitener的notify方法进行通知listener可以是RegistryDirectory/** RegistryDirectory#notify通知*/listener.notify(categoryList);/** 本地缓存*/// We will update our cache file after each notification.// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.//将在每次通知后更新缓存文件。当注册中心由于网络抖动而订阅失败时至少可以返回现有的缓存的URL。//本地缓存默认支持if (localCacheEnabled) {saveProperties(url);}} }3 RegistryDirectory#notify更新本地内存信息 该方法根据url更新RegistryDirectory对象的内存信息将可能会更新RegistryDirectory 内部的configurators配置信息集合routerChain路由链以及urlInvokerMap缓存。 在最后会专门调用refreshOverrideAndInvoker方法将服务提供者url转换为invoker进行服务提供者的更新。 /*** RegistryDirectory的方法* * 服务变更通知* param urls 服务提供者注册信息列表*/ Override public synchronized void notify(ListURL urls) {if (isDestroyed()) {return;}MapString, ListURL categoryUrls urls.stream().filter(Objects::nonNull)//类别合法性过滤.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)//根据类别分组.collect(Collectors.groupingBy(this::judgeCategory));//获取配置信息url集合可以为空ListURL configuratorURLs categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());//将配置信息url转换为Configurator集合并赋值给configurators属性可以为空this.configurators Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);//获取路由信息url集合可以为空ListURL routerURLs categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());//将配置信息url转换为Router集合并加入routerChain路由链可以为空toRouters(routerURLs).ifPresent(this::addRouters);// providers//获取服务提供者url集合可以为空ListURL providerURLs categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL address//添加扩展URL地址 3.x的特性ExtensionLoaderAddressListener addressListenerExtensionLoader getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);//获取AddressListener默认空集合ListAddressListener supportedListeners addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners ! null !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs addressListener.notify(providerURLs, getConsumerUrl(), this);}}/** 将服务提供者url转换为invoker进行服务提供者的更新*/refreshOverrideAndInvoker(providerURLs); }3.1 refreshOverrideAndInvoker刷新invoker 该方法将服务提供者url转换为invoker进行服务提供者的更新这在consumer对producer的信息更新部分是非常重要的一个方法。 url转换规则为 如果URL已转换为invoker则不再重新引用它并直接从缓存获取它请注意URL中的任何参数更改都将被重新引用。如果传入invoker列表不为空则表示它是最新的invoker列表。如果传入invokerUrl的列表为空则意味着该规则只是一个覆盖规则或路由规则需要重新对比以决定是否重新引用。 /*** RegistryDirectory的方法* p* 将服务提供者url转换为invoker进行服务提供者的更新** param urls 服务提供者url*/ private synchronized void refreshOverrideAndInvoker(ListURL urls) {// mock zookeeper://xxx?mockreturn nullrefreshInvoker(urls); }/*** 将invokerURL列表转换为Invoker Map** param invokerUrls this parameter cant be null*/ private void refreshInvoker(ListURL invokerUrls) {Assert.notNull(invokerUrls, invokerUrls should not be null);//如果只有一个协议为empty的url表示最新注册中心没有任何该服务提供者url信息if (invokerUrls.size() 1 invokerUrls.get(0) ! null EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//设置为禁止访问this.forbidden true; // Forbid to access//设置routerChain的服务提供者invoker集合为一个空集合routerChain.setInvokers(BitList.emptyList());//关闭urlInvokerMap中的所有服务提供者invokerdestroyAllInvokers(); // Close all invokers}//表明可能存在服务提供者urlelse {//允许访问this.forbidden false; // Allow to accessif (invokerUrls Collections.URLemptyList()) {invokerUrls new ArrayList();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().//使用本地引用来避免NPE。cachedInvokerUrls将被destroyAllInvokers()方法设置为空。SetURL localCachedInvokerUrls this.cachedInvokerUrls;//空的服务提供者url集合if (invokerUrls.isEmpty() localCachedInvokerUrls ! null) {// 1-4 Empty address.logger.warn(1-4, configuration , ,Service serviceKey received empty address list with no EMPTY protocol set, trigger empty protection.);invokerUrls.addAll(localCachedInvokerUrls);} else {//缓存的invoker url便于比较localCachedInvokerUrls new HashSet();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().//使用本地引用来避免NPE。urlInvokerMap将在destroyAllInvokers()方法设置为空。MapURL, InvokerT localUrlInvokerMap this.urlInvokerMap;// cant use local reference as oldUrlInvokerMaps mappings might be removed directly at toInvokers().//不能使用本地引用因为oldUrlInvokerMap的映射可能会直接在toInvokers()中删除。MapURL, InvokerT oldUrlInvokerMap null;if (localUrlInvokerMap ! null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap new LinkedHashMap(Math.round(1 localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}/** 将URL转换为Invoker*/MapURL, InvokerT newUrlInvokerMap toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.* eg: consumer protocol dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error(3-1, inconsistency between the client protocol and the protocol of the server,, urls to invokers error,new IllegalStateException(urls to invokers error. invokerUrls.size : invokerUrls.size() , invoker.size :0. urls : invokerUrls.toString()));return;}ListInvokerT newInvokers Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList(toMergeInvokerList(newInvokers)) : new BitList(newInvokers));// pre-route and build cache//invoker集合存入routerChain的invokers属性routerChain.setInvokers(this.getInvokers());//设置urlInvokerMap为新的urlInvokerMapthis.urlInvokerMap newUrlInvokerMap;try {//销毁无用 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn(destroyUnusedInvokers error. , e);}// 通知invoker刷新this.invokersChanged();} }3.2 toInvokers将URL转换为Invoker 将url转换为Invoker如果url已被引用将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。 该方法的大概逻辑为 获取获取消费者需要查询过滤的协议遍历全部最新服务提供者url依次进行如下操作调用checkProtocolValid方法校验当前提供者url协议是否支持当前服务消费者调用如果不支持则跳过该提供者。服务消费者可以手动指定消费某些协议的服务提供者其他的服务提供者将被丢弃。调用mergeUrl方法合并服务提供者url的配置合并覆盖顺序是override -D参数 Consumer配置 Provider配置从这里可以知道消费者的配置优先级大于提供者的配置。从原来的缓存中获取该url对应的invoker 如果已经存在该缓存那么直接将缓存的invoker加入到新的invoker map缓存中不再从新引用。如果缓存没有该url对应的invoker那么将会重新引用该invoker并将新引入的invoker加入到新的invoker map缓存中。 返回最新的url到invoker的缓存map。 /*** RegistryDirectory的的方法** 将url转换为Invoker如果url已被引用将不会重新引用。将放入newUrlInvokeMap的项将从oldUrlInvokerMap中删除。** param oldUrlInvokerMap 此前的url到invoker的映射* param urls 最新服务提供者url集合* return invokers 最新的url到invoker的映射*/ private MapURL, InvokerT toInvokers(MapURL, InvokerT oldUrlInvokerMap, ListURL urls) {//新的映射mapMapURL, InvokerT newUrlInvokerMap new ConcurrentHashMap(urls null ? 1 : (int) (urls.size() / 0.75f 1));if (urls null || urls.isEmpty()) {return newUrlInvokerMap;}//获取消费者需要查询过滤的协议String queryProtocols this.queryMap.get(PROTOCOL_KEY);//遍历最新服务提供者url集合for (URL providerUrl : urls) {//校验当前提供者url协议是否支持当前服务消费者调用如果不支持则跳过该提供者//服务消费者可以手动指定消费某些协议的服务提供者其他的服务提供者将被丢弃if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}//合并服务提供者url的配置合并覆盖顺序是override -D参数 Consumer配置 Provider配置//从这里可以知道消费者的配置优先级大于提供者的配置URL url mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer again//从原来的缓存中获取该url对应的invokerInvokerT invoker oldUrlInvokerMap null ? null : oldUrlInvokerMap.remove(url);//如果缓存没有该url对应的invoker那么将会重新引用该invokerif (invoker null) { // Not in the cache, refer againtry {boolean enabled true;if (url.hasParameter(DISABLED_KEY)) {enabled !url.getParameter(DISABLED_KEY, false);} else {enabled url.getParameter(ENABLED_KEY, true);}//如果启用服务if (enabled) {//再次通过Protocol$Adaptive的refer方法引用该服务提供者//在最开始我们就是通过refer方法引用服务的在再次见到这个方法只不过这里的url已经变成了某个服务提供者的url了invoker protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException t.getMessage().contains(serialization optimizer)) {// 4-2 - serialization optimizer class initialization failed.logger.error(4-2, typo in optimizer class, ,Failed to refer invoker for interface: serviceType ,url:( url ) t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error(4-3, , ,Failed to refer invoker for interface: serviceType ,url:( url ) t.getMessage(), t);}}//加入到新的invoker map缓存中if (invoker ! null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {//如果已经存在该缓存那么直接将缓存的invoker加入到新的invoker map缓存中不再从新引用newUrlInvokerMap.put(url, invoker);}}//返回新的invoker mapreturn newUrlInvokerMap; }在上面的步骤中如果是首次启动消费者将会统一走Protocol$Adaptive的refer方法引用该服务提供者的逻辑。还记得在最开始讲consumer服务引入的时候吗那时候我们就是通过这个refer方法引用服务的现在再次见到这个方法只不过此前的url则是注册中心协议url对应着RegistryProtocol而这里的url已经变成了某个服务提供者的url了对应着具体的协议实现例如DubboProtocol、RestProtocol。 我们此前就讲过了Protocol$Adaptive的refer方法实际上返回的是被wrapper包装的Protocol这里我们直接看最底层的Protocol的refer方法以默认协议dubbo协议的Protocol实现DubboProtocol为例子 4 DubboProtocol#refer dubbo协议服务引入 该方法执行基于dubbo序列化协议的服务引入最终会创建一个DubboInvoker内部包含一个nettyClient已经与对应的服务提供者的nettyServer建立了连接可用于发起rpc远程调用请求。 /*** DubboProtocol的方法** param type 服务类型* param url 远程服务提供者url* return* param T* throws RpcException*/ Override public T InvokerT refer(ClassT type, URL url) throws RpcException {//销毁检测checkDestroyed();//协议绑定引用return protocolBindingRefer(type, url); }Override public T InvokerT protocolBindingRefer(ClassT serviceType, URL url) throws RpcException {//销毁检测checkDestroyed();//序列化优化optimizeSerialization(url);// create rpc invoker.//创建一个DubboInvoker可用于发起rpc远程调用DubboInvokerT invoker new DubboInvokerT(serviceType, url, getClients(url), invokers);//加入协议缓存invokersinvokers.add(invoker);return invoker; }4.1 getClients获取服务客户端 该方法获取服务提供者网络调用客户端。这里会判断是否使用共享连接因为一个服务提供者根提供了很多的服务接口这个的是否共享连接实际上就是指的消费者引入时候是这些服务接口是否共用一些客户端连接默认一个或者说不同的服务接口使用独立的客户端连接默认一个服务一个连接。默认是共享连接。 /*** DubboProtocol的方法* 获取服务客户端** param url 服务提供者url* return ExchangeClient数组*/ private ExchangeClient[] getClients(URL url) {//获取配置的连接数默认为0int connections url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service//是否共享连接如果没有配置connections那么连接是共享的否则一个服务连接一个服务if (connections 0) {/** The xml configuration should have a higher priority than properties.* 共享连接配置xml配置的优先级应该高于属性*/String shareConnectionsStr StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections Integer.parseInt(shareConnectionsStr);//获取共享客户端ListReferenceCountExchangeClient shareClients getSharedClient(url, connections);//设置到ExchangeClient数组中ExchangeClient[] clients new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}//非共享连接表示当前服务接口使用单独的连接ExchangeClient[] clients new ExchangeClient[connections];for (int i 0; i clients.length; i) {//初始化新的客户端clients[i] initClient(url);}return clients; }4.2 getSharedClient获取共享客户端连接 如果是共享连接配置那么调用getSharedClient方法获取共享客户端连接默认连接数为1。该方法的大概步骤为 首先获取服务提供者ip:port 作为共享连接的key即共享连接情况下同一个服务提供者实例下的所有服务接口共享某些连接。从缓存referenceClientMap获取key对应的共享客户端连接。如果存在缓存并且客户端连接全部可用那么增加连接技术然后返回即可。否则只要有一个客户端不可用就需要用可用的客户端替换不可用的客户端。如果此前没有该key的客户端连接缓存或者连接不是全部可用都要走下面的步骤尝试新创建连接。加synchronized锁在锁代码中再次双重检测注意这里还有线程等待唤醒机制。最后判断如果客户端连接为空那么调用buildReferenceCountExchangeClientList方法构建指定数量的客户端连接。如果连接不为空那么遍历连接判断如果该连接不可用那么新创建一个连接补充进来。最后的处理仍需要加synchronized锁判断如果最终没建立连接那么移除无效缓存否则将最终的客户端连接存入缓存最后唤醒其他等待的线程。 该方法的核心知识点有两个一个是buildReferenceCountExchangeClientList方法构建指定数量的客户端连接另一个就是方法中的synchronized锁以及等待唤醒机制。 为什么需要等待唤醒呢因为这是共享客户端那么可能有多个线程都在初始化同一个ip:port的多个客户端为了避免冲突需要加锁。 /*** DubboProtocol的方法* p* 获取共享客户端连接** param url 服务提供者url* param connectNum 共享连接数量默认1*/ SuppressWarnings(unchecked) private ListReferenceCountExchangeClient getSharedClient(URL url, int connectNum) {//获取 服务提供者ip:port 作为共享连接的keyString key url.getAddress();//从缓存获取key对应的共享客户端连接Object clients referenceClientMap.get(key);if (clients instanceof List) {//转换为ReferenceCountExchangeClient集合带有引用计数的功能ListReferenceCountExchangeClient typedClients (ListReferenceCountExchangeClient) clients;//检测客户端连接是否全部可用//只要有一个客户端不可用就需要用可用的客户端替换不可用的客户端。if (checkClientCanUse(typedClients)) {//如果可用//增加连接的引用计数如果我们创建新的调用者共享相同的连接连接将关闭没有任何引用batchClientRefIncr(typedClients);return typedClients;}}//如果此前没有该key的连接缓存那么新创建ListReferenceCountExchangeClient typedClients null;synchronized (referenceClientMap) {//死循环for (; ; ) {// guarantee just one thread in loading condition. And Other is waiting It had finished.//双重检测锁clients referenceClientMap.get(key);if (clients instanceof List) {typedClients (ListReferenceCountExchangeClient) clients;if (checkClientCanUse(typedClients)) {batchClientRefIncr(typedClients);return typedClients;} else {//如果共享连接不是全部可用那么缓存值先设置为为一个object对象跳出循环referenceClientMap.put(key, PENDING_OBJECT);break;}}//如果客户端连接PENDING_OBJECT那么表示有其他线程正在初始化当前客户端连接那么当前线程等待直到被通知else if (clients PENDING_OBJECT) {try {referenceClientMap.wait();} catch (InterruptedException ignored) {}}//如果没有共享连接那么缓存值先设置为为一个object对象跳出循环else {referenceClientMap.put(key, PENDING_OBJECT);break;}}}try {//连接数量必须大于等于1connectNum Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//如果客户端连接为空if (CollectionUtils.isEmpty(typedClients)) {/** 构建客户端连接*/typedClients buildReferenceCountExchangeClientList(url, connectNum);}//如果连接不为空else {//遍历连接for (int i 0; i typedClients.size(); i) {//如果该连接不可用那么新创建一个连接补充进来ReferenceCountExchangeClient referenceCountExchangeClient typedClients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient null || referenceCountExchangeClient.isClosed()) {typedClients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}} finally {synchronized (referenceClientMap) {//如果最终没建立连接那么移除无效缓存if (typedClients null) {referenceClientMap.remove(key);} else {//将最终的客户端连接存入缓存referenceClientMap.put(key, typedClients);}//唤醒其他线程referenceClientMap.notifyAll();}}return typedClients; }4.3 buildReferenceCountExchangeClientList构建客户端连接 该方法构建指定数量的引用计数交换器客户端内部循环调用buildReferenceCountExchangeClient方法构建耽单个客户端连接内部调用initClient方法初始化交换器客户端启动一个nettyClient并与服务端建立了连接。 /*** DubboProtocol的方法* 构建指定数量的引用计数交换器客户端** param url 服务提供者url* param connectNum 客户端数量* return*/ private ListReferenceCountExchangeClient buildReferenceCountExchangeClientList(URL url, int connectNum) {ListReferenceCountExchangeClient clients new ArrayList();//循环调用buildReferenceCountExchangeClient方法for (int i 0; i connectNum; i) {clients.add(buildReferenceCountExchangeClient(url));}return clients; }/*** 构建一个引用计数交换器客户端** param url* return*/ private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//初始化交换器客户端启动一个nettyClient并与服务端建立了连接ExchangeClient exchangeClient initClient(url);//创建ReferenceCountExchangeClientReferenceCountExchangeClient client new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);// read configs//获取服务器关闭等待超时时间默认10000msint shutdownTimeout ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());client.setShutdownWaitTime(shutdownTimeout);return client; }4.4 initClient建立客户端连接 该方法创建客户端连接大概步骤为 首先获取客户端底层通信框架类型应该和服务端的底层通信框统一默认netty。用ServiceConfigURL替换InstanceAddressURL协议为dubbo协议。获取lazy参数判断连接是否懒加载默认false即饿加载。如果懒加载那么只有在第一次调用服务时才会创建与服务端的连接否则立即调用Exchangers.connect(url, requestHandler)方法与服务端建立底层通信客户端连接。 默认情况下客户端为饿加载客户端与服务端的连接在消费者客户端启动引用服务的时候就已经建立了即服务提供者url转换为invoker的时候就已经建立了连接。 /*** DubboProtocol的方法* 创建一个新的连接** param url 服务提供者url*/ private ExchangeClient initClient(URL url) {/** Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,* which means params are shared among different services. Since client is shared among services this is currently not a problem.*///获取客户端底层通信框架类型应该和服务端的底层通信框统一默认nettyString str url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));// BIO is not allowed since it has severe performance issue.//不允许使用BIO因为它有严重的性能问题目前都是使用netty4if (StringUtils.isNotEmpty(str) !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException(Unsupported client type: str , supported client type is StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), ));}try {// Replace InstanceAddressURL with ServiceConfigURL.//用ServiceConfigURL替换InstanceAddressURL协议为dubbo协议url new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());url url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default//默认启用心跳url url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));//连接是否懒加载默认false即饿加载return url.getParameter(LAZY_CONNECT_KEY, false)//如果懒加载那么只有在第一次调用服务时才会创建与服务端的连接? new LazyConnectExchangeClient(url, requestHandler)//饿加载与服务端建立底层通信客户端连接: Exchangers.connect(url, requestHandler);} catch (RemotingException e) {throw new RpcException(Fail to create remoting client for service( url ): e.getMessage(), e);} }4.5 Exchangers#connect建立连接 该方法和我们此前学习的服务提供者的Exchangers#bind方法类型只不过bind方法创建服务端该方法创建客户端。 该方法内部基于Dubbo SPI获取Exchanger默认HeaderExchanger然后调用HeaderExchanger#connect方法。 /*** Exchangers的方法** 客户端建立与服务端的连接** param url 服务提供者url* param handler 请求处理器* return 客户端连接*/ public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url null) {throw new IllegalArgumentException(url null);}if (handler null) {throw new IllegalArgumentException(handler null);}//基于Dubbo SPI获取Exchanger默认HeaderExchanger然后调用HeaderExchanger#connect方法return getExchanger(url).connect(url, handler); }HeaderExchanger#connect方法中首先对handler进行包装DecodeHandler - HeaderExchangeHandler - requestHandler。 DecodeHandler用于负责内部的dubbo协议的请求解码。HeaderExchangeHandler用于完成请求响应的映射。requestHandler用于nettyHandler真正处理请求。 随后调用Transporters#connect方法启动底层远程网络通信客户端返回Client。Transporter是Dubbo对网络传输层的抽象接口Exchanger依赖于Transporter。 最后基于Client构建HeaderExchangeClient返回。 Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//包装handlerDecodeHandler - HeaderExchangeHandler - handler//调用Transporters#connect方法启动底层远程网络通信客户端返回Client//基于Client构建HeaderExchangeClient返回return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }Transporters#connect方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现并调用connect方法完成绑定目前仅NettyTransporter基于netty4。 public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url null) {throw new IllegalArgumentException(url null);}//继续包装一层ChannelHandlerDispatcherChannelHandler handler;if (handlers null || handlers.length 0) {handler new ChannelHandlerAdapter();} else if (handlers.length 1) {handler handlers[0];} else {handler new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI机制获取Transporter的实现并调用connect方法完成绑定return getTransporter(url).connect(url, handler); }4.6 NettyTransporter#connect创建NettyClient 该方法很简单就是根据url和handler创建一个NettyClient实例在NettyClient的构造器中会调用doOpen()开启客户端创建Bootstrap设置EventLoopGroup编配ChannelHandlerPipeline随后调用connect方法连接服务提供者所在服务端。 Override public Client connect(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler创建NettyClientreturn new NettyClient(url, handler); }NettyClient的构造器如下将会调用父类构造器启动客户端。 public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler-HeartbeatHandler-handler//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型//继续包装handler MultiMessageHandler-HeartbeatHandler-handlersuper(url, wrapChannelHandler(url, handler)); }AbstractClient的构造器如下将会获取绑定的ip和端口以及其他参数然后调用doOpen方法真正的开启netty客户端最后调用connect方法连接服务提供者所在服务端。 public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// set default needReconnect true when channel is not connected//当通道未连接时设置默认needReconnect为trueneedReconnect url.getParameter(Constants.SEND_RECONNECT_KEY, true);//初始化执行器消费者的执行程序是全局共享的提供者ip不需要是线程名的一部分。initExecutor(url);try {/** 创建netty客户端*/doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,Failed to start getClass().getSimpleName() NetUtils.getLocalAddress() connect to the server getRemoteAddress() , cause: t.getMessage(), t);}try {// connect./** 连接服务提供者所在服务端*/connect();if (logger.isInfoEnabled()) {logger.info(Start getClass().getSimpleName() NetUtils.getLocalAddress() connect to the server getRemoteAddress());}} catch (RemotingException t) {// If lazy connect client fails to establish a connection, the client instance will still be created,// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exceptionif (url.getParameter(LAZY_CONNECT_KEY, false)) {logger.warn(Failed to start getClass().getSimpleName() NetUtils.getLocalAddress() connect to the server getRemoteAddress() (the connection request is initiated by lazy connect client, ignore and retry later!), cause: t.getMessage(), t);return;}if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn(Failed to start getClass().getSimpleName() NetUtils.getLocalAddress() connect to the server getRemoteAddress() (check false, ignore and retry later!), cause: t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,Failed to start getClass().getSimpleName() NetUtils.getLocalAddress() connect to the server getRemoteAddress() , cause: t.getMessage(), t);} }4.7 doOpen初始化NettyClient 该方法用于初始化并启动netty客户端是非常标准的netty客户端启动代码如果你们使用过Netty看过Netty源码一定就会感到非常熟悉。 创建Bootstrap设置eventGroup编配ChannelHandler。至此成功初始化了Bootstrap但是并没有连接服务端。 /*** NettyClient的方法** 初始化 bootstrap*/ Override protected void doOpen() throws Throwable {//创建NettyClientHandlerfinal NettyClientHandler nettyClientHandler createNettyClientHandler();//创建Bootstrap说明这是一个netty客户端bootstrap new Bootstrap();//初始化NettyClientinitBootstrap(nettyClientHandler); }protected NettyClientHandler createNettyClientHandler() {//创建NettyClientHandler当前NettyClient对象本身也是一个ChannelHandler实例其received方法委托给创建实例时传递的内部的handler处理return new NettyClientHandler(getUrl(), this); }protected void initBootstrap(NettyClientHandler nettyClientHandler) {//配置线程组bootstrap.group(EVENT_LOOP_GROUP.get())//设置Socket 参数.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())//IO模型.channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));//设置处理器bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast(negotiation, new SslClientTlsHandler(getUrl()));}NettyCodecAdapter adapter new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);//自定义客户端消息的业务处理逻辑Handlerch.pipeline()//.addLast(logging,new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast(decoder, adapter.getDecoder())//编码.addLast(encoder, adapter.getEncoder())//心跳检测.addLast(client-idle-handler, new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast(handler, nettyClientHandler);String socksProxyHost ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);if(socksProxyHost ! null !isFilteredAddress(getUrl().getHost())) {int socksProxyPort Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}}); }4.8 connect连接服务端 在初始化Bootstrap之后将调用connect方法真正的连接服务提供者所在的服务端内部调用doConnect方法执行连接该方法由子类实现。 /*** AbstractClient的方法* p* 连接服务提供者所在服务端*/ protected void connect() throws RemotingException {//加锁connectLock.lock();try {//如果已连接则返回if (isConnected()) {return;}//如果已关闭则返回if (isClosed() || isClosing()) {logger.warn(No need to connect to server getRemoteAddress() from getClass().getSimpleName() NetUtils.getLocalHost() using dubbo version Version.getVersion() , cause: client status is closed or closing.);return;}/** 执行连接*/doConnect();if (!isConnected()) {throw new RemotingException(this, Failed to connect to server getRemoteAddress() from getClass().getSimpleName() NetUtils.getLocalHost() using dubbo version Version.getVersion() , cause: Connect wait timeout: getConnectTimeout() ms.);} else {if (logger.isInfoEnabled()) {logger.info(Successfully connect to server getRemoteAddress() from getClass().getSimpleName() NetUtils.getLocalHost() using dubbo version Version.getVersion() , channel is this.getChannel());}}} catch (RemotingException e) {throw e;} catch (Throwable e) {throw new RemotingException(this, Failed to connect to server getRemoteAddress() from getClass().getSimpleName() NetUtils.getLocalHost() using dubbo version Version.getVersion() , cause: e.getMessage(), e);} finally {connectLock.unlock();} }NettyClient的doConnect方法如下主要逻辑就是调用bootstrap.connect方法连接服务端 /*** NettyClient的方法* 连接服务端*/ Override protected void doConnect() throws Throwable {long start System.currentTimeMillis();//通过bootstrap连接服务端ChannelFuture future bootstrap.connect(getConnectAddress());try {//等待连接超时事件boolean ret future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);//如果连接成功if (ret future.isSuccess()) {//获取通道Channel newChannel future.channel();try {// Close old channel// copy reference//关闭旧的ChannelChannel oldChannel NettyClient.this.channel;if (oldChannel ! null) {try {if (logger.isInfoEnabled()) {logger.info(Close old netty channel oldChannel on create new netty channel newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info(Close new netty channel newChannel , because the client closed.);}newChannel.close();} finally {NettyClient.this.channel null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel newChannel;}}} else if (future.cause() ! null) {Throwable cause future.cause();// 6-1 Failed to connect to provider server by other reason.RemotingException remotingException new RemotingException(this, client(url: getUrl() ) failed to connect to server getRemoteAddress() , error message is: cause.getMessage(), cause);logger.error(6-1, network disconnected, ,Failed to connect to provider server by other reason., cause);throw remotingException;} else {// 6-2 Client-side timeoutRemotingException remotingException new RemotingException(this, client(url: getUrl() ) failed to connect to server getRemoteAddress() client-side timeout getConnectTimeout() ms (elapsed: (System.currentTimeMillis() - start) ms) from netty client NetUtils.getLocalHost() using dubbo version Version.getVersion());logger.error(6-2, provider crash, ,Client-side timeout., remotingException);throw remotingException;}} finally {// just add new valid channel to NettyChannels cacheif (!isConnected()) {//future.cancel(true);}} }5 saveProperties更新本地文件信息 在每次通知内存数据更新之后更新缓存文件。当注册中心由于网络抖动而订阅失败时至少可以返回现有的缓存的URL。 /*** AbstractRegistry的方法** param url 服务消费者url*/ private void saveProperties(URL url) {//服务缓存文件路径为 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cacheif (file null) {return;}try {//需要存储的url字符串StringBuilder buf new StringBuilder();//获取该url的不同类别节点到对应url列表的mapMapString, ListURL categoryNotified notified.get(url);//遍历所有的节点urlif (categoryNotified ! null) {for (ListURL us : categoryNotified.values()) {for (URL u : us) {if (buf.length() 0) {//追加空格buf.append(URL_SEPARATOR);}//追加url字符串buf.append(u.toFullString());}}}//消费者url key以及对应的节点url字符串存入propertiesproperties.setProperty(url.getServiceKey(), buf.toString());//版本自增long version lastCacheChanged.incrementAndGet();//保存properties到本地文件if (syncSaveFile) {doSaveProperties(version);} else {registryCacheExecutor.schedule(() - doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);}} catch (Throwable t) {logger.warn(t.getMessage(), t);} }本地缓存文件路径为{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache里面缓存的内容如下每一个服务接口占据一行它的所有url字符串都追加在后面通过空格分隔。 6 总结 本次我们学习了接口级别服务发现订阅refreshInterfaceInvoker方法的具体实现大概步骤为 第一次调用refreshInterfaceInvoker方法的时候由于MigrationInvoker内部的真实消费者Invoker为null那么需要创建一个消费者Invoker。首先创建动态注册心中目录DynamicDirectory随后调用doCreateInvoker方法创建服务消费者Invoker。 首先根据消费者信息转换为消费者注册信息url内部包括消费者ip、指定引用的protocol默认consumer协议、指定引用的服务接口、指定引用的方法以及其他消费者信息。调用registry.register方法将消费者注册信息url注册到注册中心。调用directory.buildRouterChain方法构建服务调用路由链RouterChain赋给directory的routerChain属性。调用directory.subscribe方法进行服务发现、引入并订阅服务。 directory本身是一个监听器directory将会订阅zookeeper对应的服务接口节点下的dubbo/[service name]/providers服务提供者目录以及dubbo/[service name]/configurators即配置目录以及dubbo/[service name]/routers即服务路由目录。依靠着zookeeper的watch监听回调机制当这些节点下的子节点发生变化时会触发回调通知RegistryDirectory执行notify方法进而完成本地服务列表的动态更新功能。实际上服务提供者也会订阅只不过只会订阅configurators节点。在执行订阅的时候将会进行一次providers,configurators,routers节点目录下字节点的获取这样就获取到了当前的服务提供者url、配置信息url、服务路由url。在subscribe方法的最后也是最关键的一步主动调用notify方法通知数据变更。这里实际上会动态更新本地内存和文件中的服务提供者缓存可能会更新RegistryDirectory 内部的configurators配置信息集合routerChain路由链以及urlInvokerMap缓存这里面存放着服务提供者url到对应的Invoker的映射。 如果没有在本地缓存中找到某个服务提供者url的缓存那么会将url转换为对应协议的Invoker默认DubboInvokerDubboInvoker的内部还会创建NettyClient客户端并与服务提供者所在的服务端建立连接。将url转换为Invoker之前将会进行配置的合并合并覆盖顺序是override -D参数 Consumer配置 Provider配置从这里可以知道消费者的配置优先级大于提供者的配置。 调用cluster.join方法传入directory进行集群容错能力包装最终返回一个ClusterInvoker作为消费者Invoker即MockClusterInvoker这是一个包装类内部包含真正的集群容错Invoker默认是FailoverClusterInvoker。 到此我们可以知道上面的各种对象的关系注意MockClusterInvoker上面还有一个MigrationInvoker没画出来 到此接口级服务引入学习完毕实际上Dubbo2就是采用的接口级别服务注册和引入。后面我们将继续学习应用级服务引入实际上这才是Dubbo3升级的一个重点非常值得学习
http://www.hkea.cn/news/14431111/

相关文章:

  • 一个网站3个相似域名查关键词排名
  • 免费字体下载网站内蒙古网络公司有哪些
  • 济南制作网站的公司吗中国商标免费查询入口
  • 网站建设公司推荐q479185700顶上四川省工程建设信息官方网站
  • 网站开发与软件开发的区别免费静态网页模板下载
  • 网站推广工作淮安市住房和城乡建设局网站首页
  • 企业网站建设搜集资料天眼查企业查询下载
  • 网站建设与网页设计案例教程pdf下载域名信息备案管理系统查询
  • 企业网站建设文案案例个人网站做淘宝客教程
  • 如何设计网站步骤网站制作基础教程
  • 网站建设自查情况wordpress ip改域名
  • 百度智能小程序是什么wordpress极速优化
  • 淮安做网站的公司有哪些公司临沂百度seo
  • 西宁网站建设兼职如何使用手机看建设网站
  • 小偷程序做的网站能用吗建设一个购物网站的费用
  • 学习网站建设总结备案域名买卖
  • 有几家公司如何建设网站如何设计网站模板
  • 创建网站要钱吗专门做湘菜的网站
  • 百度爱采购网站官网荣茂网站建设
  • 沈阳手机网站制作汽车网站设计
  • 网站开发背景意义网站开发总结简写
  • 做网站金山设计很好看的网站
  • 做网站推广的联系方式装潢设计工作室
  • 用asp做网站遇到的问题大连开发区网站设计公司
  • 企业网站源码php外链网站是什么
  • 展示用网站模板北滘网站建设
  • 橙色企业网站模板logo设计软件在线制作
  • 做外贸一般要注册哪些外贸网站有没有做淘宝客网站的
  • 网站留言板的作用用html5做商城网站怎么做
  • php网站后台反应慢怎么解决长沙做网站团队