怎么做中英文版网站,网页查询系统,建站宝盒手机版下载,大连网站建设找简维科技这是本人学习的总结#xff0c;主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、Overview1.1、创建MQClientInstance1.1.1、检查1.1.1、MQClientInstance的ID 1.2、MQClientInstance.start() 1、Overview 这是发送信息的代码样例#xff0c;
DefaultMQProducer produ…这是本人学习的总结主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、Overview1.1、创建MQClientInstance1.1.1、检查1.1.1、MQClientInstance的ID 1.2、MQClientInstance.start() 1、Overview 这是发送信息的代码样例
DefaultMQProducer producer new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
for (int i 0; i MESSAGE_COUNT; i) {try {Message msg new Message(TOPIC, TAG, (Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}
}
producer.shutdown();生产者启动最少需要两个信息group和nameSrvAddr。启动的源码则是producer#start()中。
最终RocketMQ会创建MQClientInstance的实例然后在调用MQClientInstance#start()完成生产者的启动。
1.1、创建MQClientInstance
1.1.1、检查
代码线索DefaultMQProducer#start() - DefaultMQProducerImpl#start() - DefaultMQProducerImpl#checkConfig()。
创建MQClientInstance前做前置检查主要是检查group的格式并且不能和系统的group重命。
1.1.1、MQClientInstance的ID
MQClientInstance由MQClientManager进行管理。MQClientManager整个JVM中只有一个实例其内部用ConcurrentMapString, MQClientManager管理着所有的MQClientInstance其中的String可以看成是每个MQClientInstance的id下面通过源码查看id是如何组成的。
代码线索DefaultMQProducer#start() - DefaultMQProducerImpl#start() - MQClientManager#getInstance()#getOrCreateMQClientInstance() - ClientConfig#buildMQClientId()。
public String buildMQClientId() {StringBuilder sb new StringBuilder();sb.append(this.getClientIP());sb.append();sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append();sb.append(this.unitName);}if (enableStreamRequestType) {sb.append();sb.append(RequestType.STREAM);}return sb.toString();
}很明显每个MQClientInstance的ID主要是由IPinstanceName和unitName组成其中instanceName和unitName都可以设置。所以如果我们想要创建多个MQClientInstance使用的话可以设置不同的instanceName和unitName。 1.2、MQClientInstance.start()
启动一些线程池心跳服务。
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState ServiceState.START_FAILED;// If not specified,looking address from name serverif (null this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel// NRC startthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info(the client factory [{}] start OK, this.clientId);this.serviceState ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException(The Factory object[ this.getClientId() ] has been created before, and failed., null);default:break;}}
}