电子商务网站建设与维护代码,网推所什么意思,网络建设规划,风铃微网站怎么做前言
在之前我们讲过Spring和Dubbo的集成#xff0c;我们在服务上标注了DubboService的注解#xff0c;然后最终Dubbo会调用到ServiceBean#export方法中#xff0c;本次我们就来剖析下服务导出的全流程。
一、前置回顾 由于ServiceBean实现了ApplicationListener接口…前言
在之前我们讲过Spring和Dubbo的集成我们在服务上标注了DubboService的注解然后最终Dubbo会调用到ServiceBean#export方法中本次我们就来剖析下服务导出的全流程。
一、前置回顾 由于ServiceBean实现了ApplicationListener接口那么spring会在上下文刷新完成后回调其onApplicationEvent方法然后在这个方法内会调用export方法最终会调用到父类也就是ServiceConfig的export方法来执行服务暴露全流程。 二、服务导出源码剖析
2.1、ServiceConfig#export方法 这个方法主要是判断我们是不是需要延迟暴露可以通过Service(delay100)来使用延迟暴露功能最终都会调用到doExport方法 public synchronized void export() {if (provider ! null) {if (export null) {export provider.getExport();}if (delay null) {delay provider.getDelay();}}if (export ! null !export) {return;}//延迟暴露如果想使用延迟暴露功能可以在Service注解中添加delay属性 -》Service(delay 1000 )if (delay ! null delay 0) {delayExportExecutor.schedule(new Runnable() {Overridepublic void run() {doExport();}}, delay, TimeUnit.MILLISECONDS);} else {//go doExportdoExport();}}2.2、ServiceConfig#export方法 首先判断服务是否暴露过如果没有暴露则进行服务导出则把exported设置为true然后检查interfaceName是否为空为空抛出异常checkDefault检查provider是否为null为null就创建并把配置信息设置到provider中把provider中的applicationmoduleregistriesmonitorprotocols等属性赋值给ServiceConfig的相关属性如果赋值后registries和monitor则再尝试从module和application中获取赋值判断实现的接口类型是否是GenericService这个就是判断是否是泛化实现如果是的话则把interfaceClass设置为GenericService.class然后generic设置为true如果不是泛化实现则通过反射创建出来接口的class对象然后检查我们要导出的方法是否在接口中检查实现类是否是这个接口的实现类然后再把generic设置为false代表不是泛化导出。下面的checkApplicationcheckRegistrycheckProtocol方法就是做下参数检查判断我们刚刚设置的applicationprotocol…等不能为空。调用doExportUrls进行导出url给已经暴露的服务构造一个ProviderModel,然后缓存起来这里ProviderModel表示服务提供者模型里面存储这服务提供者相关信息ApplicationModel 缓存所有的 ProviderModel。 protected synchronized void doExport() {if (unexported) {throw new IllegalStateException(Already unexported!);}if (exported) {return;}exported true;if (interfaceName null || interfaceName.length() 0) {throw new IllegalStateException(dubbo:service interface\\ / interface not allow null!);}checkDefault();if (provider ! null) {if (application null) {application provider.getApplication();}if (module null) {module provider.getModule();}if (registries null) {registries provider.getRegistries();}if (monitor null) {monitor provider.getMonitor();}if (protocols null) {protocols provider.getProtocols();}}if (module ! null) {if (registries null) {registries module.getRegistries();}if (monitor null) {monitor module.getMonitor();}}if (application ! null) {if (registries null) {registries application.getRegistries();}if (monitor null) {monitor application.getMonitor();}}if (ref instanceof GenericService) {interfaceClass GenericService.class;if (StringUtils.isEmpty(generic)) {generic Boolean.TRUE.toString();}} else {try { //创建class对象interfaceClass Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}//检查方法是否在接口中checkInterfaceAndMethods(interfaceClass, methods);checkRef(); //检查实现类generic Boolean.FALSE.toString(); //不是泛化}if (local ! null) {if (true.equals(local)) {local interfaceName Local;}Class? localClass;try {localClass ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException(The local implementation class localClass.getName() not implement interface interfaceName);}}if (stub ! null) {if (true.equals(stub)) {stub interfaceName Stub;}Class? stubClass;try {stubClass ClassHelper.forNameWithThreadContextClassLoader(stub);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(stubClass)) {throw new IllegalStateException(The stub implementation class stubClass.getName() not implement interface interfaceName);}}checkApplication();checkRegistry();checkProtocol();appendProperties(this);checkStub(interfaceClass);checkMock(interfaceClass);if (path null || path.length() 0) {path interfaceName;}//go doExportUrlsdoExportUrls();CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());ProviderModel providerModel new ProviderModel(getUniqueServiceName(), this, ref);ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);}2.2.1、ServiceConfig#checkDefault 判断ServiceConfig的provider属性是否为空为空则new出来一个新的ProviderConfig然后调用appendProperties给provider对象填充其属性 private void checkDefault() {//判断provider如果为空则new一个出来if (provider null) {provider new ProviderConfig();}//填充属性到provider中appendProperties(provider);}2.2.2.1、ServiceConfig#appendProperties 首先根据传进来的配置类型拼接出来一个prefix这里如果是providerConfig那么前缀就是dubbo.provider.遍历providerConfig的所有方法然后获取方法名判断方法长度是大于3并且是set开头的说明是setxxx并且方法修饰符是public的且只有一个入参且参数类型是基本类型or包装类型然后满足上述条件继续后续流程首先截取出来需要注入的属性名例setName这里需要注入的属性名称就是name判断providerConfig是否有指定id我们可以在配置文件中指定id例dubbo.protocols.p1.iddubbo1,dubbo.protocols.p2.iddubbo2如果有指定id那么把id也拼接上拼接出来的配置名是dubbo.provider.dubbo1.name这里会根据这个配置名从JVM系统属性中查找有没有我们可以通过在服务启动时设置VM arguments -D dubbo.provider.dubbo1.name111指定如果拼接上id没有获取到配置那么把id去掉再获取一次这时候配置名是dubbo.provider.name在根据这个配置名从JVM属性中查找有没有如果上面都没有再查找类中有没有getName的方法如果有并且getName有返回结果则找到了属性值如果get方法返回的是空则再根据配置名dubbo.provider.dubbo1.name 从properties文件中获取如果没获取到再根据配置名dubbo.provider.name 从properties文件中获取如果也没获取到则再获取一下映射的key例dubbo.protocol.name“dubbo.service.protocol”然后再根据dubbo.service.protoco再从JVM系统属性中和properties文件中再获取一次如果获取到对应的属性值则调用其set方法把属性值设置进去 protected static void appendProperties(AbstractConfig config) {if (config null) {return;}//这里如果我们是providerConfig,prefix就是dubbo.provider.String prefix dubbo. getTagName(config.getClass()) .;Method[] methods config.getClass().getMethods();//遍历providerConfig的所有方法for (Method method : methods) {try {//获取方法名String name method.getName();//如果方法长度大于3并且是以set开头说明是setxxx并且方法是public的并且方法只有一个入参且方法//的参数类型是原始的也就是java提供的数据类型if (name.length() 3 name.startsWith(set) Modifier.isPublic(method.getModifiers()) method.getParameterTypes().length 1 isPrimitive(method.getParameterTypes()[0])) {//获取到需要注入的属性名例setName这里拿到的属性名就是nameString property StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() name.substring(4), .);String value null;//判断config有没有指定ID//例 dubbo.protocols.p1.iddubbo1,dubbo.protocols.p2.iddubbo2if (config.getId() ! null config.getId().length() 0) {//去JVM系统属性中查找有没有 -D dubbo.provider.dubbo1.name属性//这里我们可以通过在服务启动时设置VM argumentsString pn prefix config.getId() . property;value System.getProperty(pn);if (!StringUtils.isBlank(value)) {logger.info(Use System Property pn to config dubbo);}}//如果上面指定ID没有获取到配置那么把ID去掉再获取一次if (value null || value.length() 0) {//通过dubbo.provider.name获取String pn prefix property;value System.getProperty(pn);if (!StringUtils.isBlank(value)) {logger.info(Use System Property pn to config dubbo);}}//如果把ID去掉还没获取到if (value null || value.length() 0) {Method getter;//查找类中有没有 getName的方法try {getter config.getClass().getMethod(get name.substring(3));} catch (NoSuchMethodException e) {try {getter config.getClass().getMethod(is name.substring(3));} catch (NoSuchMethodException e2) {getter null;}}//如果有get方法if (getter ! null) {//get方法的返回结果是空if (getter.invoke(config) null) {//如果config中有指定ID则dubbo.provider.dubbo1.nameif (config.getId() ! null config.getId().length() 0) {value ConfigUtils.getProperty(prefix config.getId() . property);}//如果上面指定id没有获取到,则再次尝试从properties文件中获取if (value null || value.length() 0) {value ConfigUtils.getProperty(prefix property);}//如果从properties文件中没有获取到if (value null || value.length() 0) {//则尝试看有没有给这个服务指定配置String legacyKey legacyProperties.get(prefix property);//如果有则再从properties文件中获取这个服务指定的配置/* 尝试获取配置并转换dubbo.service.max.retry.providers 和dubbo.service.allow.no.provider 两个配置需要转换 */if (legacyKey ! null legacyKey.length() 0) {value convertLegacyValue(legacyKey, ConfigUtils.getProperty(legacyKey));}}}}}//如果到这里value不为空则通过反射调用其set方法把value设置进去if (value ! null value.length() 0) {method.invoke(config, convertPrimitive(method.getParameterTypes()[0], value));}}} catch (Exception e) {logger.error(e.getMessage(), e);}}}2.2.2.2、AbstractConfig#getTagName 首先获取到class的类名例ProviderConfig然后判断如果以指定后缀结尾的则去除掉SUFFIXES new String[]{“Config”, “Bean”}这里的ProviderConfig会变成Provider然后把Provider全转成小写的也就是provider返回 private static String getTagName(Class? cls) {String tag cls.getSimpleName();for (String suffix : SUFFIXES) {if (tag.endsWith(suffix)) {tag tag.substring(0, tag.length() - suffix.length());break;}}tag tag.toLowerCase();return tag;}2.2.2.3、AbstractConfig#isPrimitive 这个方法就是判断类型是不是基本数据类型或者是基本数据的包装类型或者是Object类型 private static boolean isPrimitive(Class? type) {return type.isPrimitive()|| type String.class|| type Character.class|| type Boolean.class|| type Byte.class|| type Short.class|| type Integer.class|| type Long.class|| type Float.class|| type Double.class|| type Object.class;}2.2.2.4、ConfigUtils#getProperty 这里假如我们传入的配置名是dubbo.provider.dubbo1.name那么会根据这个配置名先从JVM系统属性中获取获取到了直接返回从JVM系统属性中没有获取到再从properties文件中获取这里因为可能属性值存在${}表达式类型这里需要替换成对应的配置 public static String getProperty(String key, String defaultValue) {//先从JVM系统属性中获取String value System.getProperty(key);//获取到了直接返回if (value ! null value.length() 0) {return value;}//获取不到再从properties中获取Properties properties getProperties();//将$和${}表达式替换成对应的配置return replaceProperty(properties.getProperty(key, defaultValue), (Map) properties);}2.2.2.5、ConfigUtils#getProperties 首先看JVM系统属性中有没有通过 -D dubbo.properties.file xxx 指定dubbo的properties文件如果JVM系统属性中没有指定在从环境变量中查找dubbo.properties.file的属性值如果环境变量中也没有则默认使用dubbo.properties最后就是加载配置文件赋值到对应的属性中 public static Properties getProperties() {if (PROPERTIES null) {synchronized (ConfigUtils.class) {if (PROPERTIES null) {//先看系统属性中有没有通过 -D dubbo.properties.file xxx 指定dubbo的properties文件String path System.getProperty(Constants.DUBBO_PROPERTIES_KEY);//如果没有指定if (path null || path.length() 0) {//则再从环境变量中看有没有指定dubbo.properties.filepath System.getenv(Constants.DUBBO_PROPERTIES_KEY);//如果环境变量中也没有指定则就使用dubbo.propertiesif (path null || path.length() 0) {path Constants.DEFAULT_DUBBO_PROPERTIES;}}//这里就是读取properties文件赋值给属性PROPERTIES ConfigUtils.loadProperties(path, false, true);}}}return PROPERTIES;}2.2.2.6、ConfigUtils#replaceProperty 这里就是判断如果属性值包含表达式则取出来表达式中的内容根据取出来的内容再从系统属性中properties中获取最后替换表达式为对应的配置值返回 public static String replaceProperty(String expression, MapString, String params) {//表达式不含$则忽略if (expression null || expression.length() 0 || expression.indexOf($) 0) {return expression;}// 正则为\$\s*\{?\s*([\._0-9a-zA-Z])\s*\}?Matcher matcher VARIABLE_PATTERN.matcher(expression);StringBuffer sb new StringBuffer();while (matcher.find()) {String key matcher.group(1);//首先尝试从系统属性中获取String value System.getProperty(key);if (value null params ! null) {//没有再尝试从 properties 中获取value params.get(key);}if (value null) {value ;}//替换表达式为配置值matcher.appendReplacement(sb, Matcher.quoteReplacement(value));}matcher.appendTail(sb);return sb.toString();}2.2.2.7、legacyProperties legacyProperties中包含了很多映射的属性key例如我们根据dubbo.protocal.name去获取对应的属性值获取不到然后可以再根据dubbo.service.protocol再获取一次。 legacyProperties.put(dubbo.protocol.name, dubbo.service.protocol);legacyProperties.put(dubbo.protocol.host, dubbo.service.server.host);legacyProperties.put(dubbo.protocol.port, dubbo.service.server.port);legacyProperties.put(dubbo.protocol.threads, dubbo.service.max.thread.pool.size);legacyProperties.put(dubbo.consumer.timeout, dubbo.service.invoke.timeout);legacyProperties.put(dubbo.consumer.retries, dubbo.service.max.retry.providers);legacyProperties.put(dubbo.consumer.check, dubbo.service.allow.no.provider);legacyProperties.put(dubbo.service.url, dubbo.service.address);2.2.2、AbstractInterfaceConfig#checkInterfaceAndMethods 这个方法就是遍历接口类的所有方法然后判断暴露的方法名是不是此接口的方法不是则报错。 protected void checkInterfaceAndMethods(Class? interfaceClass, ListMethodConfig methods) {// 判断接口类是否为空if (interfaceClass null) {throw new IllegalStateException(interface not allow null!);}//判断接口类是不是一个接口if (!interfaceClass.isInterface()) {throw new IllegalStateException(The interface class interfaceClass is not a interface!);}//如果方法列表不为空if (methods ! null !methods.isEmpty()) {//遍历对应的方法for (MethodConfig methodBean : methods) {String methodName methodBean.getName();//方法名判空if (methodName null || methodName.length() 0) {throw new IllegalStateException(dubbo:method name attribute is required! Please check: dubbo:service interface\ interfaceClass.getName() \ ... dubbo:method name\\ ... //dubbo:reference);}boolean hasMethod false;//判断接口中是否包含此方法for (java.lang.reflect.Method method : interfaceClass.getMethods()) {if (method.getName().equals(methodName)) {hasMethod true;break;}}//不包含抛出异常if (!hasMethod) {throw new IllegalStateException(The interface interfaceClass.getName() not found method methodName);}}}}2.2.3、ServiceConfig#checkRef 判断ref引用的对象是不是接口的实现。 private void checkRef() {// reference should not be null, and is the implementation of the given interfaceif (ref null) {throw new IllegalStateException(ref not allow null!);}//判断ref是否是这个接口的实现if (!interfaceClass.isInstance(ref)) {throw new IllegalStateException(The class ref.getClass().getName() unimplemented interface interfaceClass !);}}2.2.4、AbstractInterfaceConfig#checkApplication 这里首先判断如果没有 application 配置则尝试从系统属性中获取 dubbo.application.name如果能获取到则创建一个ApplicationConfig然后调用appendProperties给ApplicationConfig赋值这个我们之前已经剖析过了判断有没有配置dubbo.service.shutdown.wait如果配置了设置到系统属性中。 注这里checkRegistrycheckProtocol都是一样的实现就不一一分析了 protected void checkApplication() {// 向后兼容if (application null) {//没有 application 配置则尝试从系统属性中获取String applicationName ConfigUtils.getProperty(dubbo.application.name);if (applicationName ! null applicationName.length() 0) {application new ApplicationConfig();}}//没有在配置文件中配置application并且系统属性中也没有则抛出异常if (application null) {throw new IllegalStateException(No such application config! Please add dubbo:application name\...\ / to your spring config.);}//给application赋值appendProperties(application);//如果配置了 dubbo.service.shutdown.wait则设置到系统属性中String wait ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);if (wait ! null wait.trim().length() 0) {System.setProperty(Constants.SHUTDOWN_WAIT_KEY, wait.trim());} else {wait ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);if (wait ! null wait.trim().length() 0) {System.setProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY, wait.trim());}}}2.2.5、ServiceConfig#checkStub 这里解析这个方法我们首先来看下什么是本地存根官方解释 Dubbo官网文档 远程服务后客户端通常只剩下接口而实现全在服务器端但提供方有些时候想在客户端也执行部分逻辑比如做 ThreadLocal 缓存提前验证参数调用失败后伪造容错数据等等此时就需要在 API 中带上 Stub客户端生成 Proxy 实例会把 Proxy 通过构造函数传给 Stub 然后把 Stub 暴露给用户Stub 可以决定要不要去调 Proxy。如下图 如果需要使用本地存根可以通过 其中 stub 参数可以 为true 此时存根类默认为为 {intefaceName} Stubstub 参数 也可以为 存根类路径名。此时存根类为stub 指向的类。 需要注意存根类有两个条件 存根类也需要实现暴露的服务接口存根类需要一个有参构造函数入参为服务接口的实现实例。 // stub 为 ture。 存根类为 {intefaceName} Stub即 com.foo.BarServiceStub
dubbo:service interfacecom.foo.BarService stubtrue /// 存根类为 stub 指定的类 com.foo.BarServiceStub
dubbo:service interfacecom.foo.BarService stubcom.foo.BarServiceStub /
package com.foo;
public class BarServiceStub implements BarService {private final BarService barService;// 构造函数传入真正的远程代理对象public BarServiceStub(BarService barService){this.barService barService;}public String sayHello(String name) {// 此代码在客户端执行, 你可以在客户端做ThreadLocal本地缓存或预先验证参数是否合法等等try {return barService.sayHello(name);} catch (Exception e) {// 你可以容错可以做任何AOP拦截事项return 容错数据;}}
}有了上面的案例这里我们看这个逻辑就简单了 获取存根类如果 stub 为 true 则使用 interfaceName stub 作为存根类否则反射获取存根类对stub检查判断是否存在入参为 interface 的构造函数 void checkStub(Class? interfaceClass) {//这里local已经过时了现在都用stub作为本地存根if (ConfigUtils.isNotEmpty(local)) {Class? localClass ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() Local) : ReflectUtils.forName(local);if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException(The local implementation class localClass.getName() not implement interface interfaceClass.getName());}try {ReflectUtils.findConstructor(localClass, interfaceClass);} catch (NoSuchMethodException e) {throw new IllegalStateException(No such constructor \public localClass.getSimpleName() ( interfaceClass.getName() )\ in local implementation class localClass.getName());}}//如果stub不为空if (ConfigUtils.isNotEmpty(stub)) {//如果 stub 为 true 则使用 interfaceName stub 作为存根类否则反射获取存根类Class? localClass ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() Stub) : ReflectUtils.forName(stub);if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException(The local implementation class localClass.getName() not implement interface interfaceClass.getName());}try {//对stub检查是否存在入参为 interface 的构造函数ReflectUtils.findConstructor(localClass, interfaceClass);} catch (NoSuchMethodException e) {throw new IllegalStateException(No such constructor \public localClass.getSimpleName() ( interfaceClass.getName() )\ in local implementation class localClass.getName());}}}2.2.6、ServiceConfig#checkMock 这里首先我们要知道mock的作用 用以实现服务降级比如某验权服务当服务提供方全部挂掉后客户端不抛出异常而是通过 Mock 数据返回授权失败。 具体使用案例可以看Dubbo官方文档 首先检查有没有mock如果没有配置接返回获取mock的配置然后判断mock是不是return方式使用 return 来返回一个字符串表示的对象作为 Mock 的返回值这里就会检查返回值是否合法判断mock是不是throw方式使用 throw 来返回一个 Exception 对象作为 Mock 的返回值这里会检查是不是一个异常如果是在工程中提供mock实现则会判断本地是否存在mock实现这里就是判断本地是否存在xxxMock的class如果我们的接口是BarService那么会判断是否存在BarServiceMock的实例。 void checkMock(Class? interfaceClass) {//没有配置mock直接返回if (ConfigUtils.isEmpty(mock)) {return;}//获取mock的配置方式String normalizedMock MockInvoker.normalizeMock(mock);if (normalizedMock.startsWith(Constants.RETURN_PREFIX)) {normalizedMock normalizedMock.substring(Constants.RETURN_PREFIX.length()).trim();try {//检查mock的值是否合法如果不合法则抛出异常MockInvoker.parseMockValue(normalizedMock);} catch (Exception e) {throw new IllegalStateException(Illegal mock return in dubbo:service/reference ... mock\ mock \ /);}} else if (normalizedMock.startsWith(Constants.THROW_PREFIX)) {normalizedMock normalizedMock.substring(Constants.THROW_PREFIX.length()).trim();if (ConfigUtils.isNotEmpty(normalizedMock)) {try {MockInvoker.getThrowable(normalizedMock);} catch (Exception e) {throw new IllegalStateException(Illegal mock throw in dubbo:service/reference ... mock\ mock \ /);}}} else {//判断本地是否存在服务接口mock实现类MockInvoker.getMockObject(normalizedMock, interfaceClass);}}2.2.6、ServiceConfig#doExportUrls 首先获取注册中心列表,因为可能你在配置文件中指定了多个注册中心所以这里可能会返回多个遍历protocols调用doExportUrlsFor1Protocol依次暴露服务因为有可能我们指定了多种protocol如DubboREST这里会对每种协议都进行服务暴露。 private void doExportUrls() {//获取当前服务对应的服务注册中心实例ListURL registryURLs loadRegistries(true);for (ProtocolConfig protocolConfig : protocols) {//如果服务指定暴露多个协议(Dubbo,REST),则依次暴露服务doExportUrlsFor1Protocol(protocolConfig, registryURLs);}}2.2.6.1、AbstractInterfaceConfig#loadRegistries 检查配置中心不能为空并且给配置中心做属性注入遍历注册中心列表判断RegistryConfig的address是否为空如果为空就设置为0.0.0.0然后从JVM系统属性中获取dubbo.registry.address配置的值如果有那么是最高优的使用系统属性配置的如果有注册中心地址的话则把应用信息添加到参数map中然后在把注册中心配置信息添加到map中然后继续追加pathdubbo版本时间戳pid等信息追加到参数map中。解析address得到一个注册中心url列表遍历url列表然后把注册中心的类型添加到参数中,然后设置url的头部信息为registry然后判断如果是(服务提供者并且registertrue 或 null ) 或 (非服务提供者 subscribe true 或 null)则把当前url添加到注册中心列表 protected ListURL loadRegistries(boolean provider) {//检查配置中心类checkRegistry();ListURL registryList new ArrayListURL();if (registries ! null !registries.isEmpty()) {for (RegistryConfig config : registries) {String address config.getAddress();if (address null || address.length() 0) {//如果为空就设为0.0.0.0address Constants.ANYHOST_VALUE;}//从系统变量中获取注册中心地址String sysaddress System.getProperty(dubbo.registry.address);//这里如果在系统变量中指定了那么优先级最高if (sysaddress ! null sysaddress.length() 0) {address sysaddress;}//如果有注册中心地址的化if (address.length() 0 !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {MapString, String map new HashMapString, String();//把应用信息添加到map中appendParameters(map, application);//把注册中心配置信息添加到map中appendParameters(map, config);//把path/ dubbo版本/pid等信息添加到map中map.put(path, RegistryService.class.getName());map.put(dubbo, Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!map.containsKey(protocol)) {if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension(remote)) {map.put(protocol, remote);} else {map.put(protocol, dubbo);}}//解析得到URL列表address 可能包含多个注册中心 ip//因此解析得到的是一个 URL 列表ListURL urls UrlUtils.parseURLs(address, map);for (URL url : urls) {url url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());//设置头部信息 registryurl url.setProtocol(Constants.REGISTRY_PROTOCOL);// 通过判断条件决定是否添加 url 到 registryList 中条件如下// (服务提供者 register true 或 null)// || (非服务提供者 subscribe true 或 null)if ((provider url.getParameter(Constants.REGISTER_KEY, true))|| (!provider url.getParameter(Constants.SUBSCRIBE_KEY, true))) {registryList.add(url);}}}}}return registryList;}2.2.6.1.1、AbstractInterfaceConfig#appendParameters 遍历Config的所有方法判断方法是以get或者is开头的并且不是getClass并且方法的修饰符是public的并且方法不是无参的并且方法的返回类型是基本类型或者包装类或Object如果满足条件2则判断判断如果方法返回类型是Object 或者 方法上标注了Parameter注解但是Parameter中excluded为true则不处理获取属性名 例 getName or isPerson 就拿到了 name or person判断Parameter注解不为空并且在注解中指定了key如果指定了key就使用指定的没有就用属性名作为key通过反射执行方法拿到返回值判断如果返回值不为空则追加到参数中如果parameter注解不为空并且属性escaped设置为true则对方法返回值进行编码如果parameter注解不为空并且属性append设置为true则进行追加操作把当前值追加到map的键为default.key的vlaue后面用逗号拼接如果指定了前缀则给key拼接上前缀加入到参数列表中 12.如果条件2不满足则判断如果方法名是getParameters且方法修饰符是public且方法的返回类型是Map如果满足条件12则调用getParameters方法获取参数map判断如果参数map不为空则遍历所有的键值对给所有的key拼接上前缀 protected static void appendParameters(MapString, String parameters, Object config, String prefix) {if (config null) {return;}Method[] methods config.getClass().getMethods();for (Method method : methods) {try {String name method.getName();//判断方法是以get或者is开头的并且不是getClass并且方法的修饰符是public的//并且方法不是无参的并且方法的返回类型是基本类型或者包装类或Objectif ((name.startsWith(get) || name.startsWith(is)) !getClass.equals(name) Modifier.isPublic(method.getModifiers()) method.getParameterTypes().length 0 isPrimitive(method.getReturnType())) {Parameter parameter method.getAnnotation(Parameter.class);//判断如果方法返回类型是Object 或者 方法上标注了Parameter注解但是Parameter中excluded为trueif (method.getReturnType() Object.class || parameter ! null parameter.excluded()) {continue;}int i name.startsWith(get) ? 3 : 2;//获取属性名 例 getName or isPerson 就拿到了 name or personString prop StringUtils.camelToSplitName(name.substring(i, i 1).toLowerCase() name.substring(i 1), .);String key;//判断Parameter注解不为空并且在注解中指定了key如果指定了key就使用指定的没有就用属性名作为keyif (parameter ! null parameter.key().length() 0) {key parameter.key();} else {key prop;}//执行方法拿到返回值Object value method.invoke(config);String str String.valueOf(value).trim();//返回值不为空则追加到参数中if (value ! null str.length() 0) {//如果parameter注解不为空并且属性escaped设置为true则对返回值进行编码if (parameter ! null parameter.escaped()) {str URL.encode(str);}//如果parameter注解不为空并且属性append设置为true则进行追加操作//把当前值追加到map的键为default.key的vlaue后面用逗号拼接if (parameter ! null parameter.append()) {String pre parameters.get(Constants.DEFAULT_KEY . key);if (pre ! null pre.length() 0) {str pre , str;}pre parameters.get(key);if (pre ! null pre.length() 0) {str pre , str;}}//如果指定了前缀则给key拼接上前缀if (prefix ! null prefix.length() 0) {key prefix . key;}//把键值put到map中parameters.put(key, str);} else if (parameter ! null parameter.required()) {//如果方法上标注了parameter注解并且required属性为true则抛出异常因为方法没有返回值throw new IllegalStateException(config.getClass().getSimpleName() . key null);}//如果方法名是getParameters且方法修饰符是public且方法的返回类型是Map} else if (getParameters.equals(name) Modifier.isPublic(method.getModifiers()) method.getParameterTypes().length 0 method.getReturnType() Map.class) {//调用getParameters方法获取参数mapMapString, String map (MapString, String) method.invoke(config, new Object[0]);//如果参数map不为空则遍历所有的键值对给所有的key拼接上前缀if (map ! null map.size() 0) {String pre (prefix ! null prefix.length() 0 ? prefix . : );for (Map.EntryString, String entry : map.entrySet()) {parameters.put(pre entry.getKey().replace(-, .), entry.getValue());}}}} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}}2.2.6.1.2、UrlUtils#parseURLs 这里对地址做判断然后对地址按照分号或分号进行切割遍历切割后的自己调用parseURL方法解析成url。 public static ListURL parseURLs(String address, MapString, String defaults) {//地址判空if (address null || address.length() 0) {return null;}//按照逗号或分号切割地址String[] addresses Constants.REGISTRY_SPLIT_PATTERN.split(address);//如果切割后的地址为空则返回nullif (addresses null || addresses.length 0) {return null; //here wont be empty}ListURL registries new ArrayListURL();//遍历切割后的地址调用parseURLfor (String addr : addresses) {registries.add(parseURL(addr, defaults));}return registries;}2.2.6.1.3、UrlUtils#parseURL 根据地址类型判断是否需要切割例 // 第一种dubbo:registry address“zookeeper://10.20.153.10:2181?backup10.20.153.11:2181,10.20.153.12:2181”/ // 第二种dubbo:registry protocol“zookeeper” address“10.20.153.10:2181,10.20.153.11:2181,10.20.153.12:2181”/从参数中获取获得 “protocol” “username” “password” “host” “port” “path” 到 defaultXXX 属性中因为在 Dubbo URL 中这几个是独立的属性不在 Dubbo.parameters 属性中。把刚刚那几个属性从参数列表中移除因为是独立属性调用URL#valueOf创建Dubbo url判断生成的url是否有指定protocol,username,password等属性如果没有则从参数列表中获取判断地址有没有指定端口没有指定缺省为9090判断url有没有需要补充的也就是56做的判断如果有则重新构造URL返回。 public static URL parseURL(String address, MapString, String defaults) {if (address null || address.length() 0) {return null;}// 以 Zookeeper 注册中心配置集群的例子如下// java config : dubbo.registry.address zookeeper://lanboal:123456127.0.0.1:2181?loginfalse// 第一种dubbo:registry addresszookeeper://10.20.153.10:2181?backup10.20.153.11:2181,10.20.153.12:2181/// 第二种dubbo:registry protocolzookeeper address10.20.153.10:2181,10.20.153.11:2181,10.20.153.12:2181/String url;if (address.indexOf(://) 0) { //第一行情况url address;} else {//第二种情况需要按照切割String[] addresses Constants.COMMA_SPLIT_PATTERN.split(address);url addresses[0];if (addresses.length 1) {StringBuilder backup new StringBuilder();for (int i 1; i addresses.length; i) {if (i 1) {backup.append(,);}backup.append(addresses[i]);}url ? Constants.BACKUP_KEY backup.toString();}}// 从 defaults 中获得 protocol username password host port path 到 defaultXXX 属性种。// 因为在 Dubbo URL 中这几个是独立的属性不在 Dubbo.parameters 属性中。String defaultProtocol defaults null ? null : defaults.get(protocol);if (defaultProtocol null || defaultProtocol.length() 0) {defaultProtocol dubbo;}String defaultUsername defaults null ? null : defaults.get(username);String defaultPassword defaults null ? null : defaults.get(password);int defaultPort StringUtils.parseInteger(defaults null ? null : defaults.get(port));String defaultPath defaults null ? null : defaults.get(path);MapString, String defaultParameters defaults null ? null : new HashMapString, String(defaults);// 需要移除因为这几个是独立属性。if (defaultParameters ! null) {defaultParameters.remove(protocol);defaultParameters.remove(username);defaultParameters.remove(password);defaultParameters.remove(host);defaultParameters.remove(port);defaultParameters.remove(path);}//创建 Dubbo URLURL u URL.valueOf(url);// 若 u 的属性存在非空的情况下从 defaultXXX 属性赋值到 u 的属性中。boolean changed false; // 是否改变即从 defaultXXX 属性赋值到 u 的属性中。String protocol u.getProtocol();String username u.getUsername();String password u.getPassword();String host u.getHost();int port u.getPort();String path u.getPath();MapString, String parameters new HashMapString, String(u.getParameters());// 如果原始url上未指定protocol,username,password, 但有default值,则追加到url上if ((protocol null || protocol.length() 0) defaultProtocol ! null defaultProtocol.length() 0) {changed true;protocol defaultProtocol;}if ((username null || username.length() 0) defaultUsername ! null defaultUsername.length() 0) {changed true;username defaultUsername;}if ((password null || password.length() 0) defaultPassword ! null defaultPassword.length() 0) {changed true;password defaultPassword;}/*if (u.isAnyHost() || u.isLocalHost()) {changed true;host NetUtils.getLocalHost();}*/if (port 0) {if (defaultPort 0) {changed true;port defaultPort;} else {changed true;port 9090; // 如果地址没有端口缺省为9090}}if (path null || path.length() 0) {if (defaultPath ! null defaultPath.length() 0) {changed true;path defaultPath;}}if (defaultParameters ! null defaultParameters.size() 0) {for (Map.EntryString, String entry : defaultParameters.entrySet()) {String key entry.getKey();String defaultValue entry.getValue();if (defaultValue ! null defaultValue.length() 0) {String value parameters.get(key);if (value null || value.length() 0) {changed true;parameters.put(key, defaultValue);}}}}//若改变创建新的Dubbo URLif (changed) {u new URL(protocol, username, password, host, port, path, parameters);}return u;}2.2.6.2、ServiceConfig#doExportUrlsFor1Protocol 判断协议如果没有指定则默认使用dubbo协议构造参数map把一些信息put到map中如sideproviderdubbo2.6.0时间戳pid等还有一些配置信息入applicationConfig,providerConfig,这里appendParameters我们前面已经分析过了遍历处理MethodConfig把方法信息和参数信息添加到参数map中判断是不是泛化调用如果是就把泛化调用的信息添加到map中设置methods*如果不是泛化调用就找到所有的method然后将methods所有的method名拼接起来将token参数设置到map中如果协议是injvm则设置notify为false protocolConfig.setRegister(false);获取hostport最终根据map中的参数主机和端口构造出来一个新的URL从url中获取配置的scope如果这个scope不是none则继续判断scope是不是不是remote如果不是则进行本地暴露判断scope是不是不是local的不是则会进行远程暴露然后遍历注册中心列表首先获取监控中心如果监控中心url不为空则把监控中心添加到url的参数中然后将proxy_key属性添加到registryURL中其实这个proxy_key 的值是可以设置的就是告诉dubbo我用什么来进行生成代理使用proxyFactory生成一个invoker对象然后再构造一个DelegateProviderMetaDataInvoker把原始的配置信息和invoker绑定到一起调用protocol属性的export方法进行服务暴露把返回的Exporter对象添加到导出列表中 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, ListURL registryURLs) {String name protocolConfig.getName();if (name null || name.length() 0) {name dubbo; //默认协议是dubbo}MapString, String map new HashMapString, String();map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); //side 哪一端map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());//版本map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestampif (ConfigUtils.getPid() 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid}//读取其他配置信息到map用于后续构造URLappendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);if (methods ! null !methods.isEmpty()) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String retryKey method.getName() .retry;if (map.containsKey(retryKey)) {String retryValue map.remove(retryKey);if (false.equals(retryValue)) {map.put(method.getName() .retries, 0);}}ListArgumentConfig arguments method.getArguments();if (arguments ! null !arguments.isEmpty()) {for (ArgumentConfig argument : arguments) {// convert argument typeif (argument.getType() ! null argument.getType().length() 0) {Method[] methods interfaceClass.getMethods();// visit all methodsif (methods ! null methods.length 0) {for (int i 0; i methods.length; i) {String methodName methods[i].getName();// target the method, and get its signatureif (methodName.equals(method.getName())) {Class?[] argtypes methods[i].getParameterTypes();// one callback in the methodif (argument.getIndex() ! -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {appendParameters(map, argument,method.getName() . argument.getIndex());} else {throw new IllegalArgumentException(argument config error : the index attribute and type attribute not match :index : argument.getIndex() , type: argument.getType());}} else {// multiple callbacks in the methodfor (int j 0; j argtypes.length; j) {Class? argclazz argtypes[j];if (argclazz.getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() . j);if (argument.getIndex() ! -1 argument.getIndex() ! j) {throw new IllegalArgumentException(argument config error : the index attribute and type attribute not match :index : argument.getIndex() , type: argument.getType());}}}}}}}} else if (argument.getIndex() ! -1) {appendParameters(map, argument, method.getName() . argument.getIndex());} else {throw new IllegalArgumentException(argument config must set index or type attribute.eg: dubbo:argument index0 .../ or dubbo:argument typexxx .../);}}}} // end of methods for}//泛化调用if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {String revision Version.getVersion(interfaceClass, version);if (revision ! null revision.length() 0) {map.put(revision, revision);}String[] methods Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length 0) {logger.warn(NO method found in service interface interfaceClass.getName());map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {map.put(Constants.METHODS_KEY, StringUtils.join(new HashSetString(Arrays.asList(methods)), ,));}}if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}//本地injvmif (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put(notify, false);}// export serviceString contextPath protocolConfig.getContextpath();if ((contextPath null || contextPath.length() 0) provider ! null) {contextPath provider.getContextpath();}//hostString host this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port this.findConfigedPorts(protocolConfig, name, map);URL url new URL(name, host, port,(contextPath null || contextPath.length() 0 ? : contextPath /) path, map);if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope url.getParameter(Constants.SCOPE_KEY);// dont export when none is configuredif (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// export to local if the config is not remote (export to remote only when config is remote)//本地服务暴露 不是remote就本地暴露如果不配置scope也进行本地暴露if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);}// export to remote if the config is not local (export to local only when config is local)//远程服务暴露 不是localif (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {logger.info(Export dubbo service interfaceClass.getName() to url url);}if (registryURLs ! null !registryURLs.isEmpty()) {//遍历注册中心for (URL registryURL : registryURLs) {url url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));URL monitorUrl loadMonitor(registryURL);//获取监控中心if (monitorUrl ! null) { //将监控中心添加到url中url url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info(Register dubbo service interfaceClass.getName() url url to registry registryURL);}// For providers, this is used to enable custom proxy to generate invokerString proxy url.getParameter(Constants.PROXY_KEY);//配置中有proxy_key 的话就使用配置的if (StringUtils.isNotEmpty(proxy)) { //设置配置的 proxy_keyregistryURL registryURL.addParameter(Constants.PROXY_KEY, proxy);}//invoker 使用ProxyFactory 生成 invoker对象这里这个invoker其实是一个代理对象Invoker? invoker proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));//创建 DelegateProvoderMetaInvoker对象,就是包装了一下这个invoker主要是把原始的配置信息跟invoker绑定在一起。DelegateProviderMetaDataInvoker wrapperInvoker new DelegateProviderMetaDataInvoker(invoker, this);// filter ----listener ---registryProtocol ( 这里使用了wapper 包装机制)// filter ---- listener ---- dubboProtocol 服务暴露//go export 我们看下这个protocol的默认实现是 SPI(dubbo)Exporter? exporter protocol.export(wrapperInvoker);//添加exporterexporters.add(exporter);}} else {//没有注册中心Invoker? invoker proxyFactory.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker new DelegateProviderMetaDataInvoker(invoker, this);Exporter? exporter protocol.export(wrapperInvoker);exporters.add(exporter);}}}this.urls.add(url);}2.2.6.3、ServiceConfig#exportLocal 判断protocol是不是injvm不是则把URL中的protocol改成injvm然后host改成127.0.0.1port改成1这里就是新生成一个URL把之前url的配置拷贝过来了往congtext中添加一个键值key是接口的全类名value是实现类的全类名然后调用proxyFactory#getInvoker生成一个Invoker对象然后调用protocol#export导出这个对象 private void exportLocal(URL url) {//本地服务暴露//如果protocol不是injvmif (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {//设置protocol是injvmURL local URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST) //host 是127.0.0.1.setPort(0);//这里往congtext中添加一个键值key是接口的全类名value是实现类的全类名StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));//ref接口实现类 interfaceClass接口class localURL//因为我们这里URL中的protocol是injvm所以回去找对应的实现也就是InjvmProtocol这个类//这个在参数中会调用 proxyFactory.getInvoker获取到一个invokerExporter? exporter protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info(Export dubbo service interfaceClass.getName() to local registry);}}2.2.6.3.1、proxyFactory#getInvoker 这里proxyFactory是我们的一个类属性他是通过SPI查找其实现之前我们已经分析过Dubbo的SPI这里getAdaptiveExtension方法回去获取自适应的实现。 private static final ProxyFactory proxyFactory ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();这里我们看getProxy方法会首先根据Adaptive注解从我们的URL中去查找proxy对应的值因为我们没有指定就会走到类上面标注的SPI(“javassist”)默认的实现javassist实现类。 SPI(javassist)
public interface ProxyFactory {/*** create proxy.** param invoker* return proxy*/Adaptive({Constants.PROXY_KEY})T T getProxy(InvokerT invoker) throws RpcException;/*** create proxy.** param invoker* return proxy*///是否是泛化调用Adaptive({Constants.PROXY_KEY})T T getProxy(InvokerT invoker, boolean generic) throws RpcException;/*** create invoker.** param T* param proxy* param type* param url* return invoker*///咱们用的getInvoker方法然后会根据proxy这个属性去咱们的URL中找对应的值我们现在没有刻意设置这个proxy属性的话就会走默认// 也就是SPI(“javassist”)中的javassist实现类。这块知识数据dubbo spi里面的。Adaptive({Constants.PROXY_KEY})T InvokerT getInvoker(T proxy, ClassT type, URL url) throws RpcException;}所以上面我们调用proxy#getInvoker方法其实调用到了JavassistProxyFactory#getInvoker方法。 这里首先会调用Wrapper#getWrapper帮我们生成一个Wrapper这个Wrapper会根据你提供的这个类型生成一个获取你这个类中成员变量的方法设置成员变量的方法执行你这个类中方法的方法。new一个AbstractProxyInvoker返回并重写其doInvoke方法在doInvoke方法汇总调用wrapper#invokeMethod方法 public T InvokerT getInvoker(T proxy, ClassT type, URL url) {// TODO Wrapper cannot handle this scenario correctly: the classname contains $//这里是如果接口实现类中有$符号就是用接口类型没有$符号就用实现类的类型final Wrapper wrapper Wrapper.getWrapper(proxy.getClass().getName().indexOf($) 0 ? proxy.getClass() : type);return new AbstractProxyInvokerT(proxy, type, url) {//进行调用Overrideprotected Object doInvoke(T proxy, String methodName,Class?[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};} 下面这个类是我们通过arthas查看到帮我们生成的代理对象的结构如下 package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;public class Wrapper1 extends Wrapper implements ClassGenerator.DC {public static String[] pns; //字段名public static Map pts; //字段名字段类型public static String[] mns; //方法名public static String[] dmns; //自己方法的名字public static Class[] mts0; //方法参数类型public String[] getPropertyNames() {return pns;}public boolean hasProperty(String string) {return pts.containsKey(string);}public Class getPropertyType(String string) {return (Class)pts.get(string);}public String[] getMethodNames() {return mns;}public String[] getDeclaredMethodNames() {return dmns;}public void setPropertyValue(Object object, String string, Object object2) {try {DemoServiceImpl demoServiceImpl (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append(Not found property \).append(string).append(\ filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.).toString());}public Object getPropertyValue(Object object, String string) {try {DemoServiceImpl demoServiceImpl (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append(Not found property \).append(string).append(\ filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.).toString());}// 这个就是Wrapper对象的invokerMethod方法public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {DemoServiceImpl demoServiceImpl;try {demoServiceImpl (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}try {if (sayHello.equals(string) arrclass.length 1) {// 调用DemoServiceImpl实例对象的sayHello方法,并将结果返回return demoServiceImpl.sayHello((String)arrobject[0]);}}catch (Throwable throwable) {throw new InvocationTargetException(throwable);}throw new NoSuchMethodException(new StringBuffer().append(Not found method \).append(string).append(\ in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.).toString());}
}这里我们返回的AbstractProxyInvoker也非常简单构造参数中会对 proxy 接口实现类具体提供服务, Class type 接口类型, URL url 这三个参数进行检验然后存储。实现接口Invoker的invoke(Invocation invocation)方法实际上还是子类实现doInvoke方法这里会调用到wrapper.invokeMethod方法 public abstract class AbstractProxyInvokerT implements InvokerT {private final T proxy;private final ClassT type;private final URL url;public AbstractProxyInvoker(T proxy, ClassT type, URL url) {//参数校验if (proxy null) {throw new IllegalArgumentException(proxy null);}if (type null) {throw new IllegalArgumentException(interface null);}//proxy 需要是实现typeif (!type.isInstance(proxy)) {throw new IllegalArgumentException(proxy.getClass().getName() not implement interface type);}this.proxy proxy;this.type type;this.url url;}Overridepublic ClassT getInterface() {return type;}Overridepublic URL getUrl() {return url;}Overridepublic boolean isAvailable() {return true;}Overridepublic void destroy() {}Overridepublic Result invoke(Invocation invocation) throws RpcException {try {return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));} catch (InvocationTargetException e) {return new RpcResult(e.getTargetException());} catch (Throwable e) {throw new RpcException(Failed to invoke remote proxy method invocation.getMethodName() to getUrl() , cause: e.getMessage(), e);}}//实际调用子类的实现protected abstract Object doInvoke(T proxy, String methodName, Class?[] parameterTypes, Object[] arguments) throws Throwable;Overridepublic String toString() {return getInterface() - (getUrl() null ? : getUrl().toString());}}
2.2.6.3.2、protocol#export 这里protocol是我们的一个类属性他是通过SPI查找其实现之前我们已经分析过Dubbo的SPI这里getAdaptiveExtension方法回去获取自适应的实现。 private static final Protocol protocol ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 这里我们看export方法会首先根据Adaptive注解从我们的URL中去查找protocol对应的值因为我们在url中指定protocolinjvm,所以会找到InjvmProtocol实现 SPI(dubbo)
public interface Protocol {int getDefaultPort();//我们可以看到Protocol这个扩展点export方法上是有Adaptive注解的然后没有value根据自适应的规则没有value则value类名小写。//这样的话就是从url中获取protocol的值咱们invoker里面包的url是registryURL然后对应的protocol也就是注册中心的protocol// RegistryProtocol这个类。但是在创建扩展实现类的时候dubbo会给我们setter注入与wrapper包装所以我们拿到的RegistryProtocol最终样子是这样的AdaptiveT ExporterT export(InvokerT invoker) throws RpcException;AdaptiveT InvokerT refer(ClassT type, URL url) throws RpcException;void destroy();}这里我们看InjvmProtocol的export方法会直接new一个InjvmExporter返回 public T ExporterT export(InvokerT invoker) throws RpcException {//这里直接 new InjvmExporter类返回return new InjvmExporterT(invoker, invoker.getUrl().getServiceKey(), exporterMap);} 最终是将key接口全类名valuethis也就是InjvmExporter对象put到exporterMap中了。 class InjvmExporterT extends AbstractExporterT {private final String key;private final MapString, Exporter? exporterMap;//最终是将key接口全类名valuethis也就是InjvmExporter对象put到exporterMap中了。//再回到ServiceConfig的exportLocal方法中。还有最后一句exporters.add(exporter);//这里是将上面生成的InjvmExporter对象缓存了起来。//到这里我们这个服务本地暴露(injvm)就解析完成了。InjvmExporter(InvokerT invoker, String key, MapString, Exporter? exporterMap) {super(invoker);this.key key;this.exporterMap exporterMap;exporterMap.put(key, this);}Overridepublic void unexport() {super.unexport();exporterMap.remove(key);}}2.2.6.4、protocol#export 这里protocol是我们的一个类属性他是通过SPI查找其实现之前我们已经分析过Dubbo的SPI这里getAdaptiveExtension方法回去获取自适应的实现。 private static final Protocol protocol ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 这里由于我们在url中的protocol属性是registry,所以这里的实现是RegistryProtocol,但是Dubbo的SPI在创建扩展实现类的时候会给我进行wrapper包装所以这里拿到的RegistryProtocol是经过几次包装的 Listener - Filter - Qos - RegistryProtocol 2.2.6.4.1、ProtocolListenerWrapper#export 由于我们这里是registry协议所以会直接会调用下一个的export方法 public T ExporterT export(InvokerT invoker) throws RpcException {//这里也是判断了一下是否是registry协议如果是的话就直接进入下一个RegistryProtocol。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}//这里将服务暴露返回的exporter与自动激活的listener们绑在了一起。return new ListenerExporterWrapperT(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}2.2.6.4.2、ProtocolFilterWrapper#export 由于我们这里是registry协议所以会直接会调用下一个的export方法 public T ExporterT export(InvokerT invoker) throws RpcException {//首先判断这个Protocol是不是registry的如果是的话就直接跳过的了也就是没它什么事了。//如果不是就要走下面这个这里调用了 buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)方法// 其中 Constants.SERVICE_FILTER_KEY是service.filterConstants.PROVIDER 是provider我们来看下这个方法。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}2.2.6.4.3、QosProtocolWrapper#export 判断是registry协议的话就启动Qos服务器,然后继续调用下一个的export方法也就是RegistryProtocol#export public T ExporterT export(InvokerT invoker) throws RpcException {//判断是registryif (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {//启动Qos服务器startQosServer(invoker.getUrl());return protocol.export(invoker);}return protocol.export(invoker);}2.2.6.4.4、RegistryProtocol#export 调用doLocalExport进行本地导出从originInvoker获取注册中心对象然后获取服务提供者的url想本地服务注册表注册服务是将刚刚塞到本地注册表中的invoker有个reg的属性设置成true表示注册了向注册中心注册当前服务订阅当前服务当服务实例变更时可以更新本地缓存构造DestroyableExporter返回。 public T ExporterT export(final InvokerT originInvoker) throws RpcException {//export invoker//暴露服务 doLocalExport表示本地启动服务不包括去注册中心注册final ExporterChangeableWrapperT exporter doLocalExport(originInvoker);//获取注册中心URLURL registryUrl getRegistryUrl(originInvoker);//registry provider//获得注册中心对象final Registry registry getRegistry(originInvoker);//获取服务提供者的urlfinal URL registeredProviderUrl getRegisteredProviderUrl(originInvoker);//to judge to delay publish whether or notboolean register registeredProviderUrl.getParameter(register, true);//向本地服务注册表注册服务ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);if (register) { //向注册中心注册自己//go registerregister(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.final URL overrideSubscribeUrl getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//Ensure that a new exporter instance is returned every time exportreturn new DestroyableExporterT(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);}2.2.6.4.5、RegistryProtocol#doLocalExport bounds为export的service缓存key为通过Invoker获取的providerUrlvalue为exporter的包装类ExporterChangeableWrapper。主要解决RMI重复暴露端口冲突的问题已经暴露的服务不再暴露。构造InvokerDelegete这里调用getProviderUrl获取url其实就是从origininvoker的url中找参数export对应的value对应的value为dubbo所以我们这里的导出协议是dubbo调用protocol.export方法这里protocol的实例是DubboProtocol的包装类共包装了ProtocolListenerWrapperProtocolFilterWrapperQosProtocolWrapper 三层 private T ExporterChangeableWrapperT doLocalExport(final InvokerT originInvoker) {String key getCacheKey(originInvoker);ExporterChangeableWrapperT exporter (ExporterChangeableWrapperT) bounds.get(key);if (exporter null) { //之前没有暴露过synchronized (bounds) {exporter (ExporterChangeableWrapperT) bounds.get(key);if (exporter null) {//封装InvokerDelegete 将url封装起来了final Invoker? invokerDelegete new InvokerDelegeteT(originInvoker, getProviderUrl(originInvoker));exporter new ExporterChangeableWrapperT((ExporterT) protocol.export(invokerDelegete), originInvoker);//缓存起来bounds.put(key, exporter);}}}return exporter;}2.2.6.4.5.1、ProtocolListenerWrapper#export 由于这里我们的协议是dubbo所以这里会首先调用下一个protocol#export方法然后把返回的exporter和自动激活的listener们绑在了一起。 public T ExporterT export(InvokerT invoker) throws RpcException {//这里也是判断了一下是否是registry协议如果是的话就直接进入下一个RegistryProtocol。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}//这里将服务暴露返回的exporter与自动激活的listener们绑在了一起。return new ListenerExporterWrapperT(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}这里在ListenerExporterWrapper构造方法中会遍历通知listener告诉他们当前服务暴露完了。 public ListenerExporterWrapper(ExporterT exporter, ListExporterListener listeners) {//判断nullif (exporter null) {throw new IllegalArgumentException(exporter null);}this.exporter exporter;this.listeners listeners;if (listeners ! null !listeners.isEmpty()) {RuntimeException exception null;//遍历通知for (ExporterListener listener : listeners) {if (listener ! null) {try {//通知listener.exported(this);} catch (RuntimeException t) {logger.error(t.getMessage(), t);exception t;}}}if (exception ! null) {throw exception;}}}2.2.6.4.5.2、ProtocolFilterWrapper#export 这里首先调用buildInvokerChain生成一个过滤调用链最后到真实的invoker继续调用下一个protocol#export方法 public T ExporterT export(InvokerT invoker) throws RpcException {//首先判断这个Protocol是不是registry的如果是的话就直接跳过的了也就是没它什么事了。//如果不是就要走下面这个这里调用了 buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)方法// 其中 Constants.SERVICE_FILTER_KEY是service.filterConstants.PROVIDER 是provider我们来看下这个方法。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}这里就是根据spi获取所有的Filter然后用Filter包装起来invoker调用这个invoker就会先过所有的过滤器这个后续我们再逐个分析过滤器的实现 private static T InvokerT buildInvokerChain(final InvokerT invoker, String key, String group) {InvokerT last invoker;//这个方法执行了ListFilter filters ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // 这里我们本文的主角就登场了获取了Filter的ExtensionLoader对象然后调用getActivateExtension方法参数分别是urlservice.filterprovider接着我们要看下这个方法了ListFilter filters ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i filters.size() - 1; i 0; i--) {final Filter filter filters.get(i);final InvokerT next last;last new InvokerT() {Overridepublic ClassT getInterface() {return invoker.getInterface();}Overridepublic URL getUrl() {return invoker.getUrl();}Overridepublic boolean isAvailable() {return invoker.isAvailable();}//exception-moniter-timeout-trace-context-generic-classloader-echoOverridepublic Result invoke(Invocation invocation) throws RpcException {return filter.invoke(next, invocation);}Overridepublic void destroy() {invoker.destroy();}Overridepublic String toString() {return invoker.toString();}};}}return last;}2.2.6.4.5.3、QosProtocolWrapper#export 这里我们是dubbo协议所以直接会调用下一个实现也就是DubboProtocol#export方法 public T ExporterT export(InvokerT invoker) throws RpcException {//判断是registryif (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {//启动Qos服务器startQosServer(invoker.getUrl());return protocol.export(invoker);}return protocol.export(invoker);}
2.2.6.4.5.4、DubboProtocol#export 首先根据url生成一个服务key然后创建DubboExporter放到exporterMap中调用openServer启动服务器 public T ExporterT export(InvokerT invoker) throws RpcException {URL url invoker.getUrl();// export service. key com.alibaba.dubbo.demo.DemoService:20880String key serviceKey(url);DubboExporterT exporter new DubboExporterT(invoker, key, exporterMap);exporterMap.put(key, exporter);//export an stub service for dispatching eventBoolean isStubSupportEvent url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent !isCallbackservice) {String stubServiceMethods url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods null || stubServiceMethods.length() 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException(consumer [ url.getParameter(Constants.INTERFACE_KEY) ], has set stubproxy support event ,but no stub methods founded.));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}//go openServeropenServer(url);optimizeSerialization(url);return exporter;}2.2.6.4.5.5、DubboProtocol#openServer 首先获取地址然后判断是不是服务端如果是服务端并且之前没有暴露过则调用createServer创建服务器 private void openServer(URL url) {// find server. 获得地址 ipport 192.168.1.104:22880String key url.getAddress();//client can export a service whichs only for server to invoke//客户端可以暴露仅供服务器调用的服务boolean isServer url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) { //判断是否是服务器//查找缓存的服务器ExchangeServer server serverMap.get(key);if (server null) { // 没有找到service 就要创建serverserverMap.put(key, createServer(url));} else {// server supports reset, use together with override//支持重置与覆盖一起使用server.reset(url);}}}2.2.6.4.5.6、DubboProtocol#createServer 首先设置一些参数如下。CHANNEL_READONLYEVENT_SENT_KEY为当server关闭时发布readonly事件HEARTBEAT_KEY为心跳时间CODEC_KEY为此url的通过DubboCodec进行的编码然后再获取url中我们配置的server类型默认是netty再通过Exchangers的静态方法创建对应的ExchangeServer private ExchangeServer createServer(URL url) {// send readonly event when server closes, its enabled by default//服务器关闭时发送只读事件默认情况下启用url url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());// enable heartbeat by default//60*1000 设置心跳url url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));//获取配置的服务器类型缺省就是使用netty默认服务器是nettyString str url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//不存在Transports 话就抛出 不支持的server typeif (str ! null str.length() 0 !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException(Unsupported server type: str , url: url);//设置 Codec 的类型为dubbourl url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {//go bind methodserver Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException(Fail to start server(url: url ) e.getMessage(), e);}str url.getParameter(Constants.CLIENT_KEY);if (str ! null str.length() 0) {SetString supportedTypes ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException(Unsupported client type: str);}}return server;}2.2.6.4.5.7、Exchangers#bind 进行参数校验然后从url中获取exchanger缺省是header,然后这里获得的就是HeaderExchanger然后调用HeaderExchanger#bind方法 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//验证if (url null) {throw new IllegalArgumentException(url null);}if (handler null) {throw new IllegalArgumentException(handler null);}url url.addParameterIfAbsent(Constants.CODEC_KEY, exchange);//go HeaderExchanger - bindreturn getExchanger(url).bind(url, handler);}public static Exchanger getExchanger(URL url) {//这里可以看到从url中获取exchanger 缺省是header然后使用dubbo spi 获取到HeaderExchanger我们看下HeaderExchanger源码String type url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);return getExchanger(type);}public static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}2.2.6.4.5.8、HeaderExchanger#bind 这里首先把handler包装成HeaderExchangeHandler再把HeaderExchangeHandler包装成DecodeHandler。调用Transporters#bind进行bind创建HeaderExchangeServer对server进行增强HeaderExchangeServer主要增加了心跳处理 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 创建一个通信server DecodeHandler HeaderExchangeHandler handler//这个Transporters也是门面类对外统一了bind 与connect。//HeaderExchangeServer维护心跳return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}2.2.6.4.5.9、Transporters#bind 首先进行参数校验接着判断handler的个数如果是多个则再把handler包装成ChannelHandlerDispatcher里面主要是对多个handler进行分发调用getTransporter获取Transporter自适应的扩展点缺省是netty public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {//验证if (url null) {throw new IllegalArgumentException(url null);}if (handlers null || handlers.length 0) {throw new IllegalArgumentException(handlers null);}ChannelHandler handler;//创建handlerif (handlers.length 1) {handler handlers[0];} else { //多个channel 对channel进行分发 ChannelHandlerDispatcherhandler new ChannelHandlerDispatcher(handlers);}//真正服务器 进行bindreturn getTransporter().bind(url, handler);}
public static Transporter getTransporter() {return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}2.2.6.4.10、NettyTransporter#bind 这里直接构造了一个NettyServer对象返回 public Server bind(URL url, ChannelHandler listener) throws RemotingException {//在这里直接new了一个NettyServer对象return new NettyServer(url, listener);}2.2.6.4.11、NettyServer#constructor 调用父类的构造方法这里又对handler包了多层这里层次如下 public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);}protected static ChannelHandlers getInstance() {return INSTANCE;}static void setTestingChannelHandlers(ChannelHandlers instance) {INSTANCE instance;}protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {//调用多消息处理器对心跳消息进行了功能加强return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}
}
2.2.6.4.12、AbstractServer#constructor 先从url中获取一些配置信息如下调用doOpen开启服务,会调用到子类NettyServer#doOpen public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);//从url中获得本机地址localAddress getUrl().toInetSocketAddress();//从url中获得绑定的ipString bindIp getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//从url中获得绑定的端口号int bindPort getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());//判断url中配置anyhost是否为true或者判断host是否为不可用的本机hostif (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp NetUtils.ANYHOST;}bindAddress new InetSocketAddress(bindIp, bindPort);//服务器最大可接受连接数this.accepts url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);this.idleTimeout url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {//go doOpendoOpen(); //子类实现真正打开服务器if (logger.isInfoEnabled()) {logger.info(Start getClass().getSimpleName() bind getBindAddress() , export getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, Failed to bind getClass().getSimpleName() on getLocalAddress() , cause: t.getMessage(), t);}//fixme replace this with better method//获取线程池DataStore dataStore ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}2.2.6.4.13、AbstractServer#doOpen 这里就是标准的netty启动流程了boss线程是1一个worker线程是cpu核心数1 与32进行比较 取小的那个 也就是最大不超过32标准的reactor模式这里服务已经启动起来了。 protected void doOpen() throws Throwable {bootstrap new ServerBootstrap();bossGroup new NioEventLoopGroup(1, new DefaultThreadFactory(NettyServerBoss, true));//iothreads 默认是cpu核心数1 与32进行比较 取小的那个 也就是最大不超过32workerGroup new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory(NettyServerWorker, true));final NettyServerHandler nettyServerHandler new NettyServerHandler(getUrl(), this);channels nettyServerHandler.getChannels();/*** ChannelOption.SO_REUSEADDR 这个参数表示允许重复使用本地地址和端口** 比如某个服务器进程占用了TCP的80端口进行监听此时再次监听该端口就会返回错误使用该参数就可以解决问题该参数允许共用该端口这个在服务器程序中比较常使用** 比如某个进程非正常退出该程序占用的端口可能要被占用一段时间才能允许其他进程使用而且程序死掉以后内核一需要一定的时间才能够释放此端口不设置SO_REUSEADDR** 就无法正常使用该端口。*/bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {NettyCodecAdapter adapter new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast(logging,new LoggingHandler(LogLevel.INFO))//for debug.addLast(decoder, adapter.getDecoder()).addLast(encoder, adapter.getEncoder()).addLast(handler, nettyServerHandler);}});// bindChannelFuture channelFuture bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel channelFuture.channel();}2.2.6.4.6、RegistryProtocol#getRegistry 首先从originInvoker里面获取registryUrl然后通过urll获取registryFactory对象这个是SPI机制这里因为我们指定注册中心是zk所以这里拿到的是ZookeeperRegistryFactory private Registry getRegistry(final Invoker? originInvoker) {//然后接着就是从originInvoker里面获取registryUrl接着就是获取Registry的对象。我们看下这个getRegistry()方法URL registryUrl getRegistryUrl(originInvoker);//通过registryFactory 获取Registry其实这个registryFactory 是RegistryProtocol的一个成员然后是dubbo spi 自动setter注入进来的。//我们来看一下registryFactory接口源代码return registryFactory.getRegistry(registryUrl);}SPI(dubbo)
public interface RegistryFactory {//我们可以看到Adaptive的值是protocol咱们上面的url中的protocol值正好是zookeeper就可以找到ZookeeperRegistryFactory我们看下ZookeeperRegistryFactory继承图Adaptive({protocol})Registry getRegistry(URL url);}2.2.6.4.6、ProviderConsumerRegTable#registerProvider 这里面就是维护了两个map缓存了服务提供者和消费者就是把这几个参数封装成ProviderInvokerWrapper对象这个对象就是包装的作用将这几个参数关联起来了然后就是获取serviceKey这个serviceKey就是暴露接口的全类名然后就是根据serviceKey从providerInvokers 这个map中获取invokers然后没有就创建最后就是将包装的这个invoker缓存到invokers中了。 public class ProviderConsumerRegTable {//服务提供者public static ConcurrentHashMapString, SetProviderInvokerWrapper providerInvokers new ConcurrentHashMapString, SetProviderInvokerWrapper();//服务消费者public static ConcurrentHashMapString, SetConsumerInvokerWrapper consumerInvokers new ConcurrentHashMapString, SetConsumerInvokerWrapper();public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {//服务提供者包装类ProviderInvokerWrapper wrapperInvoker new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);//唯一的keyString serviceUniqueName providerUrl.getServiceKey();SetProviderInvokerWrapper invokers providerInvokers.get(serviceUniqueName);if (invokers null) { //没有就创建设置providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSetProviderInvokerWrapper());invokers providerInvokers.get(serviceUniqueName);}invokers.add(wrapperInvoker);}
}2.2.6.4.7、ProviderConsumerRegTable#registerProvider 调用ZookeeperRegisty#register注册到注册中上 public void register(URL registryUrl, URL registedProviderUrl) {//这里直接调用ZookeeperRegistry#register 方法Registry registry registryFactory.getRegistry(registryUrl);registry.register(registedProviderUrl);}2.2.6.4.7、Registry#subscribe 向注册中心订阅这个url这里具体的服务注册订阅的逻辑后续单独分析。