浙江天力建设集团有限公司网站,网络设计专业介绍,有注入漏洞的网站源码,郑州网站排名哪家好Nacos客户端服务订阅的事件机制剖析
我们已经分析了Nacos客户端订阅的核心流程#xff1a;Nacos客户端通过一个定时任务#xff0c;每6秒从注册中心获取实例列表#xff0c;当发现实例发生变化时#xff0c;发布变更事件#xff0c;订阅者进行业务处理#xff0c;然后更…Nacos客户端服务订阅的事件机制剖析
我们已经分析了Nacos客户端订阅的核心流程Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表当发现实例发生变化时发布变更事件订阅者进行业务处理然后更新内存中和本地的缓存中的实例。
我们来分析定时任务获取到最新实例列表之后整个事件机制是如何处理的首先我们先回顾整体流程 在第一步调用subscribe方法时会订阅一个EventListener事件。而在定时任务UpdateTask定时获取实例列表之后会调用ServiceInfoHolder.processServiceInfo方法对ServiceInfo进行本地处理这其中就包括和事件处理。
监听事件的注册
在subscribe方法中通过了下面的源码进行了监听事件的注册
Override
public void subscribe(String serviceName, String groupName, ListString clusters, EventListener listener)throws NacosException {if (null listener) {return;}String clusterString StringUtils.join(clusters, ,);changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}在这其中我们主要要关注的就是changeNotifier.registerListener此监听就是进行具体事件注册逻辑的我们来看一下源码
可以看出事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。同时这里的数据结构为ConcurrentHashMapkey为服务实例的信息的拼接value为监听事件的集合。
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSetEventListener eventListeners listenerMap.get(key);if (eventListeners null) {synchronized (lock) {eventListeners listenerMap.get(key);if (eventListeners null) {eventListeners new ConcurrentHashSetEventListener();//将EventListener缓存到listenerMaplistenerMap.put(key, eventListeners);}}}eventListeners.add(listener);
}ServiceInfo处理
上面的源码中已经完成了事件的注册现在就来追溯触发事件的来源UpdateTask中获取到最新的实例会进行本地化处理部分源码如下
// ServiceInfoUpdateServiceUpdateTaskrun()
ServiceInfo serviceObj serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj null) {serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 本地缓存处理serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime serviceObj.getLastRefTime();return;
}这个run方法的详细逻辑昨天已经给大家分析过了今天我们主要来看其中本地缓存处理的方法serviceInfoHolder.processServiceInfo我们先来分析流程
这个逻辑简单来说判断新的ServiceInfo数据是否正确是否发生了变化。如果数据格式正确且发生变化那就发布一个InstancesChangeEvent事件同时将ServiceInfo写入本地缓存。
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey serviceInfo.getKey();if (serviceKey null) {return null;}ServiceInfo oldService serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否已变更boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}// 监控服务监控缓存Map的大小MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());// 服务实例以更变if (changed) {NAMING_LOGGER.info(current ips:({}) service: {} - {}, serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件会被订阅者执行NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}分析到这里我们发现其实这个重点应该在服务信息变更之后发布的InstancesChangeEvent事件这个事件是NotifyCenter进行发布的我们来追踪一下源码
事件追踪
NotifyCenter通知中心的核心流程如下 NotifyCenter中进行事件发布发布的核心逻辑是
1. 根据InstancesChangeEvent事件类型获得对应的CanonicalName2. 将CanonicalName作为key从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher)3. EventPublisher将InstancesChangeEvent事件进行发布核心代码如下
private static boolean publishEvent(final Class? extends Event eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}// 根据InstancesChangeEvent事件类型获得对应的CanonicalNamefinal String topic ClassUtils.getCanonicalName(eventType);// 将CanonicalName作为Key从NotifyCenter#publisherMap中获取对应的事件发布者EventPublisherEventPublisher publisher INSTANCE.publisherMap.get(topic);if (publisher ! null) {// 事件发布者publisher发布事件InstancesChangeEventreturn publisher.publish(event);}LOGGER.warn(There are no [{}] publishers for this event, please register, topic);return false;
}在这个源码中其实INSTANCE为NotifyCenter的单例实现那么这里的publisherMap中key(CanonicalName)和value(EventPublisher)之间的关系是什么时候建立的
其实是在NacosNamingService实例化时调用init初始化方法中进行绑定的
// Publisher的注册过程在于建立InstancesChangeEvent.class与EventPublisher的关系。
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);这里再继续跟踪registerToPublisher方法就会发现默认采用了DEFAULT_PUBLISHER_FACTORY默认发布者工厂来进行构建我们再继续跟踪会发现在NotifyCenter中静态代码块会发现DEFAULT_PUBLISHER_FACTORY默认构建的EventPublisher为DefaultPublisher。
//NotifyCenter
public static EventPublisher registerToPublisher(final Class? extends Event eventType, final int queueMaxSize) {return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);//NotifyCenterstatic中部分代码
DEFAULT_PUBLISHER_FACTORY (cls, buffer) - {try {EventPublisher publisher clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error(Service class newInstance has error : , ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}
};所以我们得出结论NotifyCenter中它维护了事件名称和事件发布者的关系而默认的事件发布者为DefaultPublisher。
DefaultPublisher的事件发布
我们现在来看一下默认事件发布者的源码查看以后我们会发现它继承自Thread也就是说它是一个线程类同时它又实现了EventPublisher也就是发布者
public class DefaultPublisher extends Thread implements EventPublisher接下来我们来看它的init初始化方法从这里我们可以看出当DefaultPublisher被初始化时是以守护线程的方式运作的其中还初始化了一个阻塞队列。
Override
public void init(Class? extends Event type, int bufferSize) {// 守护线程setDaemon(true);// 设置线程名字setName(nacos.publisher- type.getName());this.eventType type;this.queueMaxSize bufferSize;// 阻塞队列初始化this.queue new ArrayBlockingQueue(bufferSize);start();
}最后调用了start()方法在这其中调用了super.start()启动线程
Override
public synchronized void start() {if (!initialized) {// start just called oncesuper.start();if (queueMaxSize -1) {queueMaxSize ringBufferSize;}initialized true;}
}run()方法调用openEventHandler()方法 这里写了两个死循环第一个死循环可以理解为延时效果也就是说线程启动时最大延时60秒在这60秒中每隔1秒判断一下当前线程是否关闭是否有订阅者是否超过60秒。如果满足一个条件就可以提前跳出死循环。
而第二个死循环才是真正的业务逻辑处理会从阻塞队列中取出一个事件然后通过receiveEvent方法进行执行。
Override
public void run() {openEventHandler();
}void openEventHandler() {try {// This variable is defined to resolve the problem which message overstock in the queue.int waitTimes 60;// To ensure that messages are not lost, enable EventHandler when// waiting for the first Subscriber to register// 死循环延迟线程启动最大延时60秒这个主要是为了解决消息积压的问题。for (; ; ) {if (shutdown || hasSubscriber() || waitTimes 0) {break;}ThreadUtils.sleep(1000L);waitTimes--;}// 死循环不断的从队列中取出Event并通知订阅者Subscriber执行Eventfor (; ; ) {if (shutdown) {break;}// 从队列中取出Eventfinal Event event queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error(Event listener exception : , ex);}
}队列中的事件哪里来的其实就是DefaultPublisher的发布事件方法被调用了publish往阻塞队列中存入事件如果存入失败会直接调用receiveEvent。
可以理解为如果向队列中存入失败则立即执行不走队列了。
Override
public boolean publish(Event event) {checkIsStart();// 向队列中插入事件元素boolean success this.queue.offer(event);// 判断是否成功插入if (!success) {LOGGER.warn(Unable to plug in due to interruption, synchronize sending time, event : {}, event);// 失败直接执行receiveEvent(event);return true;}return true;
}最后再来看receiveEvent方法的实现这里其实就是遍历DefaultPublisher的subscribers订阅者集合然后执行通知订阅者的方法。
void receiveEvent(Event event) {final long currentEventSequence event.sequence();if (!hasSubscriber()) {LOGGER.warn([NotifyCenter] the {} is lost, because there is no subscriber., event);return;}// Notification single event listener// 通知订阅者执行Eventfor (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() lastEventSequence currentEventSequence) {LOGGER.debug([NotifyCenter] the {} is unacceptable to this subscriber, because had expire,event.getClass());continue;}// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.// Remove original judge part of codes.notifySubscriber(subscriber, event);}
}但是这里还有一个疑问就是subscribers中订阅者哪里来的这个还要回到NacosNamingService的init方法中
// 将Subscribe注册到Publisher
NotifyCenter.registerSubscriber(changeNotifier);registerSubscriber方法最终会调用NotifyCenter的addSubscriber方法核心逻辑就是将订阅事件、发布者、订阅者三者进行绑定。而发布者与事件通过Map进行维护、发布者与订阅者通过关联关系进行维护。
private static void addSubscriber(final Subscriber consumer, Class? extends Event subscribeType,EventPublisherFactory factory) {final String topic ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}// 获取事件对应的PublisherEventPublisher publisher INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {// 添加到subscribers集合publisher.addSubscriber(consumer);}
}关系都已经梳理明确了事件也有了最后我们看一下DefaulePublisher中的notifySubscriber方法这里就是真正的订阅者执行事件了。
Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug([NotifyCenter] the {} will received by {}, event, subscriber);//执行订阅者事件final Runnable job () - subscriber.onEvent(event);// 执行者final Executor executor subscriber.executor();if (executor ! null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error(Event callback exception: , e);}}
}总结
整体服务订阅的事件机制还是比较复杂的因为用到了事件的形式逻辑比较绕并且其中还有守护线程死循环阻塞队列等。
需要重点理解NotifyCenter对事件发布者、事件订阅者和事件之间关系的维护而这一关系的维护的入口就位于NacosNamingService的init方法当中。
核心流程梳理
ServiceInfoHolder中通过NotifyCenter发布了InstancesChangeEvent事件
NotifyCenter中进行事件发布发布的核心逻辑是
根据InstancesChangeEvent事件类型获得对应的CanonicalName将CanonicalName作为Key从NotifyCenter.publisherMap中获取对应的事件发布者EventPublisherEventPublisher将InstancesChangeEvent事件进行发布
InstancesChangeEvent事件发布
通过EventPublisher的实现类DefaultPublisher进行InstancesChangeEvent事件发布DefaultPublisher本身以守护线程的方式运作在执行业务逻辑前先判断该线程是否启动如果启动则将事件添加到BlockingQueue中队列默认大小为16384添加到BlockingQueue成功则整个发布过程完成如果添加失败则直接调用DefaultPublisher.receiveEvent方法接收事件并通知订阅者通知订阅者时创建一个Runnable对象执行订阅者的EventEvent事件便是执行订阅时传入的事件
如果添加到BlockingQueue成功则走另外一个业务逻辑
DefaultPublisher初始化时会创建一个阻塞BlockingQueue队列并标记线程启动DefaultPublisher本身是一个Thread当执行super.start方法时会调用它的run方法run方法的核心业务逻辑是通过openEventHandler方法处理的openEventHandler方法通过两个for循环从阻塞队列中获取时间信息第一个for循环用于让线程启动时在60s内检查执行条件第二个for循环为死循环从阻塞队列中获取Event并调用DefaultPublisher#receiveEvent方法接收事件并通知订阅者Event事件便是执行订阅时传入的事件