Message Middleware RocketMQ: Advanced Features and Source Code Analysis (Part 4)
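I. RocketMQ source code analysis: a review of the NameServer architecture design.
1. RocketMQ architecture design
Message middleware is generally designed around a topic-based publish/subscribe mechanism. A message producer (Producer) sends messages for a given topic to the message server, and the message server persists them. A message consumer (Consumer) subscribes to the topics it is interested in. Based on the subscription (routing) information, the message server either pushes messages to the consumer (Push mode) or the consumer actively pulls them from the message server (Pull mode), which decouples producers from consumers. To keep the failure of a single message server from bringing down the whole system, several message servers are usually deployed to share message storage. How, then, does a producer know which message server to send a message to? And if one message server goes down, how does the producer find out without restarting the service?
NameServer is designed to solve exactly these problems.
2. Broker message server
When a Broker message server starts, it registers with every NameServer. Before sending a message, a producer first fetches the Broker address list from a NameServer and then picks one server from the list according to a load-balancing algorithm. NameServer keeps a long-lived connection to each Broker. Brokers send a heartbeat every 30 s, and NameServer periodically scans for Brokers that have stopped reporting (the scan task scheduled in NamesrvController#initialize runs every 10 s); a Broker found to be down is removed from the route registry. The route change is not pushed to producers immediately, however. This keeps the NameServer implementation simple, and a fault-tolerance mechanism on the sending side keeps message sending available.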
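The registration, liveness scan and client-side selection described above can be illustrated with a small in-memory sketch. The class `RouteTableSketch`, its method names and the explicit timeout parameter are assumptions made for illustration only; this is not RocketMQ's RouteInfoManager, and the 120 s timeout used in `main` is just an illustrative value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory route table; not RocketMQ's RouteInfoManager.
final class RouteTableSketch {
    // broker address -> timestamp (ms) of the last registration or heartbeat
    private final Map<String, Long> lastSeen = new LinkedHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Called when a Broker registers or sends a heartbeat.
    synchronized void register(String brokerAddr, long nowMillis) {
        lastSeen.put(brokerAddr, nowMillis);
    }

    // Periodic scan: drop every Broker that has been silent longer than timeoutMillis.
    synchronized int scanNotActive(long nowMillis, long timeoutMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // What a producer would fetch: a snapshot of the current address list.
    synchronized List<String> snapshot() {
        return new ArrayList<>(lastSeen.keySet());
    }

    // Client-side round-robin over a snapshot (one simple load-balancing policy).
    String select(List<String> brokers) {
        if (brokers.isEmpty()) {
            throw new IllegalStateException("no broker available");
        }
        int index = Math.floorMod(counter.getAndIncrement(), brokers.size());
        return brokers.get(index);
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.register("broker-a:10911", 0L);
        table.register("broker-b:10911", 100_000L);
        // At t = 130 s with a 120 s timeout, broker-a has been silent too long.
        table.scanNotActive(130_000L, 120_000L);
        System.out.println(table.snapshot());
    }
}
```

A producer that still holds an older snapshot may keep selecting a Broker that has already been removed, which is why the sending side needs its own fault tolerance.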
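3. High availability of NameServer
NameServer achieves high availability by deploying multiple NameServer instances that do not communicate with one another. As a result, the data held by different NameServers may not be exactly the same at a given moment, but this has no effect on message sending. This is one of the highlights of the NameServer design. In short, RocketMQ's design aims to be simple and efficient.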
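Because the NameServers are independent of each other, a client can simply move on to another address when one of them is unreachable. The sketch below illustrates that idea under stated assumptions: `NameServerFailoverSketch` and the `lookup` function (which stands in for a real network call) are hypothetical and are not part of the RocketMQ client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical client-side helper; the lookup function stands in for a network call.
final class NameServerFailoverSketch {
    private final List<String> nameServers;
    private int current = 0; // index of the NameServer that answered last

    NameServerFailoverSketch(List<String> nameServers) {
        if (nameServers.isEmpty()) {
            throw new IllegalArgumentException("at least one NameServer address is required");
        }
        this.nameServers = new ArrayList<>(nameServers);
    }

    // Try each NameServer once, starting with the one that worked last time.
    synchronized List<String> fetchBrokers(Function<String, List<String>> lookup) {
        RuntimeException lastFailure = null;
        for (int i = 0; i < nameServers.size(); i++) {
            int index = (current + i) % nameServers.size();
            try {
                List<String> brokers = lookup.apply(nameServers.get(index));
                current = index;
                return brokers;
            } catch (RuntimeException e) {
                lastFailure = e; // this NameServer is unreachable, try the next one
            }
        }
        throw new IllegalStateException("no NameServer reachable", lastFailure);
    }

    synchronized String currentNameServer() {
        return nameServers.get(current);
    }

    public static void main(String[] args) {
        NameServerFailoverSketch client =
            new NameServerFailoverSketch(Arrays.asList("ns1:9876", "ns2:9876"));
        List<String> brokers = client.fetchBrokers(addr -> {
            if (addr.startsWith("ns1")) {
                throw new IllegalStateException(addr + " is down");
            }
            return Arrays.asList("broker-a:10911");
        });
        System.out.println(brokers + " via " + client.currentNameServer());
    }
}
```

No coordination between the NameServers is needed for this to work, which matches the design goal of keeping NameServer simple.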
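II. RocketMQ source code analysis: NameServer startup, step one.
1. RocketMQ source code analysis: the NameServer startup flow
Step one
Parse the configuration file, populate the properties of NamesrvConfig and NettyServerConfig, and create the NamesrvController.
Step two
Create the NamesrvController instance from the startup properties and initialize it. The NamesrvController instance is the core controller of NameServer.
Step three
Before the JVM process exits, shut down the thread pools so that resources are released promptly.
2. NameServer startup flow diagram
3. RocketMQ source: the startup class org.apache.rocketmq.namesrv.NamesrvStartup.java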
/*
 * D:\RocketMQ\rocketmq-master\namesrv\src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.namesrv;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
import org.slf4j.LoggerFactory;

public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {
        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }

    public static Properties getProperties() {
        return properties;
    }
}
4. org.apache.rocketmq.namesrv.NamesrvController.java source
/*
 * D:\RocketMQ\rocketmq-master\namesrv\src\main\java\org\apache\rocketmq\namesrv\NamesrvController.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.namesrv;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;

    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

    private RemotingServer remotingServer;

    private BrokerHousekeepingService brokerHousekeepingService;

    private ExecutorService remotingExecutor;

    private Configuration configuration;
    private FileWatchService fileWatchService;

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

    public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

    public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

    public void shutdown() {
        this.remotingServer.shutdown();
        this.remotingExecutor.shutdown();
        this.scheduledExecutorService.shutdown();

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }

    public NamesrvConfig getNamesrvConfig() {
        return namesrvConfig;
    }

    public NettyServerConfig getNettyServerConfig() {
        return nettyServerConfig;
    }

    public KVConfigManager getKvConfigManager() {
        return kvConfigManager;
    }

    public RouteInfoManager getRouteInfoManager() {
        return routeInfoManager;
    }

    public RemotingServer getRemotingServer() {
        return remotingServer;
    }

    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}
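The TLS listener inside initialize() is the least obvious part of this class: a change to the trust certificate triggers a reload on its own, while the server certificate and private key only trigger a reload once both have changed. The following self-contained sketch isolates that rule. `CertReloadTrackerSketch`, its constructor parameters and the `Runnable` callback are hypothetical stand-ins for the anonymous FileWatchService.Listener in the listing, not RocketMQ classes.

```java
// Hypothetical stand-in for the anonymous FileWatchService.Listener in the listing.
final class CertReloadTrackerSketch {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private final Runnable reload;
    private boolean certChanged;
    private boolean keyChanged;

    CertReloadTrackerSketch(String certPath, String keyPath, String trustCertPath, Runnable reload) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
        this.reload = reload;
    }

    // Invoked once per changed file, mirroring onChanged(String path).
    void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reload.run(); // the trust certificate can be reloaded independently
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        // Certificate and key belong together: wait until both were rewritten.
        if (certChanged && keyChanged) {
            certChanged = false;
            keyChanged = false;
            reload.run();
        }
    }

    public static void main(String[] args) {
        CertReloadTrackerSketch tracker = new CertReloadTrackerSketch(
            "server.pem", "server.key", "ca.pem",
            () -> System.out.println("reload ssl context"));
        tracker.onChanged("server.pem"); // no reload yet, the key is still the old one
        tracker.onChanged("server.key"); // both changed, reload once
    }
}
```

Waiting for both files avoids loading a new certificate together with an old private key while the pair is being replaced.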
5. org.apache.rocketmq.common.namesrv.NamesrvConfig.java source
/*
 * D:\RocketMQ\rocketmq-master\common\src\main\java\org\apache\rocketmq\common\namesrv\NamesrvConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z vintagewang@apache.org $
 */
package org.apache.rocketmq.common.namesrv;

import java.io.File;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;

    public boolean isOrderMessageEnable() {
        return orderMessageEnable;
    }

    public void setOrderMessageEnable(boolean orderMessageEnable) {
        this.orderMessageEnable = orderMessageEnable;
    }

    public String getRocketmqHome() {
        return rocketmqHome;
    }

    public void setRocketmqHome(String rocketmqHome) {
        this.rocketmqHome = rocketmqHome;
    }

    public String getKvConfigPath() {
        return kvConfigPath;
    }

    public void setKvConfigPath(String kvConfigPath) {
        this.kvConfigPath = kvConfigPath;
    }

    public String getProductEnvName() {
        return productEnvName;
    }

    public void setProductEnvName(String productEnvName) {
        this.productEnvName = productEnvName;
    }

    public boolean isClusterTest() {
        return clusterTest;
    }

    public void setClusterTest(boolean clusterTest) {
        this.clusterTest = clusterTest;
    }

    public String getConfigStorePath() {
        return configStorePath;
    }

    public void setConfigStorePath(final String configStorePath) {
        this.configStorePath = configStorePath;
    }
}
6. org.apache.rocketmq.remoting.netty.NettyServerConfig.java source
/*
 * D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyServerConfig.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.remoting.netty;

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;

    public int getListenPort() {
        return listenPort;
    }

    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    public int getServerWorkerThreads() {
        return serverWorkerThreads;
    }

    public void setServerWorkerThreads(int serverWorkerThreads) {
        this.serverWorkerThreads = serverWorkerThreads;
    }

    public int getServerSelectorThreads() {
        return serverSelectorThreads;
    }

    public void setServerSelectorThreads(int serverSelectorThreads) {
        this.serverSelectorThreads = serverSelectorThreads;
    }

    public int getServerOnewaySemaphoreValue() {
        return serverOnewaySemaphoreValue;
    }

    public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
        this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
    }

    public int getServerCallbackExecutorThreads() {
        return serverCallbackExecutorThreads;
    }

    public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
        this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
    }

    public int getServerAsyncSemaphoreValue() {
        return serverAsyncSemaphoreValue;
    }

    public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
        this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
    }

    public int getServerChannelMaxIdleTimeSeconds() {
        return serverChannelMaxIdleTimeSeconds;
    }

    public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
        this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
    }

    public int getServerSocketSndBufSize() {
        return serverSocketSndBufSize;
    }

    public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
        this.serverSocketSndBufSize = serverSocketSndBufSize;
    }

    public int getServerSocketRcvBufSize() {
        return serverSocketRcvBufSize;
    }

    public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
        this.serverSocketRcvBufSize = serverSocketRcvBufSize;
    }

    public boolean isServerPooledByteBufAllocatorEnable() {
        return serverPooledByteBufAllocatorEnable;
    }

    public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
        this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
    }

    public boolean isUseEpollNativeSelector() {
        return useEpollNativeSelector;
    }

    public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
        this.useEpollNativeSelector = useEpollNativeSelector;
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return (NettyServerConfig) super.clone();
    }
}
7. NameServer startup flow, step one
Parse the configuration file, populate the properties of NamesrvConfig and NettyServerConfig, and create the NamesrvController.
8. Code: NamesrvStartup#createNamesrvController
// Create the NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// Create the NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Set the listen port
nettyServerConfig.setListenPort(9876);
// Handle the -c startup option (configuration file)
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
// Handle the -p startup option (print the configuration and exit)
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
// Apply the command-line options to namesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// Create the NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
9. NamesrvConfig properties
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
10. Property descriptions
rocketmqHome: the RocketMQ home directory
kvConfigPath: the path where NameServer persists its KV configuration
configStorePath: the default path of the NameServer configuration file
orderMessageEnable: whether ordered messages are supported
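The layering in createNamesrvController (field defaults, then the hard-coded listen port, then the -c properties file, then the command-line options) relies on copying properties onto a config object through its setters. The sketch below shows one simplified, reflection-based way to do that. It is written in the spirit of MixAll.properties2Object but is not its actual implementation; `PropertiesToObjectSketch`, `DemoConfig` and `apply` are hypothetical names, and only int, long, boolean and String setters are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

final class PropertiesToObjectSketch {

    // Hypothetical config bean; only two of the fields discussed above are modelled.
    static final class DemoConfig {
        private int listenPort = 8888;
        private String rocketmqHome;

        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getRocketmqHome() { return rocketmqHome; }
        public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; }
    }

    // Copy every property that has a matching public setter onto the target object.
    static void apply(Properties props, Object target) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // nothing configured for this setter, keep the current value
            }
            Class<?> type = method.getParameterTypes()[0];
            Object arg;
            if (type == int.class) {
                arg = Integer.valueOf(value);
            } else if (type == long.class) {
                arg = Long.valueOf(value);
            } else if (type == boolean.class) {
                arg = Boolean.valueOf(value);
            } else if (type == String.class) {
                arg = value;
            } else {
                continue; // parameter type not supported by this sketch
            }
            try {
                method.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        DemoConfig config = new DemoConfig();   // 1. field default: 8888
        config.setListenPort(9876);             // 2. value set in code at startup
        Properties file = new Properties();     // 3. stands in for the -c file
        file.setProperty("listenPort", "9999");
        apply(file, config);
        Properties cli = new Properties();      // 4. stands in for command-line options
        cli.setProperty("rocketmqHome", "/opt/rocketmq");
        apply(cli, config);
        System.out.println(config.getListenPort() + " " + config.getRocketmqHome());
    }
}
```

Each later layer only overrides the keys it actually contains, so a value from the properties file survives unless the command line sets the same key.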
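11. NettyServerConfig properties
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;
listenPort: the port NameServer listens on. The field default is 8888, but at startup the value is initialized to 9876.
serverWorkerThreads: number of threads in the Netty business thread pool.
serverCallbackExecutorThreads: number of threads in the Netty public task thread pool. The Netty networking layer creates different thread pools for different business types, for example message sending, message consumption and heartbeat detection. If a business type has no registered thread pool, its requests are executed by the public thread pool.
serverSelectorThreads: number of IO threads. These are the threads on the NameServer and Broker side that parse requests and return responses. They handle network requests: they parse the request packets, forward them to the business thread pools for the actual work, and then return the results to the caller.
serverOnewaySemaphoreValue: concurrency limit for send-oneway requests (a Broker-side parameter).
serverAsyncSemaphoreValue: maximum concurrency for asynchronous message sending.
serverChannelMaxIdleTimeSeconds: maximum idle time of a network connection, 120 s by default.
serverSocketSndBufSize: size of the socket send buffer.
serverSocketRcvBufSize: size of the socket receive buffer.
serverPooledByteBufAllocatorEnable: whether pooled (cached) ByteBuf allocation is enabled.
useEpollNativeSelector: whether to use the Epoll IO model.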
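The two semaphore values above cap how many oneway or asynchronous requests may be in flight at once. A minimal sketch of that idea, using java.util.concurrent.Semaphore, follows. `OnewayLimiterSketch` and `trySend` are hypothetical names, and the sketch releases the permit synchronously after the send, whereas a real remoting layer would typically release it when the asynchronous write completes.

```java
import java.util.concurrent.Semaphore;

// Hypothetical limiter; the permit count plays the role of serverOnewaySemaphoreValue.
final class OnewayLimiterSketch {
    private final Semaphore permits;

    OnewayLimiterSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent, true);
    }

    // Runs the send action if a permit is free; returns false instead of blocking otherwise.
    boolean trySend(Runnable send) {
        if (!permits.tryAcquire()) {
            return false; // too many requests in flight, the caller should back off
        }
        try {
            send.run();
            return true;
        } finally {
            permits.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        OnewayLimiterSketch limiter = new OnewayLimiterSketch(256);
        boolean sent = limiter.trySend(() -> System.out.println("oneway request written"));
        System.out.println("sent=" + sent + ", free permits=" + limiter.availablePermits());
    }
}
```

Rejecting the request when no permit is available keeps a slow peer from piling up an unbounded number of pending requests.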
III. RocketMQ source code analysis: NameServer startup, step two.
1. NameServer startup flow, step two
Create the NamesrvController instance from the startup properties and initialize it. The NamesrvController instance is the core controller of NameServer.
2. Code: NamesrvController#initialize
public boolean initialize() {
    // Load the KV configuration
    this.kvConfigManager.load();
    // Create the NettyServer network handling object
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Create the business thread pool and register the request processor
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    // Scheduled task: scan the Brokers every 10 s and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Scheduled task: print the KV configuration every 10 min
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
IV. RocketMQ source code analysis: NameServer startup, step three.
1. NameServer startup flow, step three
Before the JVM process exits, shut down the thread pools so that resources are released promptly.
2. Code: NamesrvStartup#start
// Register the JVM shutdown hook
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        // Release resources
        controller.shutdown();
        return null;
    }
}));
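To make the hook pattern concrete, here is a self-contained sketch of a shutdown hook thread that wraps a Callable and runs it at most once. `ShutdownHookSketch` is a hypothetical class written for illustration; it is not RocketMQ's ShutdownHookThread, whose internals may differ.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical hook thread: runs the cleanup callback at most once.
final class ShutdownHookSketch extends Thread {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Callable<Void> callback;

    ShutdownHookSketch(Callable<Void> callback) {
        super("ShutdownHookSketch");
        this.callback = callback;
    }

    @Override
    public void run() {
        // Guard against the cleanup being triggered twice.
        if (!done.compareAndSet(false, true)) {
            return;
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }

    public static void main(String[] args) {
        // The JVM starts registered hook threads when it begins to shut down.
        Runtime.getRuntime().addShutdownHook(new ShutdownHookSketch(() -> {
            System.out.println("releasing resources");
            return null;
        }));
        System.out.println("running");
    }
}
```

In NamesrvStartup#start the callback is controller.shutdown(), which shuts down the remoting server, the business thread pool and the scheduled executor.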