当前位置: 首页 > news >正文

襄阳网站建设知名品牌珠海网站建设品牌策划

襄阳网站建设知名品牌,珠海网站建设品牌策划,广告联盟平台自动赚钱,平面设计网站排行榜Nacos核心功能点 服务注册 Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务#xff0c;提供自身的元数据#xff0c;比如ip地址、端口等信息。Nacos Server接收到注册请求后#xff0c;就会把这些元数据信息存储在一个双层的内存Map中#xff1b; 服…Nacos核心功能点 服务注册 Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务提供自身的元数据比如ip地址、端口等信息。Nacos Server接收到注册请求后就会把这些元数据信息存储在一个双层的内存Map中 服务心跳 在服务注册后Nacos Client会维护一个定时心跳来持续通知Nacos Server说明服务一直处于可用状态防止被剔除。默认5s发送一次心跳 服务健康检查 Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现)如果某个实例超过30秒没有收到心跳直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册) 服务发现 服务消费者Nacos Client在调用服务提供者的服务时会发送一个REST请求给Nacos Server获取上面注册的服务清单并且缓存在Nacos Client本地同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存 服务同步 Nacos Server集群之间会互相同步服务实例用来保证服务信息的一致性 Nacos服务端原理 Nacos客户端原理 其实从以上的两张图中就能够找到突破口了其实核心内容就集中在nacos-console、nacos-naming、nacos-config这几个模块中。那么知道了这些现在就来进行Nacos的源码下载然后具体分析。 Nacos源码下载及启动 Nacos的源码在GitHub上https://github.com/alibaba/nacos我这里下载的是2.0.3 下载完之后用IDEA打开这玩意儿挺大 加载挺要一段时间。在等待代码加载的过程中简单看下整个nacos的代码模块分的还是挺多各个模块分工明确 而主程序的启动是在console模块中的SpringBoot主启动类启动运行 所以本质上nacos也就是个普通的SpringBoot项目和平时在公司加班写的那些鬼代码一样没什么区别建一个SpringBoot项目用Java语言去变成仅此而已。不用想的过度的复杂也不用去神话它就当是刚刚入职去熟悉公司的项目好了。  需要注意的是源码启动之前需要进行参数的设置 1、需要设置为单机模式 2、配置home目录用于存放配置、数据、日志等-Dnacos.standalonetrue -Dnacos.homeE:\MyNuts\MCA\code\nacos_home 配置完成后直接启动即可随着控制台输出启动成功的日志就可以直接访问本地Nacos服务 D:\Java8\JDK\bin\java.exe -Dnacos.standalonetrue -Dnacos.homeE:\MyNuts\MCA\code\nacos_home -XX:TieredStopAtLevel1 -noverify -Dspring.output.ansi.enabledalways -Dcom.sun.management.jmxremote -Dspring.jmx.enabledtrue -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabledtrue -Dmanagement.endpoints.jmx.exposure.include* -javaagent:D:\IntelliJ IDEA\lib\idea_rt.jar6183:D:\IntelliJ IDEA\bin -Dfile.encodingUTF-8 -classpath D:\Java8\JDK\jre\lib\charsets.jar;D:\Java8\JDK\jre\lib\deploy.jar;D:\Java8\JDK\jre\lib\ext\access-bridge-64.jar;D:\Java8\JDK\jre\lib\ext\cldrdata.jar;D:\Java8\JDK\jre\lib\ext\dnsns.jar;D:\Java8\JDK\jre\lib\ext\jaccess.jar;D:\Java8\JDK\jre\lib\ext\jfxrt.jar;D:\Java8\JDK\jre\lib\ext\localedata.jar;D:\Java8\JDK\jre\lib\ext\nashorn.jar;D:\Java8\JDK\jre\lib\ext\sunec.jar;D:\Java8\JDK\jre\lib\ext\sunjce_provider.jar;D:\Java8\JDK\jre\lib\ext\sunmscapi.jar;D:\Java8\JDK\jre\lib\ext\sunpkcs11.jar;D:\Java8\JDK\jre\lib\ext\zipfs.jar;D:\Java8\JDK\jre\lib\javaws.jar;D:\Java8\JDK\jre\lib\jce.jar;D:\Java8\JDK\jre\lib\jfr.jar;D:\Java8\JDK\jre\lib\jfxswt.jar;D:\Java8\JDK\jre\lib\jsse.jar;D:\Java8\JDK\jre\lib\management-agent.jar;D:\Java8\JDK\jre\lib\plugin.jar;D:\Java8\JDK\jre\lib\resources.jar;D:\Java8\JDK\jre\lib\rt.jar;E:\MyNuts\MCA\code\nacos-2.0.3\console\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\config\target\classes;D:\maven_repository\org\springframework\boot\spring-boot-starter-web\2.1.17.RELEASE\spring-boot-starter-web-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-json\2.1.17.RELEASE\spring-boot-starter-json-2.1.17.RELEASE.jar;D:\maven_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.10\jackson-datatype-jdk8-2.9.10.jar;D:\maven_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.10\jackson-datatype-jsr310-2.9.10.jar;D:\maven_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.10\jackson-module-parameter-names-2.9.10.jar;D:\maven_repository\org\hibernate\validator\hibernate-validator\6.0.20.Final\hibernate-validator-6.0.20.Final.jar;D:\maven_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;D:\maven_repository\org\jboss\logging\jboss-logging\3.3.3.Final\jboss-logging-3.3.3.Final.jar;D:\maven_repository\com\fasterxml\classmate\1.4.0\classmate-1.4.0.jar;D:\maven_repository\org\springframework\spring-web\5.1.18.RELEASE\spring-web-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\spring-webmvc\5.1.18.RELEASE\spring-webmvc-5.1.18.RELEASE.jar;E:\MyNuts\MCA\code\nacos-2.0.3\api\target\classes;D:\maven_repository\org\reflections\reflections\0.9.11\reflections-0.9.11.jar;D:\maven_repository\org\javassist\javassist\3.21.0-GA\javassist-3.21.0-GA.jar;D:\maven_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\MyNuts\MCA\code\nacos-2.0.3\core\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\common\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\consistency\target\classes;D:\maven_repository\com\caucho\hessian\4.0.63\hessian-4.0.63.jar;E:\MyNuts\MCA\code\nacos-2.0.3\auth\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\sys\target\classes;D:\maven_repository\io\jsonwebtoken\jjwt-api\0.11.2\jjwt-api-0.11.2.jar;D:\maven_repository\io\jsonwebtoken\jjwt-impl\0.11.2\jjwt-impl-0.11.2.jar;D:\maven_repository\io\jsonwebtoken\jjwt-jackson\0.11.2\jjwt-jackson-0.11.2.jar;D:\maven_repository\org\springframework\spring-test\5.1.18.RELEASE\spring-test-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-test\2.1.17.RELEASE\spring-boot-test-2.1.17.RELEASE.jar;D:\maven_repository\com\alipay\sofa\jraft-core\1.3.5\jraft-core-1.3.5.jar;D:\maven_repository\org\ow2\asm\asm\6.0\asm-6.0.jar;D:\maven_repository\org\rocksdb\rocksdbjni\5.18.4\rocksdbjni-5.18.4.jar;D:\maven_repository\net\java\dev\jna\jna\4.5.2\jna-4.5.2.jar;D:\maven_repository\org\jctools\jctools-core\2.1.1\jctools-core-2.1.1.jar;D:\maven_repository\com\lmax\disruptor\3.3.7\disruptor-3.3.7.jar;D:\maven_repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;D:\maven_repository\com\alipay\sofa\hessian\3.3.6\hessian-3.3.6.jar;D:\maven_repository\io\dropwizard\metrics\metrics-core\4.0.7\metrics-core-4.0.7.jar;D:\maven_repository\com\alipay\sofa\rpc-grpc-impl\1.3.5\rpc-grpc-impl-1.3.5.jar;D:\maven_repository\com\google\guava\guava\30.1-jre\guava-30.1-jre.jar;D:\maven_repository\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\maven_repository\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\maven_repository\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\maven_repository\org\checkerframework\checker-qual\3.5.0\checker-qual-3.5.0.jar;D:\maven_repository\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-jdbc\2.1.17.RELEASE\spring-boot-starter-jdbc-2.1.17.RELEASE.jar;D:\maven_repository\com\zaxxer\HikariCP\3.4.2\HikariCP-3.4.2.jar;D:\maven_repository\org\springframework\spring-jdbc\5.1.18.RELEASE\spring-jdbc-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\spring-tx\5.1.18.RELEASE\spring-tx-5.1.18.RELEASE.jar;D:\maven_repository\commons-io\commons-io\2.7\commons-io-2.7.jar;D:\maven_repository\mysql\mysql-connector-java\8.0.21\mysql-connector-java-8.0.21.jar;D:\maven_repository\org\apache\derby\derby\10.14.2.0\derby-10.14.2.0.jar;D:\maven_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\maven_repository\org\aspectj\aspectjrt\1.9.6\aspectjrt-1.9.6.jar;D:\maven_repository\cglib\cglib-nodep\2.1\cglib-nodep-2.1.jar;D:\maven_repository\org\apache\httpcomponents\httpasyncclient\4.1.3\httpasyncclient-4.1.3.jar;D:\maven_repository\org\apache\httpcomponents\httpcore-nio\4.4.13\httpcore-nio-4.4.13.jar;D:\maven_repository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-tomcat\2.1.17.RELEASE\spring-boot-starter-tomcat-2.1.17.RELEASE.jar;D:\maven_repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.38\tomcat-embed-websocket-9.0.38.jar;D:\maven_repository\com\fasterxml\jackson\core\jackson-core\2.12.2\jackson-core-2.12.2.jar;D:\maven_repository\com\fasterxml\jackson\core\jackson-databind\2.12.2\jackson-databind-2.12.2.jar;D:\maven_repository\com\fasterxml\jackson\core\jackson-annotations\2.12.2\jackson-annotations-2.12.2.jar;D:\maven_repository\io\micrometer\micrometer-registry-prometheus\1.1.18\micrometer-registry-prometheus-1.1.18.jar;D:\maven_repository\io\micrometer\micrometer-core\1.1.18\micrometer-core-1.1.18.jar;D:\maven_repository\org\hdrhistogram\HdrHistogram\2.1.9\HdrHistogram-2.1.9.jar;D:\maven_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;D:\maven_repository\io\prometheus\simpleclient_common\0.5.0\simpleclient_common-0.5.0.jar;D:\maven_repository\io\micrometer\micrometer-registry-influx\1.1.18\micrometer-registry-influx-1.1.18.jar;D:\maven_repository\io\micrometer\micrometer-registry-elastic\1.1.18\micrometer-registry-elastic-1.1.18.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-aop\2.1.17.RELEASE\spring-boot-starter-aop-2.1.17.RELEASE.jar;D:\maven_repository\org\aspectj\aspectjweaver\1.9.6\aspectjweaver-1.9.6.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-actuator\2.1.17.RELEASE\spring-boot-starter-actuator-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.1.17.RELEASE\spring-boot-actuator-autoconfigure-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-actuator\2.1.17.RELEASE\spring-boot-actuator-2.1.17.RELEASE.jar;D:\maven_repository\org\yaml\snakeyaml\1.23\snakeyaml-1.23.jar;D:\maven_repository\org\apache\tomcat\embed\tomcat-embed-jasper\9.0.40\tomcat-embed-jasper-9.0.40.jar;D:\maven_repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.38\tomcat-embed-core-9.0.38.jar;D:\maven_repository\org\apache\tomcat\tomcat-annotations-api\9.0.38\tomcat-annotations-api-9.0.38.jar;D:\maven_repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.38\tomcat-embed-el-9.0.38.jar;D:\maven_repository\org\eclipse\jdt\ecj\3.18.0\ecj-3.18.0.jar;E:\MyNuts\MCA\code\nacos-2.0.3\naming\target\classes;D:\maven_repository\io\netty\netty-all\4.1.59.Final\netty-all-4.1.59.Final.jar;D:\maven_repository\org\springframework\boot\spring-boot\2.1.17.RELEASE\spring-boot-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\spring-core\5.1.18.RELEASE\spring-core-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\spring-jcl\5.1.18.RELEASE\spring-jcl-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\spring-context\5.1.18.RELEASE\spring-context-5.1.18.RELEASE.jar;D:\maven_repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\maven_repository\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;D:\maven_repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;D:\maven_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\maven_repository\org\apache\mina\mina-core\2.0.0-RC1\mina-core-2.0.0-RC1.jar;D:\maven_repository\org\javatuples\javatuples\1.2\javatuples-1.2.jar;D:\maven_repository\org\apache\httpcomponents\httpcore\4.4.13\httpcore-4.4.13.jar;D:\maven_repository\org\apache\httpcomponents\httpclient\4.5.12\httpclient-4.5.12.jar;D:\maven_repository\commons-codec\commons-codec\1.11\commons-codec-1.11.jar;E:\MyNuts\MCA\code\nacos-2.0.3\cmdb\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\istio\target\classes;E:\MyNuts\MCA\code\nacos-2.0.3\client\target\classes;D:\maven_repository\io\prometheus\simpleclient\0.5.0\simpleclient-0.5.0.jar;D:\maven_repository\io\grpc\grpc-netty-shaded\1.24.0\grpc-netty-shaded-1.24.0.jar;D:\maven_repository\io\grpc\grpc-core\1.24.0\grpc-core-1.24.0.jar;D:\maven_repository\com\google\code\gson\gson\2.8.6\gson-2.8.6.jar;D:\maven_repository\com\google\android\annotations\4.1.1.4\annotations-4.1.1.4.jar;D:\maven_repository\io\perfmark\perfmark-api\0.17.0\perfmark-api-0.17.0.jar;D:\maven_repository\io\opencensus\opencensus-api\0.21.0\opencensus-api-0.21.0.jar;D:\maven_repository\io\opencensus\opencensus-contrib-grpc-metrics\0.21.0\opencensus-contrib-grpc-metrics-0.21.0.jar;D:\maven_repository\io\grpc\grpc-protobuf\1.24.0\grpc-protobuf-1.24.0.jar;D:\maven_repository\io\grpc\grpc-api\1.24.0\grpc-api-1.24.0.jar;D:\maven_repository\io\grpc\grpc-context\1.24.0\grpc-context-1.24.0.jar;D:\maven_repository\com\google\errorprone\error_prone_annotations\2.3.2\error_prone_annotations-2.3.2.jar;D:\maven_repository\org\codehaus\mojo\animal-sniffer-annotations\1.17\animal-sniffer-annotations-1.17.jar;D:\maven_repository\io\grpc\grpc-protobuf-lite\1.24.0\grpc-protobuf-lite-1.24.0.jar;D:\maven_repository\io\grpc\grpc-stub\1.24.0\grpc-stub-1.24.0.jar;D:\maven_repository\com\google\api\grpc\proto-google-common-protos\1.17.0\proto-google-common-protos-1.17.0.jar;D:\maven_repository\com\google\protobuf\protobuf-java\3.8.0\protobuf-java-3.8.0.jar;D:\maven_repository\io\envoyproxy\controlplane\api\0.1.27\api-0.1.27.jar;D:\maven_repository\org\slf4j\log4j-over-slf4j\1.7.30\log4j-over-slf4j-1.7.30.jar;D:\maven_repository\org\slf4j\jcl-over-slf4j\1.7.30\jcl-over-slf4j-1.7.30.jar;D:\maven_repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-security\2.1.17.RELEASE\spring-boot-starter-security-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter\2.1.17.RELEASE\spring-boot-starter-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-autoconfigure\2.1.17.RELEASE\spring-boot-autoconfigure-2.1.17.RELEASE.jar;D:\maven_repository\org\springframework\boot\spring-boot-starter-logging\2.1.17.RELEASE\spring-boot-starter-logging-2.1.17.RELEASE.jar;D:\maven_repository\org\apache\logging\log4j\log4j-to-slf4j\2.11.2\log4j-to-slf4j-2.11.2.jar;D:\maven_repository\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;D:\maven_repository\org\springframework\spring-aop\5.1.18.RELEASE\spring-aop-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\spring-beans\5.1.18.RELEASE\spring-beans-5.1.18.RELEASE.jar;D:\maven_repository\org\springframework\security\spring-security-config\5.1.12.RELEASE\spring-security-config-5.1.12.RELEASE.jar;D:\maven_repository\org\springframework\security\spring-security-core\5.1.12.RELEASE\spring-security-core-5.1.12.RELEASE.jar;D:\maven_repository\org\springframework\security\spring-security-web\5.1.12.RELEASE\spring-security-web-5.1.12.RELEASE.jar;D:\maven_repository\org\springframework\spring-expression\5.1.18.RELEASE\spring-expression-5.1.18.RELEASE.jar com.alibaba.nacos.Nacos,--.,--.|,--,: : | Nacos ,--.| : ,---. Running in stand alone mode, All function modules | : : | | ,\ .--.--. Port: 8848 : | \ | : ,--.--. ,---. / / | / / Pid: 4744 | : ; | / \ / \. ; ,. :| : /./ Console: http://192.168.160.1:8848/nacos/index.html;. ;.--. .-. | / / | |: :| : ;_ | | | \ | \__\/: . .. / | .; : \ \ . https://nacos.io: | ; . , .--.; | ; :__| : | ----. \ | | -- / / ,. | | .|\ \ / / /-- /: | ; : . \ : : ---- --. / ; |. | , .-./\ \ / ----- --- ----- ----2023-02-16 09:21:39.127 INFO 4744 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean org.springframework.security.access.expression.method.DefaultMethodSecurityExpressionHandler25a5c7db of type [org.springframework.security.access.expression.method.DefaultMethodSecurityExpressionHandler] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2023-02-16 09:21:39.135 INFO 4744 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean methodSecurityMetadataSource of type [org.springframework.security.access.method.DelegatingMethodSecurityMetadataSource] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2023-02-16 09:21:39.495 INFO 4744 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8848 (http) 2023-02-16 09:21:40.348 INFO 4744 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 14339 ms 2023-02-16 09:21:49.646 INFO 4744 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService applicationTaskExecutor 2023-02-16 09:21:50.030 INFO 4744 --- [ main] o.s.b.a.w.s.WelcomePageHandlerMapping : Adding welcome page: class path resource [static/index.html] 2023-02-16 09:21:51.328 INFO 4744 --- [ main] o.s.s.web.DefaultSecurityFilterChain : Creating filter chain: Ant [pattern/**], [] 2023-02-16 09:21:51.676 INFO 4744 --- [ main] o.s.s.web.DefaultSecurityFilterChain : Creating filter chain: any request, [org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter92e2c93, org.springframework.security.web.context.SecurityContextPersistenceFilter5d5574c7, org.springframework.security.web.header.HeaderWriterFilter65f5cae3, org.springframework.security.web.csrf.CsrfFilter1029cf9, org.springframework.security.web.authentication.logout.LogoutFiltere0847a9, org.springframework.security.web.savedrequest.RequestCacheAwareFilter36871e98, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter23df7fad, org.springframework.security.web.authentication.AnonymousAuthenticationFilter6697f3d, org.springframework.security.web.session.SessionManagementFilter25d9291a, org.springframework.security.web.access.ExceptionTranslationFilter698ac187] 2023-02-16 09:21:51.909 INFO 4744 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService taskScheduler 2023-02-16 09:21:51.975 INFO 4744 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 16 endpoint(s) beneath base path /actuator 2023-02-16 09:21:52.262 INFO 4744 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8848 (http) with context path /nacos 2023-02-16 09:21:52.272 INFO 4744 --- [ main] c.a.n.c.l.StartingApplicationListener : Nacos started successfully in stand alone mode. use embedded storage能正常访问就说明源码启动没有任何问题那么下面开始正式的源码分析。 Nacos客户端服务注册 在《【手把手】教你玩转SpringCloud Alibaba之Nacos》这篇文章中详细说明了nacos的使用建议还没有看过的小伙伴如果还没有使用过nacos的话先去看一下这一篇对nacos的使用有个基础的认知再来看相关源码的讲解https://blog.csdn.net/FeenixOne/article/details/126953198只要引入nacos客户端的依赖就可以进行自动注册到nacos的服务端这中间发生了什么事情对于使用者来说是不需要知道的。不过既然想要研究源码就是要搞清楚其中做了什么事情。为了更简单明了的说明白这个过程这里使用nacos自身提供的一个测试类来举栗。 /** Copyright 1999-2018 Alibaba Group Holding Ltd.** Licensed under the Apache License, Version 2.0 (the License);* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.nacos.client;import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.common.utils.ThreadUtils; import org.junit.Ignore; import org.junit.Test;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;Ignore public class NamingTest {Testpublic void testServiceList() throws Exception {Properties properties new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, 127.0.0.1:8848);properties.put(PropertyKeyConst.USERNAME, nacos);properties.put(PropertyKeyConst.PASSWORD, nacos);Instance instance new Instance();instance.setIp(1.1.1.1);instance.setPort(800);instance.setWeight(2);MapString, String map new HashMapString, String();map.put(netType, external);map.put(version, 2.0);instance.setMetadata(map);NamingService namingService NacosFactory.createNamingService(properties);namingService.registerInstance(nacos.test.1, instance);ThreadUtils.sleep(5000L);ListInstance list namingService.getAllInstances(nacos.test.1);System.out.println(list);ThreadUtils.sleep(30000L);// ExpressionSelector expressionSelector new ExpressionSelector();// expressionSelector.setExpression(INSTANCE.metadata.registerSource dubbo);// ListViewString serviceList namingService.getServicesOfServer(1, 10, expressionSelector);} }其实这就是客户端注册的一个测试类它模仿了一个真实的服务注册进Nacos的过程包括NacosServer连接、实例的创建、实例属性的赋值、注册实例.所以在这个其中包含了服务注册的核心代码仅从此处的代码分析可以看出Nacos注册服务实例时包含了两大类信息Nacos Server连接信息和实例信息。别看代码没几行但是这个过程是非常清楚明了的。 连接信息 Properties类是nacos server的连接信息包含以下信息 1、Server地址Nacos服务器地址属性的key为serverAddr 2、用户名连接Nacos服务的用户名属性key为username默认值为nacos 3、密码连接Nacos服务的密码属性key为password默认值为nacos Instance类是nacos的实例信息所谓实例指的就是每一个注册的客户端都视为一个实例实例信息又分两部分实例基础信息和元数据。实例基础信息包括 - instanceId实例的唯一ID - ip实例IP提供给消费者进行通信的地址 - port 端口提供给消费者访问的端口 - weight权重当前实例的权限浮点类型默认1.0D - healthy健康状况默认true - enabled实例是否准备好接收请求默认true - ephemeral实例是否为瞬时的所谓瞬时指的就是在内存中没有持久化到硬盘上默认为true - clusterName实例所属的集群名称 - serviceName实例的服务信息 实例信息 Instance类包含了实例的基础信息之外还包含了用于存储元数据的metadata描述数据的数据类型为HashMap从当前这个Demo中可以得知存放了两个数据 - netType顾名思义网络类型这里的值为external也就是外网的意思 - version版本Nacos的版本这里是2.0这个大版本 除了Demo中这些“自定义”的信息在Instance类中还定义了一些默认信息这些信息通过Instance类里面的get方法提供点进去Instance类从源码可以看到 public long getInstanceHeartBeatInterval() {return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,Constants.DEFAULT_HEART_BEAT_INTERVAL); }public long getInstanceHeartBeatTimeOut() {return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,Constants.DEFAULT_HEART_BEAT_TIMEOUT); }public long getIpDeleteTimeout() {return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT,Constants.DEFAULT_IP_DELETE_TIMEOUT); }public String getInstanceIdGenerator() {return getMetaDataByKeyWithDefault(PreservedMetadataKeys.INSTANCE_ID_GENERATOR,Constants.DEFAULT_INSTANCE_ID_GENERATOR); } 上面的get方法在需要元数据默认值时会被用到 - preserved.heart.beat.interval心跳间隙的key默认为5s也就是默认5秒进行一次心跳 - preserved.heart.beat.timeout心跳超时的key默认为15s也就是默认15秒收不到心跳实例将会标记为不健康 - preserved.ip.delete.timeout实例IP被删除的key默认为30s也就是30秒收不到心跳实例将会被移除 - preserved.instance.id.generator实例ID生成器key默认为simple 这些都是Nacos默认提供的值也就是当前实例注册时会告诉Nacos Server说我的心跳间隙、心跳超时等对应的值是多少就按照这个值来判断我这个实例是否健康。有了这些信息基本是已经知道注册实例时需要传递什么参数需要配置什么参数了。 NamingService接口与NacosNamingService实现 nacos在注册的时候使用的一个核心接口只要重构这些方法就可以进行服务的注册。也就是说完全可以通过实现它的接口来写一个自己的nacos都可以。NamingService接口是Nacos命名服务对外提供的一个统一接口看对应的源码就可以发现它提供了大量实例相关的接口方法 服务实例注册void registerInstance(...) throws NacosException; 服务实例注销void deregisterInstance(...) throws NacosException; 获取服务实例列表ListInstance getAllInstances(...) throws NacosException; 查询健康服务实例ListInstance selectInstances(...) throws NacosException; 查询集群中健康的服务实例ListInstance selectInstances(....ListString clusters....)throws NacosException; 使用负载均衡策略选择一个健康的服务实例Instance selectOneHealthyInstance(...) throws NacosException; 订阅服务事件void subscribe(...) throws NacosException; 取消订阅服务事件void unsubscribe(...) throws NacosException; 获取所有或指定服务名称ListViewString getServicesOfServer(...) throws NacosException; 获取所有订阅的服务ListServiceInfo getSubscribeServices() throws NacosException; 获取Nacos服务的状态String getServerStatus(); 主动关闭服务void shutDown() throws NacosException 在这些方法中提供了大量的重载方法应用于不同场景和不同类型实例或服务的筛选 NamingService的实例化是通过NamingFactory类和上面的Nacos服务信息从代码中可以看出这里采用了反射机制来实例化NamingService具体的实现类为NacosNamingService public static NamingService createNamingService(Properties properties) throws NacosException {try {Class? driverImplClass Class.forName(com.alibaba.nacos.client.naming.NacosNamingService);Constructor constructor driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);} } 在测试类中使用了NamingService的registerInstance方法来进行服务实例的注册该方法接收两个参数服务名称和实例对象。这个方法的最大作用是设置了当前实例的分组信息。我们知道在Nacos中通过Namespace、group、Service、Cluster等一层层的将实例进行环境的隔离。在这里设置了默认的分组为“DEFAULT_GROUP”。 Override public void registerInstance(String serviceName, Instance instance) throws NacosException {registerInstance(serviceName, Constants.DEFAULT_GROUP, instance); } 紧接着调用的registerInstance方法如下这个方法实现了两个功能 1、检查心跳时间设置的对不对心跳默认为5秒 2、通过NamingClientProxy这个代理来执行服务注操作 Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);//检查心跳clientProxy.registerService(serviceName, groupName, instance);//通过代理执行服务注册操作 } 通过clientProxy发现NamingClientProxy这个代理接口的具体实现是由NamingClientProxyDelegate来完成的这个可以从NacosNamingService构造方法中来看出。 public NacosNamingService(Properties properties) throws NacosException {init(properties); } 初始化在init方法中 private void init(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);this.namespace InitUtils.initNamespaceForNaming(properties);InitUtils.initSerialization();InitUtils.initWebRootContext(properties);initLogName(properties);this.changeNotifier new InstancesChangeNotifier();NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);NotifyCenter.registerSubscriber(changeNotifier);this.serviceInfoHolder new ServiceInfoHolder(namespace, properties);this.clientProxy new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);//在这里进行了初始化并看出使用的是NamingClientProxyDelegate来完成的 } 根据上面的分析和源码的阅读可以发现NamingClientProxy调用registerService实际上调用的就是NamingClientProxyDelegate的对应方法 Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance); } 真正调用注册服务的并不是代理实现类而是根据当前实例是否为瞬时对象来选择对应的客户端代理来进行请求的如果当前实例为瞬时对象则采用gRPC协议NamingGrpcClientProxy进行请求否则采用http协议NamingHttpClientProxy进行请求。默认为瞬时对象也就是说2.0版本中默认采用了gRPC协议进行与Nacos服务进行交互。 private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy; } 关于gRPC协议NamingGrpcClientProxy后面再做展开先主要关注一下registerService方法实现这里其实做了两件事情 1、缓存当前注册的实例信息用于恢复缓存的数据结构为ConcurrentMapString, Instancekey为“serviceNamegroupName”value就是前面封装的实例信息 2、另外一件事就是封装了参数基于gRPC进行服务的调用和结果的处理 Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info([REGISTER-SERVICE] {} registering service {} with instance {}, namespaceId, serviceName,instance);redoService.cacheInstanceForRedo(serviceName, groupName, instance);//缓存数据doRegisterService(serviceName, groupName, instance);//基于gRPC进行服务的调用 } 以上的总结下来就是 实际上我们在真实的生产环境中我们要让某一个服务注册到Nacos中我们首先要引入一个依赖到SpringBoot自动装配文件META-INF/spring.factories文件 然后再通过SpingBoot的自动装配来加载EnableAutoConfiguration对应的类然后这里就能看见很多Nacos相关的内容那怎么能知道这个服务在注册的时候具体走的时候哪一个其实一般这种文件都会找“Auto”关键子的文件来进行查看然后现在要了解的是客户端的注册所以要找“NacosServiceRegistryAutoConfiguration”。 在当前这个类中会有很多的Bean组件这些都是Spring容器启动时候自动注入的一般情况下可能会看一下每一个Bean组件初始化具体干了什么但是实际上这里最核心的是“NacosAutoServiceRegistration”   其实这个类就是注册的核心来看一下它的继承关系   通过这里可以清楚的知道NacosAutoServiceRegistration继承了AbstractAutoServiceRegistration而这个类型实现了ApplicationListener接口所以由此得出一般实现ApplicationListener接口的类型都会实现一个方法onApplicationEvent这个方法会在项目启动的时候触发 public void onApplicationEvent(WebServerInitializedEvent event) {bind(event); }Deprecated public void bind(WebServerInitializedEvent event) {ApplicationContext context event.getApplicationContext();if (context instanceof ConfigurableWebServerApplicationContext) {if (management.equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {return;}}this.port.compareAndSet(0, event.getWebServer().getPort());this.start(); } 然后在start()方法中调用register()方法来注册服务 public void start() {if (!isEnabled()) {if (logger.isDebugEnabled()) {logger.debug(Discovery Lifecycle disabled. Not starting);}return;}// only initialize if nonSecurePort is greater than 0 and it isnt already running// because of containerPortInitializer belowif (!this.running.get()) {this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));register();if (shouldRegisterManagement()) {registerManagement();}this.context.publishEvent(new InstanceRegisteredEvent(this, getConfiguration()));this.running.compareAndSet(false, true);}} 继续分析一下register这个方法 protected void register() {this.serviceRegistry.register(getRegistration()); } 但是这里要注意serviceRegistry实际上是一个接口所以来看一下它的具体实现类NacosServiceRegistry找到这个实现类然后来查看register方法到这里其实应该已经明白了因为这里调用了上面说过的registerInstance注册实例方法 Override public void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn(No service to register for nacos client...);return;}NamingService namingService namingService();String serviceId registration.getServiceId();String group nacosDiscoveryProperties.getGroup();//构建instance实例Instance instance getNacosInstanceFromRegistration(registration);try {//向服务端注册此服务namingService.registerInstance(serviceId, group, instance);log.info(nacos registry, {} {} {}:{} register finished, group, serviceId,instance.getIp(), instance.getPort());}catch (Exception e) {log.error(nacos registry, {} register failed...{},, serviceId,registration.toString(), e);// rethrow a RuntimeException if the registration is failed.// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132rethrowRuntimeException(e);} } 其实到这里应该已经明白Nacos客户端的服务注册过程了但是其实再补充一点就是其实注册本身就是访问了Nacos提供的一个接口可以在官网上看到 通过deBug来看一下在NacosServiceRegistry中的register方法中在注册实例方法中打断点   然后在NamingService的实现类NacosNamingService中registerInstance方法中打断点 然后进入到这个registerService方法中进行查看就会发现这里就会把实例信息放到散列表中然后调用reqApi方法来发送请求访问接口/nacos/v1/ns/instance   Nacos服务端服务注册 上面说到客户端在注册服务的时候实际上是调用的NamingService.registerInstance这个方法来完成实例的注册而且在最后也看到实际上从本质上讲服务注册就是调用的对应接口nacos/v1/ns/instance那现在就在服务端先找到这个接口然后来看具体服务端的操作。 这是从Nacos官网上看到的Nacos架构图其实在这里已经就能分析出要找的接口应该在NamingService这个服务中。从源码角度来看其实通过这个项目结构图中也能清楚的看见naming这个子模块而且在前面各模块的功能图中就说到naming实际上就是实现服务的注册的。 那接着来看下这个项目中的controller因为所有的接口其实都在controller中从这些Controller中就会明显的看到一个InstanceController所以很明显注册实例一定和它有关。RequestMapping注解中的值就是访问的注册接口 接下来再寻找RESTful API接口POST请求类型的方法register在这个方法中实际上就是接受用户请求把收到的信息进行解析还原成Instance然后调用registerInstance方法来完成注册这个方法才是服务端注册的核心  CanDistro PostMapping Secured(parser NamingResourceParser.class, action ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception {final String namespaceId WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);final String serviceName WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);final Instance instance HttpRequestInstanceBuilder.newBuilder().setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();//注册服务实例getInstanceOperator().registerInstance(namespaceId, serviceName, instance);return ok; } 其中的getInstanceOperator()就是判断是否采用gRPC协议很明显这个位置走的是instanceServiceV2 private InstanceOperator getInstanceOperator() {return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1; } 实际上instanceServiceV2就是InstanceOperatorClientImpl来看它的registerInstance方法instanceServiceV2.registerInstance Override public void registerInstance(String namespaceId, String serviceName, Instance instance) {//判断是否为瞬时对象临时客户端boolean ephemeral instance.isEphemeral();//获取客户端IDString clientId IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);//通过客户端ID创建客户端连接createIpPortClientIfAbsent(clientId);//获取服务Service service getService(namespaceId, serviceName, ephemeral);//具体注册服务clientOperationService.registerInstance(service, instance, clientId); } 在这里要分析一些细节其实Nacos2.0以后新增Client模型。一个客户端gRPC长连接对应一个Client每个Client有自己唯一的idclientId。Client负责管理一个客户端的服务实例注册Publish和服务订阅Subscribe。可以看一下这个模型其实就是一个接口 public interface Client {// 客户端id/gRPC的connectionIdString getClientId();// 是否临时客户端boolean isEphemeral();// 客户端更新时间void setLastUpdatedTime();long getLastUpdatedTime();// 服务实例注册/注销/查询boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);InstancePublishInfo removeServiceInstance(Service service);InstancePublishInfo getInstancePublishInfo(Service service);CollectionService getAllPublishedService();// 服务订阅/取消订阅/查询订阅boolean addServiceSubscriber(Service service, Subscriber subscriber);boolean removeServiceSubscriber(Service service);Subscriber getSubscriber(Service service);CollectionService getAllSubscribeService();// 生成同步给其他节点的client数据ClientSyncData generateSyncData();// 是否过期boolean isExpire(long currentTime);// 释放资源void release(); } EphemeralClientOperationServiceImpl实际负责处理服务注册来看具体方法 Override public void registerInstance(Service service, Instance instance, String clientId) {//确保Service单例存在Service singleton ServiceManager.getInstance().getSingleton(service);//根据客户端id找到客户端Client client clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//客户端Instance模型转换为服务端Instance模型InstancePublishInfo instanceInfo getPublishInfo(instance);//将Instance储存到Client里client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();//建立Service与ClientId的关系NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)); } Service的容器是ServiceManager但是在com.alibaba.nacos.naming.core.v2包下容器中Service都是单例。 public class ServiceManager {private static final ServiceManager INSTANCE new ServiceManager();//单例Service可以查看Service的equals和hasCode方法private final ConcurrentHashMapService, Service singletonRepository;//namespace下的所有serviceprivate final ConcurrentHashMapString, SetService namespaceSingletonMaps;..... } 从这个位置可以看出当调用这个注册方法的时候ServiceManager负责管理Service单例 //通过Map储存单例的Service public Service getSingleton(Service service) {singletonRepository.putIfAbsent(service, service);Service result singletonRepository.get(service);namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) - new ConcurrentHashSet());namespaceSingletonMaps.get(result.getNamespace()).add(result);return result; } clientManager是一个接口这里要看它对应的一个实现类ConnectionBasedClientManager这个实现类负责管理长连接clientId与Client模型的映射关系 // 根据clientId查询Client public Client getClient(String clientId) {return clients.get(clientId); } Client接口的一个实例AbstractClient负责存储当前客户端的服务注册表即Service与Instance的关系。注意对于单个客户端来说同一个服务只能注册一个实例。 Override public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {if (null publishers.put(service, instancePublishInfo)) {MetricsMonitor.incrementInstanceCount();}NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info(Client change for service {}, {}, service, getClientId());return true; } ClientOperationEvent.ClientRegisterServiceEvent这里的目的是为了过滤目标服务得到最终Instance列表建立Service与Client的关系建立Service与Client的关系就是为了加速查询。 发布ClientRegisterServiceEvent事件ClientServiceIndexesManager监听ClientServiceIndexesManager维护了两个索引 - Service与发布clientId - Service与订阅clientId private final ConcurrentMapService, SetString publisherIndexes new ConcurrentHashMap();private final ConcurrentMapService, SetString subscriberIndexes new ConcurrentHashMap();private void handleClientOperation(ClientOperationEvent event) {Service service event.getService();String clientId event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {addPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {removePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {addSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {removeSubscriberIndexes(service, clientId);} }//建立Service与发布Client的关系 private void addPublisherIndexes(Service service, String clientId) {publisherIndexes.computeIfAbsent(service, (key) - new ConcurrentHashSet());publisherIndexes.get(service).add(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } 这个索引关系建立以后还会触发ServiceChangedEvent代表服务注册表变更。对于注册表变更紧接着还要做两个事情 1、通知订阅客户端 2、Nacos集群数据同步 Nacos服务端健康检查 注册中心客户端2.0之后使用gRPC代替http会与服务端建立长连接但仍然保留了对旧http客户端的支持。长连接指在一个连接上可以连续发送多个数据包在连接保持期间如果没有数据包发送需要双方发链路检测包。 NamingClientProxy接口负责底层通讯调用服务端接口。有三个实现类 1、NamingClientProxyDelegate代理类对所有NacosNamingService中的方法进行代理根据实际情况选择http或gRPC协议请求服务端 2、NamingGrpcClientProxy底层通讯基于gRPC长连接 3、NamingHttpClientProxy底层通讯基于http短连接。使用的都是老代码基本没改原来1.0版本NamingProxy重命名过来的 以客户端服务注册为例NamingClientProxyDelegate代理了registerService方法 private NamingClientProxy clientProxy; // NamingClientProxyDelegate public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);clientProxy.registerService(serviceName, groupName, instance); } NamingClientProxyDelegate会根据instance实例是否是临时节点而选择不同的协议 1、临时instancegRPC 2、持久instancehttp public class NamingClientProxyDelegate implements NamingClientProxy {private final NamingHttpClientProxy httpClientProxy;private final NamingGrpcClientProxy grpcClientProxy;Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);}// 临时节点走grpc长连接持久节点走http短连接private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;} } 在之前的1.x版本中临时实例走Distro协议内存存储客户端向注册中心发送心跳来维持自身healthy状态持久实例走Raft协议持久化存储服务端定时与客户端建立tcp连接做健康检查。但是2.0版本以后持久化实例没有什么变化但是2.0临时实例不在使用心跳而是通过长连接是否存活来判断实例是否健康。 ConnectionManager负责管理所有客户端的长连接。每3s检测所有超过20s没发生过通讯的客户端向客户端发起ClientDetectionRequest探测请求如果客户端在1s内成功响应则检测通过否则执行unregister方法移除Connection。如果客户端持续与服务端通讯服务端是不需要主动探活的。 MapString, Connection connections new ConcurrentHashMapString, Connection(); PostConstruct public void start() {// 启动不健康连接排除功能.RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {Overridepublic void run() {try {int totalCount connections.size();Loggers.REMOTE_DIGEST.info(Connection check task start);MetricsMonitor.getLongConnectionMonitor().set(totalCount);//统计过时20s连接SetMap.EntryString, Connection entries connections.entrySet();int currentSdkClientCount currentSdkClientCount();boolean isLoaderClient loadClient 0;int currentMaxClient isLoaderClient ? loadClient : connectionLimitRule.countLimit;int expelCount currentMaxClient 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);Loggers.REMOTE_DIGEST.info(Total count {}, sdkCount{},clusterCount{}, currentLimit{}, toExpelCount{},totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),currentMaxClient (isLoaderClient ? (loaderCount) : ), expelCount);ListString expelClient new LinkedList();MapString, AtomicInteger expelForIp new HashMap(16);//1. calculate expel count of ip.for (Map.EntryString, Connection entry : entries) {Connection client entry.getValue();String appName client.getMetaInfo().getAppName();String clientIp client.getMetaInfo().getClientIp();if (client.getMetaInfo().isSdkSource() !expelForIp.containsKey(clientIp)) {//get limit for current ip.int countLimitOfIp connectionLimitRule.getCountLimitOfIp(clientIp);if (countLimitOfIp 0) {int countLimitOfApp connectionLimitRule.getCountLimitOfApp(appName);countLimitOfIp countLimitOfApp 0 ? countLimitOfIp : countLimitOfApp;}if (countLimitOfIp 0) {countLimitOfIp connectionLimitRule.getCountLimitPerClientIpDefault();}if (countLimitOfIp 0 connectionForClientIp.containsKey(clientIp)) {AtomicInteger currentCountIp connectionForClientIp.get(clientIp);if (currentCountIp ! null currentCountIp.get() countLimitOfIp) {expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));}}}}Loggers.REMOTE_DIGEST.info(Check over limit for ip limit rule, over limit ip count{}, expelForIp.size());if (expelForIp.size() 0) {Loggers.REMOTE_DIGEST.info(Over limit ip expel info, {}, expelForIp);}SetString outDatedConnections new HashSet();long now System.currentTimeMillis();//2.get expel connection for ip limit.for (Map.EntryString, Connection entry : entries) {Connection client entry.getValue();String clientIp client.getMetaInfo().getClientIp();AtomicInteger integer expelForIp.get(clientIp);if (integer ! null integer.intValue() 0) {integer.decrementAndGet();expelClient.add(client.getMetaInfo().getConnectionId());expelCount--;} else if (now - client.getMetaInfo().getLastActiveTime() KEEP_ALIVE_TIME) {outDatedConnections.add(client.getMetaInfo().getConnectionId());}}//3. if total count is still over limit.if (expelCount 0) {for (Map.EntryString, Connection entry : entries) {Connection client entry.getValue();if (!expelForIp.containsKey(client.getMetaInfo().clientIp) client.getMetaInfo().isSdkSource() expelCount 0) {expelClient.add(client.getMetaInfo().getConnectionId());expelCount--;outDatedConnections.remove(client.getMetaInfo().getConnectionId());}}}String serverIp null;String serverPort null;if (StringUtils.isNotBlank(redirectAddress) redirectAddress.contains(Constants.COLON)) {String[] split redirectAddress.split(Constants.COLON);serverIp split[0];serverPort split[1];}for (String expelledClientId : expelClient) {try {Connection connection getConnection(expelledClientId);if (connection ! null) {ConnectResetRequest connectResetRequest new ConnectResetRequest();connectResetRequest.setServerIp(serverIp);connectResetRequest.setServerPort(serverPort);connection.asyncRequest(connectResetRequest, null);Loggers.REMOTE_DIGEST.info(Send connection reset request , connection id {},recommendServerIp{}, recommendServerPort{},expelledClientId, connectResetRequest.getServerIp(),connectResetRequest.getServerPort());}} catch (ConnectionAlreadyClosedException e) {unregister(expelledClientId);} catch (Exception e) {Loggers.REMOTE_DIGEST.error(Error occurs when expel connection, expelledClientId:{}, expelledClientId, e);}}//4.client active detection.Loggers.REMOTE_DIGEST.info(Out dated connection ,size{}, outDatedConnections.size());//异步请求所有需要检测的连接if (CollectionUtils.isNotEmpty(outDatedConnections)) {SetString successConnections new HashSet();final CountDownLatch latch new CountDownLatch(outDatedConnections.size());for (String outDateConnectionId : outDatedConnections) {try {Connection connection getConnection(outDateConnectionId);if (connection ! null) {ClientDetectionRequest clientDetectionRequest new ClientDetectionRequest();connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {Overridepublic Executor getExecutor() {return null;}Overridepublic long getTimeout() {return 1000L;}Overridepublic void onResponse(Response response) {latch.countDown();if (response ! null response.isSuccess()) {connection.freshActiveTime();successConnections.add(outDateConnectionId);}}Overridepublic void onException(Throwable e) {latch.countDown();}});Loggers.REMOTE_DIGEST.info([{}]send connection active request , outDateConnectionId);} else {latch.countDown();}} catch (ConnectionAlreadyClosedException e) {latch.countDown();} catch (Exception e) {Loggers.REMOTE_DIGEST.error([{}]Error occurs when check client active detection ,error{},outDateConnectionId, e);latch.countDown();}}latch.await(3000L, TimeUnit.MILLISECONDS);Loggers.REMOTE_DIGEST.info(Out dated connection check successCount{}, successConnections.size());// 对于没有成功响应的客户端执行unregister移出for (String outDateConnectionId : outDatedConnections) {if (!successConnections.contains(outDateConnectionId)) {Loggers.REMOTE_DIGEST.info([{}]Unregister Out dated connection...., outDateConnectionId);unregister(outDateConnectionId);}}}//reset loader clientif (isLoaderClient) {loadClient -1;redirectAddress null;}Loggers.REMOTE_DIGEST.info(Connection check task end);} catch (Throwable e) {Loggers.REMOTE.error(Error occurs during connection check... , e);}}}, 1000L, 3000L, TimeUnit.MILLISECONDS);}//注销移出连接方法 public synchronized void unregister(String connectionId) {Connection remove this.connections.remove(connectionId);if (remove ! null) {String clientIp remove.getMetaInfo().clientIp;AtomicInteger atomicInteger connectionForClientIp.get(clientIp);if (atomicInteger ! null) {int count atomicInteger.decrementAndGet();if (count 0) {connectionForClientIp.remove(clientIp);}}remove.close();Loggers.REMOTE_DIGEST.info([{}]Connection unregistered successfully. , connectionId);clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);} } 移除connection后继承ClientConnectionEventListener的ConnectionBasedClientManager会移除Client发布ClientDisconnectEvent事件。 Override public boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info(Client connection {} disconnect, remove instances and subscribers, clientId);ConnectionBasedClient client clients.remove(clientId);if (null client) {return true;}client.release();NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));return true; } ClientDisconnectEvent会触发几个事件 1、Distro协议同步移除的client数据 2、清除两个索引缓存ClientServiceIndexesManager中Service与发布Client的关系ServiceStorage中Service与Instance的关系 3、服务订阅ClientDisconnectEvent会间接触发ServiceChangedEvent事件将服务变更通知客户端 Nacos客户端服务发现 Nacos客户端的服务发现其实就是封装参数、调用服务接口、获得返回实例列表 但是如果要细化这个流程会发现不仅包括了通过NamingService获取服务列表在获取服务列表的过程中还涉及到通信流程协议Http or gPRC、订阅流程、故障转移流程等。下面来详细的捋一捋。其实这个入口在之前看过就在NamingTest中可以看到 public class NamingTest {Testpublic void testServiceList() throws Exception {......NamingService namingService NacosFactory.createNamingService(properties);namingService.registerInstance(nacos.test.1, instance);ThreadUtils.sleep(5000L);ListInstance list namingService.getAllInstances(nacos.test.1);System.out.println(list);} } 在这里主要要关注getAllInstances方法看一下这个方法的具体操作当然这其中需要经过一系列的重载方法调用。其实这里的方法比入口多出了几个参数这里不仅有服务名称还有分组名、集群列表、是否订阅重载方法中的其他参数已经在各种重载方法的调用过程中设置了默认值比如分组名称默认DEFAULT_GROUOP、集群列表默认为空数组、是否订阅订阅 等。 Override public ListInstance getAllInstances(String serviceName, String groupName, ListString clusters,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;String clusterString StringUtils.join(clusters, ,);// 是否是订阅模式if (subscribe) {// 先从客户端缓存获取服务信息serviceInfo serviceInfoHolder.getServiceInfo(serviceName, , clusterString);if (null serviceInfo) {// 如果本地缓存不存在服务信息则进行订阅serviceInfo clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 如果未订阅服务信息则直接从服务器进行查询serviceInfo clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 从服务信息中获取实例列表ListInstance list;if (serviceInfo null || CollectionUtils.isEmpty(list serviceInfo.getHosts())) {return new ArrayListInstance();}return list; } 画成一张流程图就是 这个流程基本逻辑为 1、如果是订阅模式则直接从本地缓存获取服务信息(ServiceInfo)然后从中获取实例列表这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有那说明是首次调用则进行订阅在订阅完成后会获得服务信息 2、如果是非订阅模式那就直接请求服务器端获得服务信息 在刚才的流程中涉及到了订阅的逻辑入口代码为获取实例列表中的方法 serviceInfo clientProxy.subscribe(serviceName, groupName, clusterString); 首先这里的clientProxy是NamingClientProxy类的对象对应的实现类是NamingClientProxyDelegate对应subscribe实现如下 Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup NamingUtils.getGroupedName(serviceName, groupName);String serviceKey ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null result) {// 如果为null则进行订阅逻辑处理基于gRPC协议result grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result; } 在这段代码中可以看到在获取服务实例列表时特别是首次也进行了订阅逻辑的拓展基本流程图如下 1、订阅方法先开启定时任务这个定时任务的主要作用就是用来定时同步服务端的实例信息并进行本地缓存更新等操作但是如果是首次这里将会直接返回来走下一步 2、判断本地缓存是否存在如果本地缓存存在ServiceInfo信息则直接返回。如果不存在则默认采用gRPC协议进行订阅并返回ServiceInfo 3、grpcClientProxy的subscribe订阅方法就是直接向服务器发送了一个订阅请求并返回结果 4、最后ServiceInfo本地缓存处理。这里会将获得的最新ServiceInfo与本地内存中的ServiceInfo进行比较更新发布变更时间磁盘文件存储等操作。其实这一步的操作在订阅定时任务中也进行了处理  Nacos客户端服务订阅机制 Nacos的订阅机制如果用一句话来描述就是Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表当发现实例发生变化时发布变更事件订阅者进行业务处理更新实例更改本地缓存。以下是订阅方法的主线流程涉及内容比较多细节比较复杂 其实订阅本质上就是服务发现的一种方式也就是在服务发现的时候执行订阅方法触发定时任务去拉取服务端的数据。NacosNamingService中暴露的许多重载的subscribe重载的目的就是为了少写一些参数这些参数Nacos给默认处理了最终这些重载方法都会调用到下面这个方法 Override public void subscribe(String serviceName, String groupName, ListString clusters, EventListener listener)throws NacosException {if (null listener) {return;}String clusterString StringUtils.join(clusters, ,);changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString); } 来看subscribe方法可能有些眼熟它是clientProxy类型调用的方法实际上就是NamingClientProxyDelegate.subscribe()所以其实这里和之前的服务发现中调用的是一个方法这里其实是在做服务列表的查询所以得出结论查询和订阅都调用了同一个方法 Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup NamingUtils.getGroupedName(serviceName, groupName);String serviceKey ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null result) {// 如果为null则进行订阅逻辑处理基于gRPC协议result grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result; } 重点关注这里的任务调度该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask而其中的addTask的实现就是发起了一个定时任务 public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) ! null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) ! null) {return;}//构建UpdateTaskScheduledFuture? future addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);} } 定时任务延迟一秒执行 private synchronized ScheduledFuture? addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); } 所以在这里可以得出结论其核心就是调用订阅方法和发起定时任务。 UpdateTask封装了订阅机制的核心业务逻辑来看一下流程图 知道了整体流程以后再来看对应源码 Override public void run() {long delayTime DEFAULT_DELAY;try {// 判断是服务是否订阅和未开启过定时任务如果订阅过直接不在执行if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info(update task is stopped, service:{}, clusters:{}, groupedServiceName, clusters);return;}// 获取缓存的service信息ServiceInfo serviceObj serviceInfoHolder.getServiceInfoMap().get(serviceKey);// 如果为空if (serviceObj null) {// 根据serviceName从注册中心服务端获取Service信息serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime serviceObj.getLastRefTime();return;}// 过期服务服务的最新更新时间小于等于缓存刷新最后一次拉取数据的时间时间从注册中心重新查询if (serviceObj.getLastRefTime() lastRefTime) {serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置默认6秒// TODO multiple time can be configured.delayTime serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0(可能会出现失败情况没有ServiceInfo连接失败)resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn([NA] failed to update serviceName: {}, groupedServiceName, e);} finally {// 下次调度刷新时间下次执行的时间与failCount有关failCount0则下次调度时间为6秒最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);} } 业务逻辑最后会计算下一次定时任务的执行时间通过delayTime来延迟执行。delayTime默认为 1000L * 6也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时下次执行的时间与失败次数有关但最长不超过1分钟。 总结起来就是 1、订阅方法的调用并进行EventListener的注册后面UpdateTask要用来进行判断 2、通过委托代理类来处理订阅逻辑此处与获取实例列表方法使用了同一个方法 3、通过定时任务执行UpdateTask方法默认执行间隔为6秒当发生异常时会延长但不超过1分钟 4、UpdateTask方法中会比较本地是否存在缓存缓存是否过期。当不存在或过期时查询注册中心获取最新实例更新最后获取时间处理ServiceInfo 5、重新计算定时任务时间循环执行流程 Nacos客户端服务订阅的事件机制 上一节已经分析了Nacos客户端订阅的核心流程Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表当发现实例发生变化时发布变更事件订阅者进行业务处理然后更新内存中和本地的缓存中的实例。这一节就来分析分析定时任务获取到最新实例列表之后整个事件机制是如何处理的。 还是这张图 在第1步调用subscribe方法时会订阅一个EventListener事件。而在定时任务UpdateTask定时获取实例列表之后会调用ServiceInfoHolder.processServiceInfo方法对ServiceInfo进行本地处理这其中就事件的处理。  在subscribe方法中通过了下面的源码进行了监听事件的注册 Override public void subscribe(String serviceName, String groupName, ListString clusters, EventListener listener)throws NacosException {if (null listener) {return;}String clusterString StringUtils.join(clusters, ,);changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString); } 在这其中主要要关注的就是changeNotifier.registerListener此监听就是进行具体事件注册逻辑 public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSetEventListener eventListeners listenerMap.get(key);if (eventListeners null) {synchronized (lock) {eventListeners listenerMap.get(key);if (eventListeners null) {eventListeners new ConcurrentHashSetEventListener();//将EventListener缓存到listenerMaplistenerMap.put(key, eventListeners);}}}eventListeners.add(listener); } 可以看出事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。同时这里的数据结构为ConcurrentHashMapkey为服务实例的信息的拼接value为监听事件的集合。 上面的源码中已经完成了事件的注册现在就来追溯触发事件的来源UpdateTask中获取到最新的实例会进行本地化处理 // ServiceInfoUpdateServiceUpdateTaskrun() ServiceInfo serviceObj serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (serviceObj null) {serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 本地缓存处理serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime serviceObj.getLastRefTime();return; } 这个run方法的详细逻辑已经分析过了主要来看其中本地缓存处理的方法serviceInfoHolder.processServiceInfo先来分析流程。这个逻辑简单来说判断新的ServiceInfo数据是否正确是否发生了变化。如果数据格式正确且发生变化那就发布一个InstancesChangeEvent事件同时将ServiceInfo写入本地缓存。 public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey serviceInfo.getKey();if (serviceKey null) {return null;}ServiceInfo oldService serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否已变更boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}// 监控服务监控缓存Map的大小MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());// 服务实例以更变if (changed) {NAMING_LOGGER.info(current ips:({}) service: {} - {}, serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件会被订阅者执行NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo; } 在上面的流程中核心点在于NotifyCenter通知中心具体流程如下  NotifyCenter中进行事件发布发布的核心逻辑是 1、根据InstancesChangeEvent事件类型获得对应的CanonicalName 2、将CanonicalName作为key从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher) 3、EventPublisher将InstancesChangeEvent事件进行发布  private static boolean publishEvent(final Class? extends Event eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}// 根据InstancesChangeEvent事件类型获得对应的CanonicalNamefinal String topic ClassUtils.getCanonicalName(eventType);// 将CanonicalName作为Key从NotifyCenter#publisherMap中获取对应的事件发布者EventPublisherEventPublisher publisher INSTANCE.publisherMap.get(topic);if (publisher ! null) {// 事件发布者publisher发布事件InstancesChangeEventreturn publisher.publish(event);}LOGGER.warn(There are no [{}] publishers for this event, please register, topic);return false; } 在这个源码中其实INSTANCE为NotifyCenter的单例实现那么这里的publisherMap中key(CanonicalName)和value(EventPublisher)之间的关系是什么时候建立的其实是在NacosNamingService实例化时调用init初始化方法中进行绑定的 // Publisher的注册过程在于建立InstancesChangeEvent.class与EventPublisher的关系。 NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); 这里再继续跟踪registerToPublisher方法就会发现默认采用了DEFAULT_PUBLISHER_FACTORY默认发布者工厂来进行构建再继续跟踪会发现在NotifyCenter中静态代码块会发现DEFAULT_PUBLISHER_FACTORY默认构建的EventPublisher为DefaultPublisher。 //NotifyCenter public static EventPublisher registerToPublisher(final Class? extends Event eventType, final int queueMaxSize) {return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize); } -------------------------------------------------------------------------------------------- //NotifyCenterstatic中部分代码 DEFAULT_PUBLISHER_FACTORY (cls, buffer) - {try {EventPublisher publisher clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error(Service class newInstance has error : , ex);throw new NacosRuntimeException(SERVER_ERROR, ex);} }; 最后得出结论NotifyCenter中它维护了事件名称和事件发布者的关系而默认的事件发布者为DefaultPublisher。 现在来看一下默认事件发布者的源码查看以后会发现它继承自Thread也就是说它是一个线程类同时它又实现了EventPublisher也就是发布者 public class DefaultPublisher extends Thread implements EventPublisher 接下来来看它的init初始化方法从这里可以看出当DefaultPublisher被初始化时是以守护线程的方式运作的其中还初始化了一个阻塞队列 Override public void init(Class? extends Event type, int bufferSize) {// 守护线程setDaemon(true);// 设置线程名字setName(nacos.publisher- type.getName());this.eventType type;this.queueMaxSize bufferSize;// 阻塞队列初始化this.queue new ArrayBlockingQueue(bufferSize);start(); } 最后调用了start()方法在这其中调用了super.start()启动线程 Override public synchronized void start() {if (!initialized) {// start just called oncesuper.start();if (queueMaxSize -1) {queueMaxSize ringBufferSize;}initialized true;} } run()方法调用openEventHandler()方法 Override public void run() {openEventHandler(); }void openEventHandler() {try {// This variable is defined to resolve the problem which message overstock in the queue.int waitTimes 60;// To ensure that messages are not lost, enable EventHandler when// waiting for the first Subscriber to register// 死循环延迟线程启动最大延时60秒这个主要是为了解决消息积压的问题。for (; ; ) {if (shutdown || hasSubscriber() || waitTimes 0) {break;}ThreadUtils.sleep(1000L);waitTimes--;}// 死循环不断的从队列中取出Event并通知订阅者Subscriber执行Eventfor (; ; ) {if (shutdown) {break;}// 从队列中取出Eventfinal Event event queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error(Event listener exception : , ex);} } 这里写了两个死循环第一个死循环可以理解为延时效果也就是说线程启动时最大延时60秒在这60秒中每隔1秒判断一下当前线程是否关闭是否有订阅者是否超过60秒。如果满足一个条件就可以提前跳出死循环而第二个死循环才是真正的业务逻辑处理会从阻塞队列中取出一个事件然后通过receiveEvent方法进行执行。 队列中的事件哪里来的其实就是DefaultPublisher的发布事件方法被调用了publish往阻塞队列中存入事件如果存入失败会直接调用receiveEvent。可以理解为如果向队列中存入失败则立即执行不走队列了。 Override public boolean publish(Event event) {checkIsStart();// 向队列中插入事件元素boolean success this.queue.offer(event);// 判断是否成功插入if (!success) {LOGGER.warn(Unable to plug in due to interruption, synchronize sending time, event : {}, event);// 失败直接执行receiveEvent(event);return true;}return true; } 最后再来看receiveEvent方法的实现这里其实就是遍历DefaultPublisher的subscribers订阅者集合然后执行通知订阅者的方法。 void receiveEvent(Event event) {final long currentEventSequence event.sequence();if (!hasSubscriber()) {LOGGER.warn([NotifyCenter] the {} is lost, because there is no subscriber., event);return;}// Notification single event listener// 通知订阅者执行Eventfor (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() lastEventSequence currentEventSequence) {LOGGER.debug([NotifyCenter] the {} is unacceptable to this subscriber, because had expire,event.getClass());continue;}// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.// Remove original judge part of codes.notifySubscriber(subscriber, event);} } 但是这里还有一个疑问就是subscribers中订阅者哪里来的这个还要回到NacosNamingService的init方法中 // 将Subscribe注册到Publisher NotifyCenter.registerSubscriber(changeNotifier); registerSubscriber方法最终会调用NotifyCenter的addSubscriber方法核心逻辑就是将订阅事件、发布者、订阅者三者进行绑定。而发布者与事件通过Map进行维护、发布者与订阅者通过关联关系进行维护。 private static void addSubscriber(final Subscriber consumer, Class? extends Event subscribeType,EventPublisherFactory factory) {final String topic ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}// 获取事件对应的PublisherEventPublisher publisher INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {// 添加到subscribers集合publisher.addSubscriber(consumer);} } 关系都已经梳理明确了事件也有了最后看一下DefaulePublisher中的notifySubscriber方法这里就是真正的订阅者执行事件了。 Override public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug([NotifyCenter] the {} will received by {}, event, subscriber);//执行订阅者事件final Runnable job () - subscriber.onEvent(event);// 执行者final Executor executor subscriber.executor();if (executor ! null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error(Event callback exception: , e);}} } 整体服务订阅的事件机制还是比较复杂的因为用到了事件的形式逻辑比较绕并且其中还有守护线程死循环阻塞队列等。需要重点理解NotifyCenter对事件发布者、事件订阅者和事件之间关系的维护而这一关系的维护的入口就位于NacosNamingService的init方法当中。 核心流程梳理 ServiceInfoHolder中通过NotifyCenter发布了InstancesChangeEvent事件 NotifyCenter中进行事件发布发布的核心逻辑是 - 根据InstancesChangeEvent事件类型获得对应的CanonicalName - 将CanonicalName作为Key从NotifyCenter.publisherMap中获取对应的事件发布者EventPublisher - EventPublisher将InstancesChangeEvent事件进行发布 InstancesChangeEvent事件发布 - 通过EventPublisher的实现类DefaultPublisher进行InstancesChangeEvent事件发布 - DefaultPublisher本身以守护线程的方式运作在执行业务逻辑前先判断该线程是否启动 - 如果启动则将事件添加到BlockingQueue中队列默认大小为16384 - 添加到BlockingQueue成功则整个发布过程完成 - 如果添加失败则直接调用DefaultPublisher.receiveEvent方法接收事件并通知订阅者 - 通知订阅者时创建一个Runnable对象执行订阅者的Event - Event事件便是执行订阅时传入的事件 如果添加到BlockingQueue成功则走另外一个业务逻辑 - DefaultPublisher初始化时会创建一个阻塞BlockingQueue队列并标记线程启动 - DefaultPublisher本身是一个Thread当执行super.start方法时会调用它的run方法 - run方法的核心业务逻辑是通过openEventHandler方法处理的 - openEventHandler方法通过两个for循环从阻塞队列中获取时间信息 - 第一个for循环用于让线程启动时在60s内检查执行条件 - 第二个for循环为死循环从阻塞队列中获取Event并调用DefaultPublisher#receiveEvent方法接收事件并通知订阅者 - Event事件便是执行订阅时传入的事件 Nacos客户端本地缓存及故障转移 Nacos在本地缓存的时候偶尔会出现一些故障这些故障就需要进行处理涉及到的核心类为ServiceInfoHolder和FailoverReactor。 本地缓存有两方面第一方面是从注册中心获得实例信息会缓存在内存当中也就是通过Map的形式承载这样查询操作很方便。第二方面便是通过磁盘文件的形式定时缓存起来以备不时之需。 ​故障转移也分两方面第一方面是故障转移的开关是通过文件来标记的第二方面是当开启故障转移之后当发生故障时可以从故障转移备份的文件中来获得服务实例信息。  ServiceInfoHolder类顾名思义服务信息的持有者。每次客户端从注册中心获取新的服务信息时都会调用该类其中processServiceInfo方法来进行本地化处理包括更新缓存服务、发布事件、更新本地文件等。除了这些核心功能以外该类在实例化的时候还做了本地缓存目录初始化、故障转移初始化等操作。 ServiceInfo类注册服务的信息其中包含了服务名称、分组名称、集群信息、实例列表信息上次更新时间等由此得出客户端从服务端注册中心获得到的信息在本地都以ServiceInfo作为承载者。而ServiceInfoHolder类又持有了ServiceInfo通过一个ConcurrentMap来储存 // ServiceInfoHolder private final ConcurrentMapString, ServiceInfo serviceInfoMap; 这就是Nacos客户端对服务端获取到的注册信息的第一层缓存并且之前分析processServiceInfo方法时已经看到当服务信息变更时会第一时间更新ServiceInfoMap中的信息。 public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {....//缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否更改boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}....return serviceInfo; } serviceInfoMap的使用就是这样当变动实例向其中put最新数据即可。当使用实例时根据key进行get操作即可。serviceInfoMap在ServiceInfoHolder的构造方法中进行初始化默认创建一个空的ConcurrentMap。但当配置了启动时从缓存文件读取信息时则会从本地缓存进行加载。 public ServiceInfoHolder(String namespace, Properties properties) {initCacheDir(namespace, properties);// 启动时是否从缓存目录读取信息默认false。if (isLoadCacheAtStart(properties)) {this.serviceInfoMap new ConcurrentHashMapString, ServiceInfo(DiskCache.read(this.cacheDir));} else {this.serviceInfoMap new ConcurrentHashMapString, ServiceInfo(16);}this.failoverReactor new FailoverReactor(this, cacheDir);this.pushEmptyProtection isPushEmptyProtect(properties); } 这里注意一下涉及到了本地缓存目录processServiceInfo方法中当服务实例变更时会看到通过DiskCache#write方法向该目录写入ServiceInfo信息。 public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {.....// 服务实例已变更if (changed) {NAMING_LOGGER.info(current ips:({}) service: {} - {}, serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件InstancesChangeEvent订阅者NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo; } 本地缓存目录cacheDir是ServiceInfoHolder的一个属性用于指定本地缓存的根目录和故障转移的根目录。 在ServiceInfoHolder的构造方法中初始化并且生成缓存目录 这个initCacheDir就不用了细看了就是生成缓存目录的操作默认路径${user.home}/nacos/naming/public也可以通过System.setProperty(JM.SNAPSHOT.PATH)自定义 。这里初始化完目录之后故障转移信息也存储在该目录下。 private void initCacheDir(String namespace, Properties properties) {String jmSnapshotPath System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);String namingCacheRegistryDir ;if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) ! null) {namingCacheRegistryDir File.separator properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);}if (!StringUtils.isBlank(jmSnapshotPath)) {cacheDir jmSnapshotPath File.separator FILE_PATH_NACOS namingCacheRegistryDir File.separator FILE_PATH_NAMING File.separator namespace;} else {cacheDir System.getProperty(USER_HOME_PROPERTY) File.separator FILE_PATH_NACOS namingCacheRegistryDir File.separator FILE_PATH_NAMING File.separator namespace;} } 在ServiceInfoHolder的构造方法中还会初始化一个FailoverReactor类同样是ServiceInfoHolder的成员变量。FailoverReactor的作用便是用来处理故障转移的。 public ServiceInfoHolder(String namespace, Properties properties) {....// this为ServiceHolder当前对象这里可以立即为两者相互持有对方的引用this.failoverReactor new FailoverReactor(this, cacheDir);..... } 看一下FailoverReactor的构造方法FailoverReactor的构造方法基本上把它的功能都展示出来了 public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {// 持有ServiceInfoHolder的引用this.serviceInfoHolder serviceInfoHolder;// 拼接故障目录${user.home}/nacos/naming/public/failoverthis.failoverDir cacheDir FAILOVER_DIR;// 初始化executorServicethis.executorService new ScheduledThreadPoolExecutor(1, new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);// 守护线程模式运行thread.setDaemon(true);thread.setName(com.alibaba.nacos.naming.failover);return thread;}});// 其他初始化操作通过executorService开启多个定时任务执行this.init(); } 1、持有ServiceInfoHolder的引用 2、拼接故障目录${user.home}/nacos/naming/public/failover其中public也有可能是其他的自定义命名空间 3、初始化executorService执行者服务 4、init方法通过executorService开启多个定时任务执行 在 init 方法中开启了三个定时任务这三个任务其实都是FailoverReactor的内部类 1、初始化立即执行执行间隔5秒执行任务SwitchRefresher 2、初始化延迟30分钟执行执行间隔24小时执行任务DiskFileWriter 3、初始化立即执行执行间隔10秒执行核心操作为DiskFileWriter public void init() {// 初始化立即执行执行间隔5秒执行任务SwitchRefresherexecutorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);// 初始化延迟30分钟执行执行间隔24小时执行任务DiskFileWriterexecutorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);// backup file on startup if failover directory is empty.// 如果故障目录为空启动时立即执行立即备份文件// 初始化立即执行执行间隔10秒执行核心操作为DiskFileWriterexecutorService.schedule(new Runnable() {Overridepublic void run() {try {File cacheDir new File(failoverDir);if (!cacheDir.exists() !cacheDir.mkdirs()) {throw new IllegalStateException(failed to create cache dir: failoverDir);}File[] files cacheDir.listFiles();if (files null || files.length 0) {new DiskFileWriter().run();}} catch (Throwable e) {NAMING_LOGGER.error([NA] failed to backup file on startup., e);}}}, 10000L, TimeUnit.MILLISECONDS); } 先看DiskFileWriter这里的逻辑不难就是获取ServiceInfo中缓存的ServiceInfo判断是否满足写入磁盘如果条件满足就将其写入拼接的故障目录因为后两个定时任务执行的都是DiskFileWriter但是第三个定时任务是有前置判断的只要文件不存在就会立即执行把文件写入到本地磁盘中。 class DiskFileWriter extends TimerTask {Overridepublic void run() {MapString, ServiceInfo map serviceInfoHolder.getServiceInfoMap();for (Map.EntryString, ServiceInfo entry : map.entrySet()) {ServiceInfo serviceInfo entry.getValue();if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {continue;}// 将缓存写入磁盘DiskCache.write(serviceInfo, failoverDir);}} } 再来看第一个定时任务SwitchRefresher的核心实现具体逻辑如下 1、如果故障转移文件不存在则直接返回文件开关 2、比较文件修改时间如果已经修改则获取故障转移文件中的内容 3、故障转移文件中存储了0和1标识。0表示关闭1表示开启 4、当为开启状态时执行线程FailoverFileReader class SwitchRefresher implements Runnable {long lastModifiedMillis 0L;Overridepublic void run() {try {File switchFile new File(failoverDir UtilAndComs.FAILOVER_SWITCH);// 文件不存在则退出if (!switchFile.exists()) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());NAMING_LOGGER.debug(failover switch is not found, {}, switchFile.getName());return;}long modified switchFile.lastModified();if (lastModifiedMillis modified) {lastModifiedMillis modified;// 获取故障转移文件内容String failover ConcurrentDiskUtil.getFileContent(failoverDir UtilAndComs.FAILOVER_SWITCH,Charset.defaultCharset().toString());if (!StringUtils.isEmpty(failover)) {String[] lines failover.split(DiskCache.getLineSeparator());for (String line : lines) {String line1 line.trim();// 1 表示开启故障转移模式if (IS_FAILOVER_MODE.equals(line1)) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());NAMING_LOGGER.info(failover-mode is on);new FailoverFileReader().run();// 0 表示关闭故障转移模式} else if (NO_FAILOVER_MODE.equals(line1)) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());NAMING_LOGGER.info(failover-mode is off);}}} else {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());}}} catch (Throwable e) {NAMING_LOGGER.error([NA] failed to read failover switch., e);}} } FailoverFileReader类顾名思义故障转移文件读取基本操作就是读取failover目录存储的备份服务信息文件内容然后转换成ServiceInfo并且将所有的ServiceInfo储存在FailoverReactor的ServiceMap属性中。流程如下 1、读取failover目录下的所有文件进行遍历处理 2、如果文件不存在跳过 3、如果文件是故障转移开关标志文件跳过 4、读取文件中的备份内容转换为ServiceInfo对象 5、将ServiceInfo对象放入到domMap中 6、最后判断domMap不为空赋值给serviceMap class FailoverFileReader implements Runnable {Overridepublic void run() {MapString, ServiceInfo domMap new HashMapString, ServiceInfo(16);BufferedReader reader null;try {File cacheDir new File(failoverDir);if (!cacheDir.exists() !cacheDir.mkdirs()) {throw new IllegalStateException(failed to create cache dir: failoverDir);}File[] files cacheDir.listFiles();if (files null) {return;}for (File file : files) {if (!file.isFile()) {continue;}// 如果是故障转移标志文件则跳过if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {continue;}ServiceInfo dom new ServiceInfo(file.getName());try {String dataString ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());reader new BufferedReader(new StringReader(dataString));String json;if ((json reader.readLine()) ! null) {try {dom JacksonUtils.toObj(json, ServiceInfo.class);} catch (Exception e) {NAMING_LOGGER.error([NA] error while parsing cached dom : {}, json, e);}}} catch (Exception e) {NAMING_LOGGER.error([NA] failed to read cache for dom: {}, file.getName(), e);} finally {try {if (reader ! null) {reader.close();}} catch (Exception e) {//ignore}}if (!CollectionUtils.isEmpty(dom.getHosts())) {domMap.put(dom.getKey(), dom);}}} catch (Exception e) {NAMING_LOGGER.error([NA] failed to read cache file, e);}// 读入缓存if (domMap.size() 0) {serviceMap domMap;}} } 但是这里还有一个问题就是serviceMap是哪里用到的这个其实是之前读取实例时候用到的getServiceInfo方法。其实这里就是一旦开启故障转移就会先调用failoverReactor.getService方法此方法便是从serviceMap中获取ServiceInfo public ServiceInfo getService(String key) {ServiceInfo serviceInfo serviceMap.get(key);if (serviceInfo null) {serviceInfo new ServiceInfo();serviceInfo.setName(key);}return serviceInfo; } 调用serviceMap方法getServiceInfo方法就在ServiceInfoHolder中 // ServiceInfoHolder public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug(failover-mode: {}, failoverReactor.isFailoverSwitch());String groupedServiceName NamingUtils.getGroupedName(serviceName, groupName);String key ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}return serviceInfoMap.get(key); } Nacos集群数据同步 当有服务进行注册以后会写入注册信息同时会触发ClientChangedEvent事件通过这个事件就会开始进行Nacos的集群数据同步当然这其中只有一个Nacos节点来处理对应的客户端请求其实这其中还涉及到一个负责节点和非负责节点的概念。 负责节点首先要查看的是DistroClientDataProcessor客户端数据一致性处理器类型这个类型会处理当前节点负责的Client查看其中的syncToAllServer方法。 private void syncToAllServer(ClientEvent event) {Client client event.getClient();// 判断客户端是否为空是否是临时实例判断是否是负责节点if (null client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客户端断开连接DistroKey distroKey new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客户端新增/修改DistroKey distroKey new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);} } distroProtocol会循环所有其它nacos节点提交一个异步任务这个异步任务会延迟1s其实这里就可以看到这里涉及到客户端的断开和客户端的新增和修改对于Delete操作由DistroSyncDeleteTask处理对于Change操作由DistroSyncChangeTask处理这里从DistroSyncChangeTask来看 public class DistroSyncChangeTask extends AbstractDistroExecuteTask {private static final DataOperation OPERATION DataOperation.CHANGE;public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {super(distroKey, distroComponentHolder);}Overrideprotected DataOperation getDataOperation() {return OPERATION;}// 无回调Overrideprotected boolean doExecute() {String type getDistroKey().getResourceType();DistroData distroData getDistroData(type);if (null distroData) {Loggers.DISTRO.warn([DISTRO] {} with null data to sync, skip, toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}// 有回调Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type getDistroKey().getResourceType();DistroData distroData getDistroData(type);if (null distroData) {Loggers.DISTRO.warn([DISTRO] {} with null data to sync, skip, toString());return;}getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}Overridepublic String toString() {return DistroSyncChangeTask for getDistroKey().toString();}// 从DistroClientDataProcessor获取DistroDataprivate DistroData getDistroData(String type) {DistroData result getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null ! result) {result.setType(OPERATION);}return result;} } 获取到的DistroData其实是从ClientManager实时获取Client // DistroClientDataProcessor Override public DistroData getDistroData(DistroKey distroKey) {Client client clientManager.getClient(distroKey.getResourceKey());if (null client) {return null;}// 把生成的同步数据放入到数组中byte[] data ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data); } AbstractClient继承了Client同时给DistroClientDataProcessorClient提供Client的注册信息包括客户端注册了哪些namespace哪些group哪些service哪些instance。 Override public ClientSyncData generateSyncData() {ListString namespaces new LinkedList();ListString groupNames new LinkedList();ListString serviceNames new LinkedList();ListInstancePublishInfo instances new LinkedList();for (Map.EntryService, InstancePublishInfo entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances); } 再回过头来看syncData方法这个方法实际上是由DistroClientTransportAgent封装为DistroDataRequest调用其它Nacos节点 Override public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request new DistroDataRequest(data, data.getType());Member member memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn([DISTRO] Cancel distro sync caused by target server {} unhealthy, targetServer);return false;}try {Response response clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error([DISTRO-FAILED] Sync distro data failed! , e);}return false; } 非负责节点当负责节点将数据发送给非负责节点以后将要处理发送过来的Client数据。看DistroClientDataProcessor.processData方法 Override public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);//处理同步数据handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info([Client-Delete] Received distro client sync data {}, deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;} } 然后来查看具体处理方法handlerClientSyncData private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info([Client-Add] Received distro client sync data {}, clientSyncData.getClientId());// 同步客户端连接clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 获取Client此时注册到的是ConnectionBasedClientClient client clientManager.getClient(clientSyncData.getClientId());// 更新Client数据upgradeClient(client, clientSyncData); } DistroClientDataProcessor的upgradeClient方法更新Client里的注册表信息发布对应事件 private void upgradeClient(Client client, ClientSyncData clientSyncData) {ListString namespaces clientSyncData.getNamespaces();ListString groupNames clientSyncData.getGroupNames();ListString serviceNames clientSyncData.getServiceNames();ListInstancePublishInfo instances clientSyncData.getInstancePublishInfos();SetService syncedService new HashSet();for (int i 0; i namespaces.size(); i) {Service service Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}} } 注意此时的Client实现类ConnectionBasedClient只不过它的isNative属性为false这是非负责节点和负责节点的主要区别。 其实判断当前nacos节点是否为负责节点的依据就是这个isNative属性如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient它的isNative属性为true如果是由Distro协议同步到这个nacos节点上的ConnectionBasedClient它的isNative属性为false。​2.x的版本以后使用了长连接所以通过长连接建立在哪个节点上哪个节点就是责任节点客户端也只会向这个责任节点发送请求。 Distro为了确保集群间数据一致不仅仅依赖于数据发生改变时的实时同步后台有定时任务做数据同步。在1.x版本中责任节点每5s同步所有Service的Instance列表的摘要md5给非责任节点非责任节点用对端传来的服务md5比对本地服务的md5如果发生改变需要反查责任节点。 在2.x版本中对这个流程做了改造责任节点会发送Client全量数据非责任节点定时检测同步过来的Client是否过期减少1.x版本中的反查。责任节点每5s向其他节点发送DataOperationVERIFY类型的DistroData来维持非责任节点的Client数据不过期。 //DistroVerifyTimedTask Override public void run() {try {// 所有其他节点ListMember targetServer serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug(server list is: {}, targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 遍历想这些节点发送Client.isNativetrue的DistroDatatype VERIFYverifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error([DISTRO-FAILED] verify task failed., e);} } 非责任节点每5s扫描isNativefalse的client如果client30s内没有被VERIFY的DistroData更新过续租时间会删除这个同步过来的Client数据。 //ConnectionBasedClientManager-ExpiredClientCleaner private static class ExpiredClientCleaner implements Runnable {private final ConnectionBasedClientManager clientManager;public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {this.clientManager clientManager;}Overridepublic void run() {long currentTime System.currentTimeMillis();for (String each : clientManager.allClientId()) {ConnectionBasedClient client (ConnectionBasedClient) clientManager.getClient(each);if (null ! client client.isExpire(currentTime)) {clientManager.clientDisconnected(each);}}} } ------------------------------------------------------------------------------------------- Override public boolean isExpire(long currentTime) {// 判断30s内没有续租 认为过期return !isNative() currentTime - getLastRenewTime() ClientConfig.getInstance().getClientExpiredTime(); }
http://www.hkea.cn/news/14488361/

