南宁市网站设计,专业层析成像代做网站,网页设计培训(可0基础),上海新增感染呈下降趋势源码解析-Spring Eureka 文章目录 源码解析-Spring Eureka前言一、从Spring.factory和注解开始二、重要的一步EurekaServerInitializerConfiguration三、初始化了什么？自动保护 四, 重新回到EurekaServerAutoConfiguration 前言
无 一、从Spring.factory和注解开始…源码解析-Spring Eureka 文章目录 源码解析-Spring Eureka前言一、从Spring.factory和注解开始二、重要的一步EurekaServerInitializerConfiguration三、初始化了什么？自动保护 四, 重新回到EurekaServerAutoConfiguration 前言
无 一、从Spring.factory和注解开始
我们可以看到Eureka通过spring boot的自动配置机制引入了一个类
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
通过这个配置我们找到对应的配置类可以看到这个配置类使用了Marker作为条件注入
    @Configuration(proxyBeanMethods = false)
    @Import({EurekaServerInitializerConfiguration.class})
    @ConditionalOnBean({EurekaServerMarkerConfiguration.Marker.class})
    @EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
    @PropertySource({"classpath:/eureka/server.properties"})
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer
这个时候我们返回查看我们配置一个eureka所需要的基本注解，可以看到我们正是在这个@EnableEurekaServer注解里面初始化了这个类：
    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by FernFlower decompiler)
    //

    package org.springframework.cloud.netflix.eureka.server;

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import org.springframework.context.annotation.Import;

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import({EurekaServerMarkerConfiguration.class})
    public @interface EnableEurekaServer {
    }
通过spring.factories的自动配置以及@EnableEurekaServer注解，就可以实现eureka服务端的手动注入（通过加入注解来开启）。
二、重要的一步EurekaServerInitializerConfiguration
在上面的EurekaServerAutoConfiguration里面，我们可以看到它import了一个初始化类。注意这个初始化类实现了SmartLifecycle接口，并实现了其start方法：
    @Configuration(proxyBeanMethods = false)
    public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered
实现的start方法会在bean启动的时候被调用，该方法会new一个线程并发布事件：
    public void start() {
        (new Thread(() -> {
            try {
                this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");
                this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
                this.running = true;
                this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
            } catch (Exception var2) {
                log.error("Could not initialize Eureka servlet context", var2);
            }
        })).start();
    }
可以看到，通过这个start方法，eureka初始化了它自己的context上下文，并发布了一些事件。
三、初始化了什么
进入到contextInitialized方法，我们可以看到：
    public void contextInitialized(ServletContext context) {
        try {
            this.initEurekaEnvironment();
            this.initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable var3) {
            log.error("Cannot bootstrap eureka server :", var3);
            throw new RuntimeException("Cannot bootstrap eureka server :", var3);
        }
    }
eureka首先初始化了配置信息，然后进行上下文的初始化：
    protected void initEurekaServerContext() throws Exception {
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        if (this.isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }
        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        EurekaMonitors.registerAllStats();
    }
进入到initEurekaServerContext方法，我们可以看到几个重要的方法。在openForTraffic方法里面：
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        this.expectedNumberOfClientsSendingRenews = count;
        this.updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            this.primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }
我们重点关注这里的super.postInit()：
    protected void postInit() {
        this.renewsLastMin.start();
        if (this.evictionTaskRef.get() != null) {
            ((EvictionTask)this.evictionTaskRef.get()).cancel();
        }
        this.evictionTaskRef.set(new EvictionTask());
        this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs());
    }
可以看到this.evictionTaskRef.set(new EvictionTask());这里注册了一个剔除任务：
    int registrySize = (int)this.getLocalRegistrySize();
    int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
这里的剔除与eureka配置里面的自我保护配置有关。
自动保护
在eureka中，如果打开了自我保护配置并设置了剔除阈值，eureka集群就会在计算出正常节点的比例超过阈值的时候执行上面的代码，把失效的节点给剔除。
如果现在有10个节点，其中7个节点是正常的，3个节点是有问题的，阈值设置了80%。这个时候7个节点中的一个节点出现了问题，但是正常比例没有超过阈值（变成了60%），这个时候就会访问到失败的节点。如果现在有100个节点，3个节点有问题，阈值也是80%，现在的值是97%，超过了阈值，如果这个时候有节点出现问题，则会立即剔除。但是不能把自我保护关闭：如果3个节点是因为波动导致的暂时访问不到，则会立即被剔除。
    eureka:
      server:
        enable-self-preservation: true
        renewal-percent-threshold: 0.85
