网站建设费摊销,域名解析到wordpress,青州市住房和城乡建设局网站,视频网站做视频节目赚钱吗Nacos集群数据同步
当我们有服务进行注册以后#xff0c;会写入注册信息同时会触发ClientChangedEvent事件#xff0c;通过这个事件#xff0c;就会开始进行Nacos的集群数据同步#xff0c;当然这其中只有有一个Nacos节点来处理对应的客户端请求#xff0c;其实这其中…Nacos集群数据同步
当我们有服务进行注册以后会写入注册信息同时会触发ClientChangedEvent事件通过这个事件就会开始进行Nacos的集群数据同步当然这其中只有有一个Nacos节点来处理对应的客户端请求其实这其中还涉及到一个负责节点和非负责节点
负责节点
这是首先我们要查看的是DistroClientDataProcessor客户端数据一致性处理器类型这个类型会处理当前节点负责的Client那我们要查看其中的syncToAllServer方法。
private void syncToAllServer(ClientEvent event) {Client client event.getClient();// 判断客户端是否为空是否是临时实例判断是否是负责节点if (null client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客户端断开连接DistroKey distroKey new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客户端新增/修改DistroKey distroKey new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
} distroProtocol会循环所有其他nacos节点提交一个异步任务这个异步任务会延迟1s其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改对于Delete操作由DistroSyncDeleteTask处理对于Change操作由DistroSyncChangeTask处理这里我们从DistroSyncChangeTask来看
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {private static final DataOperation OPERATION DataOperation.CHANGE;public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {super(distroKey, distroComponentHolder);}Overrideprotected DataOperation getDataOperation() {return OPERATION;}// 无回调Overrideprotected boolean doExecute() {String type getDistroKey().getResourceType();DistroData distroData getDistroData(type);if (null distroData) {Loggers.DISTRO.warn([DISTRO] {} with null data to sync, skip, toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}// 有回调Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type getDistroKey().getResourceType();DistroData distroData getDistroData(type);if (null distroData) {Loggers.DISTRO.warn([DISTRO] {} with null data to sync, skip, toString());return;}getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}Overridepublic String toString() {return DistroSyncChangeTask for getDistroKey().toString();}// 从DistroClientDataProcessor获取DistroDataprivate DistroData getDistroData(String type) {DistroData result getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null ! result) {result.setType(OPERATION);}return result;}
} 获取到的DistroData其实是从ClientManager实时获取Client。
// DistroClientDataProcessor
Override
public DistroData getDistroData(DistroKey distroKey) {Client client clientManager.getClient(distroKey.getResourceKey());if (null client) {return null;}// 把生成的同步数据放入到数组中byte[] data ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);
} AbstractClient继承了Client同时给DistroClientDataProcessorClient提供Client的注册信息包括客户端注册了哪些namespace哪些group哪些service哪些instance。
Override
public ClientSyncData generateSyncData() {ListString namespaces new LinkedList();ListString groupNames new LinkedList();ListString serviceNames new LinkedList();ListInstancePublishInfo instances new LinkedList();for (Map.EntryService, InstancePublishInfo entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
} 这里我们在回过头来看syncData方法这个方法实际上是由DistroClientTransportAgent封装为DistroDataRequest调用其他Nacos节点。
Override
public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request new DistroDataRequest(data, data.getType());Member member memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn([DISTRO] Cancel distro sync caused by target server {} unhealthy, targetServer);return false;}try {Response response clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error([DISTRO-FAILED] Sync distro data failed! , e);}return false;
}非负责节点
当负责节点将数据发送给非负责节点以后将要处理发送过来的Client数据。这里我们要看DistroClientDataProcessor.processData方法
Override
public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);//处理同步数据handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info([Client-Delete] Received distro client sync data {}, deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;}
} 然后来查看具体处理方法handlerClientSyncData
private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info([Client-Add] Received distro client sync data {}, clientSyncData.getClientId());// 同步客户端连接clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 获取Client此时注册到的是ConnectionBasedClientClient client clientManager.getClient(clientSyncData.getClientId());// 更新Client数据upgradeClient(client, clientSyncData);
} DistroClientDataProcessor的upgradeClient方法更新Client里的注册表信息发布对应事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) {ListString namespaces clientSyncData.getNamespaces();ListString groupNames clientSyncData.getGroupNames();ListString serviceNames clientSyncData.getServiceNames();ListInstancePublishInfo instances clientSyncData.getInstancePublishInfos();SetService syncedService new HashSet();for (int i 0; i namespaces.size(); i) {Service service Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}
} **注意**这里要注意下此时的Client实现类ConnectionBasedClient只不过它的isNative属性为false这是非负责节点和负责节点的主要区别。
其实判断当前nacos节点是否为负责节点的依据就是这个isNative属性如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient它的isNative属性为true如果是由Distro协议同步到这个nacos节点上的ConnectionBasedClient它的isNative属性为false。
那其实我们都知道2.x的版本以后使用了长连接所以通过长连接建立在哪个节点上哪个节点就是责任节点客户端也只会向这个责任节点发送请求。
Distro协议负责集群数据统一
Distro为了确保集群间数据一致不仅仅依赖于数据发生改变时的实时同步后台有定时任务做数据同步。
在1.x版本中责任节点每5s同步所有Service的Instance列表的摘要md5给非责任节点非责任节点用对端传来的服务md5比对本地服务的md5如果发生改变需要反查责任节点。
在2.x版本中对这个流程做了改造责任节点会发送Client全量数据非责任节点定时检测同步过来的Client是否过期减少1.x版本中的反查。
责任节点每5s向其他节点发送DataOperationVERIFY类型的DistroData来维持非责任节点的Client数据不过期。
//DistroVerifyTimedTask
Override
public void run() {try {// 所有其他节点ListMember targetServer serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug(server list is: {}, targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 遍历想这些节点发送Client.isNativetrue的DistroDatatype VERIFYverifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error([DISTRO-FAILED] verify task failed., e);}
} 非责任节点每5s扫描isNativefalse的client如果client30s内没有被VERIFY的DistroData更新过续租时间会删除这个同步过来的Client数据。
//ConnectionBasedClientManager-ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {private final ConnectionBasedClientManager clientManager;public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {this.clientManager clientManager;}Overridepublic void run() {long currentTime System.currentTimeMillis();for (String each : clientManager.allClientId()) {ConnectionBasedClient client (ConnectionBasedClient) clientManager.getClient(each);if (null ! client client.isExpire(currentTime)) {clientManager.clientDisconnected(each);}}}
}
-------------------------------------------------------------------------------------------
Override
public boolean isExpire(long currentTime) {// 判断30s内没有续租 认为过期return !isNative() currentTime - getLastRenewTime() ClientConfig.getInstance().getClientExpiredTime();
}