it网站建设资讯网,葫芦岛网站制作,关键词推广方式,如何将自己做的网站挂到服务器上1.ServerRunner
ServerRunner类实现了CommandLineRunner与DisposableBean接口#xff0c;将会在Spring容器启动和关闭的时间#xff0c;分别执行
run 和 destory 方法。
而seata服务端的启动过程#xff0c;都藏在run方法中 2.整体流程
io.seata.server.Server#start pu…1.ServerRunner
ServerRunner类实现了CommandLineRunner与DisposableBean接口将会在Spring容器启动和关闭的时间分别执行
run 和 destory 方法。
而seata服务端的启动过程都藏在run方法中 2.整体流程
io.seata.server.Server#start public static void start(String[] args) {// create loggerfinal Logger logger LoggerFactory.getLogger(Server.class);//initialize the parameter parser//Note that the parameter parser should always be the first line to execute.//Because, here we need to parse the parameters needed for startup.//1.解析配置文件ParameterParser parameterParser new ParameterParser(args);//2.初始化监控MetricsManager.get().init();System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());ThreadPoolExecutor workingThreads new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory(ServerHandlerThread, NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TMRM通信NettyRemotingServer nettyRemotingServer new NettyRemotingServer(workingThreads);//4.初始化UUID生成器雪花算法用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());//log store mode : file, db, redis//5.设置全局事务与分支事务持久化的三种方式SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());//6.创建并初始化事务协调者 并绑定到NettyRemotingServer到TransactionMessageHandler监听事务消息并处理DefaultCoordinator coordinator DefaultCoordinator.getInstance(nettyRemotingServer);coordinator.init();nettyRemotingServer.setHandler(coordinator);//7.注册销毁事件// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);//127.0.0.1 and 0.0.0.0 are not valid here.if (NetUtil.isValidIp(parameterParser.getHost(), false)) {XID.setIpAddress(parameterParser.getHost());} else {String preferredNetworks ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);if (StringUtils.isNotBlank(preferredNetworks)) {XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));} else {XID.setIpAddress(NetUtil.getLocalIp());}}//8.启动nettyRemotingServernettyRemotingServer.init();}1.解析配置文件
主要从三处地方获取配置 启动命令中 解析启动命令使用的是JCommander主要有以下的命令 help 打印帮助信息 –host -h 设置注册中心ip “–port”, “-p” 设置注册中心端口 “–storeMode”, “-m” 设置日志存储模式 “–serverNode”, “-n” 设置服务节点 “–seataEnv”, “-e” seata环境设置 “–sessionStoreMode”, “-ssm” 设置会话存储模式主要有三种文件数据库redis “–lockStoreMode”, “-lsm” 设置锁信息的存储主要有三种文件数据库redis 容器中 配置中心或者文件中 private void init(String[] args) {try {//从命令行中获取配置信息getCommandParameters(args);//从环境容器中获取配置信息getEnvParameters();if (StringUtils.isNotBlank(seataEnv)) {System.setProperty(ENV_PROPERTY_KEY, seataEnv);}//从文件中获取配置信息if (StringUtils.isBlank(storeMode)) {storeMode CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);}if (StringUtils.isBlank(sessionStoreMode)) {sessionStoreMode CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);}if (StringUtils.isBlank(lockStoreMode)) {lockStoreMode CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);}} catch (ParameterException e) {printError(e);}}2. 初始化监控信息
MetricsManager.get().init();默认是关闭的
3.创建netty用于服务端TC与客户端TMRM通信 ThreadPoolExecutor workingThreads new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,new LinkedBlockingQueue(NettyServerConfig.getMaxTaskQueueSize()),new NamedThreadFactory(ServerHandlerThread, NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());//3.创建netty用于服务端TC与客户端TMRM通信NettyRemotingServer nettyRemotingServer new NettyRemotingServer(workingThreads);父抽象类 AbstractNettyRemotingServer 中设置 消息处理器 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}4.初始化UUID生成器雪花算法用于生成全局事务与分支事务id //4.初始化UUID生成器雪花算法用于生成全局事务与分支事务idUUIDGenerator.init(parameterParser.getServerNode());5.初始化SessionManager与LockManager管理会话与锁信息
SessionHolder.init(parameterParser.getSessionStoreMode());LockerManagerFactory.init(parameterParser.getLockStoreMode());SessionManager有三个不同的实现通过SPI机制加载
io.seata.server.storage.file.session.FileSessionManager
io.seata.server.storage.db.session.DataBaseSessionManager
io.seata.server.storage.redis.session.RedisSessionManagerresource下的META-INF文件夹中的service文件根据要加载类的权限定类名找对应的文件文件中存放着对应需要加载的类的权限定类型 io.seata.server.session.SessionHolder#init
public static void init(String mode) {if (StringUtils.isBlank(mode)) {mode CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));}StoreMode storeMode StoreMode.get(mode);if (StoreMode.DB.equals(storeMode)) {ROOT_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());ASYNC_COMMITTING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());} else if (StoreMode.FILE.equals(storeMode)) {String sessionStorePath CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,DEFAULT_SESSION_STORE_FILE_DIR);if (StringUtils.isBlank(sessionStorePath)) {throw new StoreException(the {store.file.dir} is empty.);}ROOT_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});ASYNC_COMMITTING_SESSION_MANAGER ROOT_SESSION_MANAGER;RETRY_COMMITTING_SESSION_MANAGER ROOT_SESSION_MANAGER;RETRY_ROLLBACKING_SESSION_MANAGER ROOT_SESSION_MANAGER;DISTRIBUTED_LOCKER DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());} else if (StoreMode.REDIS.equals(storeMode)) {ROOT_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());ASYNC_COMMITTING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});RETRY_COMMITTING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});RETRY_ROLLBACKING_SESSION_MANAGER EnhancedServiceLoader.load(SessionManager.class,StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});DISTRIBUTED_LOCKER DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());} else {// unknown storethrow new IllegalArgumentException(unknown store mode: mode);}reload(storeMode);}EnhancedServiceLoader.load(SessionManager.class, xxxName;通过SPI机制获取所有需要装配的类后通过参数二最终选择某一个类
LoadLevel(name db, scope Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManagerLoadLevel(name file, scope Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable {LoadLevel(name redis, scope Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager除了META-INF/services/ 路径还会找 META-INF/seata/路径下
io.seata.common.loader.EnhancedServiceLoader.InnerEnhancedServiceLoader#findAllExtensionDefinition LockManager同样也有三个实现类也是通过SPI机制加载
io.seata.server.storage.db.lock.DataBaseLockManager
io.seata.server.storage.file.lock.FileLockManager
io.seata.server.storage.redis.lock.RedisLockManager6.创建并初始化事务协调者 DefaultCoordinator 并监听事务消息并处理
创建DefaultCoordinator实列
实列 DefaultCoordinator最重要的就是给两个属性赋值 remotingServer core
remotingServer是用来TC与TMRM通信的而core则是真正的事务协调者TC
core的coreMap属性将缓存4个实现类对应seata的四种模式XAATTCCSAGA实现类同样也是通过SPI机制加载并在完成加载后缓存到coreMap中
key为枚举类BranchType到值value为SPI加载到对应类
io.seata.server.coordinator.DefaultCore#DefaultCoreDefaultCoordinator 初始化
初始化即启动5个周期线程 //周期重试回滚事务retryRollbacking.scheduleAtFixedRate(() - SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期重试提交事务retryCommitting.scheduleAtFixedRate(() - SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期异步提交事务asyncCommitting.scheduleAtFixedRate(() - SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期超时检测timeoutCheck.scheduleAtFixedRate(() - SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);//周期清理回滚日志undoLogDelete.scheduleAtFixedRate(() - SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);每个线程池的任务逻辑基本都是一样的只不过针对不同的事务状态 protected void handleRetryRollbacking() {//条件对象只处理对应状态的事务会话SessionCondition sessionCondition new SessionCondition(rollbackingStatuses);sessionCondition.setLazyLoadBranch(true);//查找所有的全局会话 如果当前事务会话持久方式是DB则从表中条件查找CollectionGlobalSession rollbackingSessions SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);//如果为空表示当前没有事务则直接返回if (CollectionUtils.isEmpty(rollbackingSessions)) {return;}long now System.currentTimeMillis();//遍历所有的事务会话SessionHelper.forEach(rollbackingSessions, rollbackingSession - {try {//如果是正在回滚中的或者已经死亡的事务会话跳过// prevent repeated rollbackif (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) !rollbackingSession.isDeadSession()) {// The function of this return is continue.return;}//如果超时if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {//有锁则释放全局锁if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {rollbackingSession.clean();}//删除全局事务会话// Prevent thread safety issuesSessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);LOGGER.error(Global transaction rollback retry timeout and has removed [{}], rollbackingSession.getXid());SessionHelper.endRollbackFailed(rollbackingSession, true);// rollback retry timeout eventMetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);//The function of this return is continue.return;}rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//如果没有超时执行回滚core.doGlobalRollback(rollbackingSession, true);} catch (TransactionException ex) {LOGGER.info(Failed to retry rollbacking [{}] {} {}, rollbackingSession.getXid(), ex.getCode(), ex.getMessage());}});}nettyRemotingServer 通信服务类设置handler
将创建的TC绑定到nettyRemotingServer
7.注册销毁事件 // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028ServerRunner.addDisposable(coordinator);将会在Sping容器关闭的时候调用coordinator的destory方法 Overridepublic void destroy() {// 1. first shutdown timed taskretryRollbacking.shutdown();retryCommitting.shutdown();asyncCommitting.shutdown();timeoutCheck.shutdown();undoLogDelete.shutdown();branchRemoveExecutor.shutdown();try {retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);} catch (InterruptedException ignore) {}// 2. second close netty flowif (remotingServer instanceof NettyRemotingServer) {((NettyRemotingServer) remotingServer).destroy();}// 3. third destroy SessionHolderSessionHolder.destroy();instance null;}8.启动NettyRemotingServer处理TM与RM消息 Overridepublic void init() {// registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}registerProcessor 方法主要是注册一系列消息处理器 private void registerProcessor() {// 1. registry on request message processorServerOnRequestProcessor onRequestProcessor new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 2. registry on response message processorServerOnResponseProcessor onResponseProcessor new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 3. registry rm message processorRegRmProcessor regRmProcessor new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 4. registry tm message processorRegTmProcessor regTmProcessor new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 5. registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}调用 super.registerProcessor 方法 将 消息处理器与线程池封装成一个Pair然后在放到map中map的key为消息类型value为Pair。
ServerOnRequestProcessor 的 transactionMessageHandler 成员变量为第6步创建的TC super.init 方法则是启动netty Overridepublic void init() {//启动超时检测super.init();//启动nettyserverBootstrap.start();}Overridepublic void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker).channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),nettyServerConfig.getWriteBufferHighWaterMark())).localAddress(new InetSocketAddress(getListenPort())).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers ! null) {addChannelPipelineLast(ch, channelHandlers);}}});try {this.serverBootstrap.bind(getListenPort()).sync();XID.setPort(getListenPort());LOGGER.info(Server started, service listen port: {}, getListenPort());RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));initialized.set(true);} catch (SocketException se) {throw new RuntimeException(Server start failed, the listen port: getListenPort(), se);} catch (Exception exx) {throw new RuntimeException(Server start failed, exx);}}在第3步时创建并设置了双向通道消息处理器 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap new NettyServerBootstrap(nettyServerConfig);serverBootstrap.setChannelHandlers(new ServerHandler());}ChannelDuplexHandler是Netty框架中的一个双向通道处理器用于处理网络通信中的读写事件。它继承自ChannelInboundHandler和ChannelOutboundHandler可以同时处理入站和出站事件。当通道中有消息时将调用io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler#channelRead方法
处理消息 protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format(%s msgId:%s, body:%s, this, rpcMessage.getId(), rpcMessage.getBody()));}Object body rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware (MessageTypeAware) body;//根据消息的类型获取Pair找到消息处理器final PairRemotingProcessor, ExecutorService pair this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair ! null) {if (pair.getSecond() ! null) {try {pair.getSecond().execute(() - {try {//处理消息pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),thread pool is full, current max pool size is messageExecutor.getActiveCount());if (allowDumpStack) {String name ManagementFactory.getRuntimeMXBean().getName();String pid name.split()[0];long idx System.currentTimeMillis();try {String jstackFile idx .log;LOGGER.info(jstack command will dump to jstackFile);Runtime.getRuntime().exec(String.format(jstack %s %s, pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack false;}}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error(This message type [{}] has no processor., messageTypeAware.getTypeCode());}} else {LOGGER.error(This rpcMessage body[{}] is not MessageTypeAware type., body);}}假设此次消息是TM开启全局事务的单条消息则将交由ServerOnRequestProcessor处理,ServerOnRequestProcessor处理的消息类型如下 * RM:* 1) {link MergedWarpMessage}* 2) {link BranchRegisterRequest}* 3) {link BranchReportRequest}* 4) {link GlobalLockQueryRequest}* TM:* 1) {link MergedWarpMessage}* 2) {link GlobalBeginRequest}* 3) {link GlobalCommitRequest}* 4) {link GlobalReportRequest}* 5) {link GlobalRollbackRequest}* 6) {link GlobalStatusRequest}Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info(closeChannelHandlerContext channel: ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format(close a unhandled connection! [%s], ctx.channel().toString()));}}}private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message rpcMessage.getBody();RpcContext rpcContext ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug(server received:{},clientIp:{},vgroup:{}, message,NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message ,clientIp: NetUtil.toIpAddress(ctx.channel().remoteAddress()) ,vgroup: rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error(put message to logQueue error: {}, e.getMessage(), e);}}if (!(message instanceof AbstractMessage)) {return;}//处理多条消息// the batch send request messageif (message instanceof MergedWarpMessage) {if (NettyServerConfig.isEnableTcServerBatchSendResponse() StringUtils.isNotBlank(rpcContext.getVersion()) Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {ListAbstractMessage msgs ((MergedWarpMessage)message).msgs;ListInteger msgIds ((MergedWarpMessage)message).msgIds;for (int i 0; i msgs.size(); i) {AbstractMessage msg msgs.get(i);int msgId msgIds.get(i);if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() - handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {ListAbstractResultMessage results new CopyOnWriteArrayList();ListCompletableFutureVoid completableFutures null;for (int i 0; i ((MergedWarpMessage)message).msgs.size(); i) {if (PARALLEL_REQUEST_HANDLE) {if (completableFutures null) {completableFutures new ArrayList();}int finalI i;completableFutures.add(CompletableFuture.runAsync(() - {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));} else {results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error(handle request error: {}, e.getMessage(), e);}}MergeResultMessage resultMessage new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}} else {//处理单条消息// the single send request messagefinal AbstractMessage msg (AbstractMessage) message;AbstractResultMessage result transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}最终将调用io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//获取全局事务id放入response中response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info(Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{},rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}core.begin方法 Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalSession session GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());session.begin();// transaction start eventMetricsPublisher.postSessionDoingEvent(session, false);return session.getXid();}public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {this.transactionId UUIDGenerator.generateUUID();this.status GlobalStatus.Begin;this.lazyLoadBranch lazyLoadBranch;if (!lazyLoadBranch) {this.branchSessions new ArrayList();}this.applicationId applicationId;this.transactionServiceGroup transactionServiceGroup;this.transactionName transactionName;this.timeout timeout;this.xid XID.generateXID(transactionId);}首先通过雪花算法生成事务id然后调用 XID.generateXID(transactionId)
最终格式为 ip端口号事务id
public static String generateXID(long tranId) {return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();}【大部分内容来源于https://saint.blog.csdn.net/article/details/126457129。推荐各位学习seata的朋友看一看】