我们再进到syncUp方法里面：
    public int syncUp() {
        int count = 0;
        for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
            if (i > 0) {
                try {
                    Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException var10) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = this.eurekaClient.getApplications();
            Iterator var4 = apps.getRegisteredApplications().iterator();
            // ...
可以看到，当一个eureka服务启动的时候，会作为一个eureka客户端去peer节点拉取注册表信息，这也是eureka为什么不是强一致性的原因。
四, 重新回到EurekaServerAutoConfiguration
首先就是eureka的控制器类，eureka的dashboard上面的数据就是通过这个Spring Web控制器来获取的：
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = {"enabled"}, matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }
接着在eurekaServerContext里面实例化了属于eureka的上下文：
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }
进入context的初始化方法，可以看到在context初始化里面有重要的一环，就是设置三级缓存initializedResponseCache：
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        this.initializedResponseCache();
        this.scheduleRenewalThresholdUpdateTask();
        this.initRemoteRegionRegistry();
        try {
            Monitors.registerObject(this);
        } catch (Throwable var3) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);
        }
    }
里面有一个定时任务，就是定期刷新缓存：
    if (this.shouldUseReadOnlyResponseCache) {
        this.timer.schedule(this.getCacheUpdateTask(), new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
    }
继续查看自动配置类，可以看到eureka通过jersey框架包装了注册服务：
    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Integer.MAX_VALUE);
        bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
        return bean;
    }
再看jerseyApplication，这里面会对eureka的包路径进行扫描，并将其中的候选类进行注入，其中非常重要的就是resource目录下的ApplicationsResource。该方法会返回一个Application的Bean（Spring的@Configuration加@Bean）：
    @Bean
    public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
        Set<Class<?>> classes = new HashSet();
        String[] var5 = EUREKA_PACKAGES;
        int var6 = var5.length;
        for(int var7 = 0; var7 < var6; ++var7) {
            String basePackage = var5[var7];
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            Iterator var10 = beans.iterator();
            while(var10.hasNext()) {
                BeanDefinition bd = (BeanDefinition)var10.next();
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }
        Map<String, Object> propsAndFeatures = new HashMap();
        propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }
在ApplicationsResource里面，通过jersey编写了一系列关于**注册中心的“注册”“取消”“续约”**等的HTTP方法。我们先来看获取ALL Key的方法（拉取所有应用的注册信息）：
    @GET
    public Response getContainers(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions);
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
        if (!this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        } else {
            CurrentRequestVersion.set(Version.toEnum(version));
            Key.KeyType keyType = KeyType.JSON;
            String returnMediaType = "application/json";
            if (acceptHeader == null || !acceptHeader.contains("json")) {
                keyType = KeyType.XML;
                returnMediaType = "application/xml";
            }
            Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
            Response response;
            if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
                response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
            } else {
                response = Response.ok(this.responseCache.get(cacheKey)).build();
            }
            CurrentRequestVersion.remove();
            return response;
        }
    }
其中responseCache使用了eureka自己实现的三级缓存，其getValue方法如下：
    @VisibleForTesting
    Value getValue(Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                Value currentPayload = (Value)this.readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = (Value)this.readWriteCacheMap.get(key);
                    this.readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = (Value)this.readWriteCacheMap.get(key);
            }
        } catch (Throwable var5) {
            logger.error("Cannot get value for key : {}", key, var5);
        }
        return payload;
    }
我们再来看在eureka中一个服务是如何被维护的。在ApplicationResource中有添加服务的方法：
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        if (this.isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (this.isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (this.isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (this.isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!this.appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        } else {
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
                if (this.isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    }
                    if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                        String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
            this.registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();
        }
    }
进入到register方法里面，我们可以看到eureka首先向自己注册了当前服务，然后同步到了peer节点上面：
    public void register(InstanceInfo info, boolean isReplication) {
        int leaseDuration = 90;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceInfo.InstanceStatus)null, isReplication);
    }
可以看到在register里面，我们的InstanceInfo被一个Map<String, Lease<InstanceInfo>> gMap维护着：
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        this.read.lock();
        try {
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
            EurekaMonitors.REGISTER.increment(isReplication);
            if (gMap == null) {
                ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
                gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // ...
我们看下Lease的结构。通过下面的这种结构，我们只需要修改Lease的long类型时间信息，就可以对当前节点的生命状态进行修改，而不需要修改节点本身：
    public class Lease<T> {
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        private T holder; // 具体的实例信息
        // 一些用于维护节点状态的时间信息
        private long evictionTimestamp;
        private long registrationTimestamp;
        private long serviceUpTimestamp;
        private volatile long lastUpdateTimestamp;
        private long duration;