相关文章:

  • 丰城市建设局网站医院证明p图软件在线
  • 做网站做哪个江苏建设管理中心网站
  • 免费淘宝客网站模板网站常见程序问题
  • 济南微网站开发网店营销策划方案范文
  • 90设计网站最便宜终身wordpress标题图片代码
  • 驻马店市做网站开什么加工厂不愁销路
  • 安徽平台网站建设设计wordpress最大附件
  • 阿里巴巴国际站入驻邯郸市做网站建设
  • 常州高端网站建设公司哪家好wordpress 逻辑代码
  • 设计师个人网站主页深圳企业视频制作公司
  • 做网站应该了解什么问题免费咨询律师问题
  • 宁波网站建设招商加盟基于那种语言开发网页
  • 温州网站建设制作制作页培训
  • 网站文件夹怎么做网页传奇手游
  • 河源盛世网站建设济南手机网站定制价格
  • 网页制作素材网站推荐什么是网络营销模式
  • 商城网站设计服务商电影网站开发视频教程
  • wordpress 多站点注册为什么网站建设要值班
  • 网站dns设置工业品电商平台排行榜
  • 湖北民族建设集团网站首页怎么给自己的网站做排名
  • 大连开发区网站开发公司电话wordpress可以上传网页
  • 网站主页制作教程专业网站建设哪家好
  • 德州汇泽网站建设头条淘宝联盟网站推广怎么做
  • 微网站 模板项目管理中软件分类为
  • 织梦视频资讯网站源码常州的网站建设
  • 网站名称备案以小说名字做网站的小说网
  • 常州网站建设大全沧州网络公司排名
  • 电子商务网站建设规划的内容屏蔽蜘蛛抓取 对网站有什么影响
  • 鹤山网站建设常用的北京文化馆设计公司
  • 绍兴网站推广排名国家住房和城乡建设局网站