网站名称和备案公司名称不一样,企业单位网站建设内容需要什么,电脑云主机,购买域名做销售网站可以吗目录
1. 单机搭建
2. 测试RocketMQ
3. 集群搭建
4. 集群启动
5. RocketMQ-DashBoard搭建
6. 不同类型消息发送
1.同步消息
2. 异步消息发送
3. 单向发送消息
7. 消费消息 1. 单机搭建
1. 先从rocketmq官网下载二进制包#xff0c;ftp上传至linux服务器#xff0c…目录
1. 单机搭建
2. 测试RocketMQ
3. 集群搭建
4. 集群启动
5. RocketMQ-DashBoard搭建
6. 不同类型消息发送
1.同步消息
2. 异步消息发送
3. 单向发送消息
7. 消费消息 1. 单机搭建
1. 先从rocketmq官网下载二进制包ftp上传至linux服务器unzip命令解压。
2. 启动NameServer
# 1.启动NameServer
nohup sh bin/mqnamesrv
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
3. 启动Broker
RocketMQ默认的虚拟机内存较大启动Broker如果因为内存不足失败需要编辑如下两个配置文件修改JVM内存大小。
# 编辑runbroker.sh和runserver.sh修改默认JVM大小
vi runbroker.sh
vi runserver.sh
参考设置
JAVA_OPT${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m
# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 2. 测试RocketMQ
1. 发送消息
# 1.设置环境变量
export NAMESRV_ADDRlocalhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2. 接收消息
# 1.设置环境变量
export NAMESRV_ADDRlocalhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
3. 关闭RocketMQ
# 1.关闭NameServer
sh mqshutdown namesrv
# 2.关闭Broker
sh mqshutdown broker 3. 集群搭建
这里采用broker双主双从同步模式NameServer、Producer、Producer集群由于无状态性搭建简单。Master和Slave的brokerName相同brokerId不同
1. 服务器环境
序号IP角色架构模式1192.168.183.132nameserver、brokerserverMaster1、Slave22192.168.183.128nameserver、brokerserverMaster2、Slave1
2. hosts配置两台一样配置
vi /etc/hosts # 重启网卡
systemctl restart network
3. 防火墙关闭
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
4. 创建消息存储路径
mkdir -p /root/store/master1
mkdir -p /root/store/master1/commitlog
mkdir -p /root/store/master1/consumequeue
mkdir -p /root/store/master1/indexmkdir -p /root/store/slave2
mkdir -p /root/store/slave2/commitlog
mkdir -p /root/store/slave2/consumequeue
mkdir -p /root/store/slave2/indexmkdir -p /root/store/master2
mkdir -p /root/store/master2/commitlog
mkdir -p /root/store/master2/consumequeue
mkdir -p /root/store/master2/indexmkdir -p /root/store/slave1
mkdir -p /root/store/slave1/commitlog
mkdir -p /root/store/slave1/consumequeue
mkdir -p /root/store/slave1/index
5. broker配置文件
1master1
服务器192.168.183.132
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties
# 所属集群名称
brokerClusterNamerocketmq-cluster
# broker名字注意不同的broker名字必须不同
brokerNamebroker-a
# brokerId brokerId0表示Master大于0表示Slave
brokerId0
brokerIP1rocketmq-master1
# nameserver地址分号分隔
namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
defalutTopicQueueNums4
# 是否允许自动创建Topic建议线下环境开启线上环境关闭
autoCreateTopicEnabletrue
# 是否允许自动创建订阅组建议线下环境开启线上环境关闭
autoCreateSubscriptionGrouptrue
# Broker 对外暴露端口
listenPort10911
# 删除文件时间点默认凌晨4点
deleteWhen04
# 文件保留时间默认48小时测试环境建议设置120分钟
fileReservedTime120
# commitlog文件大小默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个文件默认存30w条根据业务情况调整
mapedFileSizeConsumeQueue300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio88#存储路径
storePathRootDir/root/store/master1
#commitLog 存储路径
storePathCommitLog/root/store/master1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue/root/store/master1/consumequeue
#消息索引存储路径
storePathIndex/root/store/master1/index
#checkpoint 文件存储路径
storeCheckpoint/root/store/master1/checkpoint
#abort 文件存储路径
abortFile/root/store/master1/abort# 限制消息的大小
maxMessageSize65536# Broker角色
brokerRoleSYNC_MASTER
# 刷盘方式
flushDiskTypeSYNC_FLUSH
2slave2
服务器192.168.183.132
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties
# 所属集群名称
brokerClusterNamerocketmq-cluster
# broker名字注意不同的broker名字必须不同
brokerNamebroker-b
# brokerId brokerId0表示Master大于0表示Slave
brokerId1
brokerIP1rocketmq-slave2
# nameserver地址分号分隔
namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
defalutTopicQueueNums4
# 是否允许自动创建Topic建议线下环境开启线上环境关闭
autoCreateTopicEnabletrue
# 是否允许自动创建订阅组建议线下环境开启线上环境关闭
autoCreateSubscriptionGrouptrue
# Broker 对外暴露端口
listenPort11011
# 删除文件时间点默认凌晨4点
deleteWhen04
# 文件保留时间默认48小时测试环境建议设置120分钟
fileReservedTime120
# commitlog文件大小默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个文件默认存30w条根据业务情况调整
mapedFileSizeConsumeQueue300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio88#存储路径
storePathRootDir/root/store/slave2
#commitLog 存储路径
storePathCommitLog/root/store/slave2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue/root/store/slave2/consumequeue
#消息索引存储路径
storePathIndex/root/store/slave2/index
#checkpoint 文件存储路径
storeCheckpoint/root/store/slave2/checkpoint
#abort 文件存储路径
abortFile/root/store/slave2/abort# 限制消息的大小
maxMessageSize65536# Broker角色
brokerRoleSLAVE
# 刷盘方式
flushDiskTypeASYNC_FLUSH
3master2
服务器192.168.183.128
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties
# 所属集群名称
brokerClusterNamerocketmq-cluster
# broker名字注意不同的broker名字必须不同
brokerNamebroker-b
# brokerId brokerId0表示Master大于0表示Slave
brokerId0
brokerIP1rocketmq-master2
# nameserver地址分号分隔
namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
defalutTopicQueueNums4
# 是否允许自动创建Topic建议线下环境开启线上环境关闭
autoCreateTopicEnabletrue
# 是否允许自动创建订阅组建议线下环境开启线上环境关闭
autoCreateSubscriptionGrouptrue
# Broker 对外暴露端口
listenPort10911
# 删除文件时间点默认凌晨4点
deleteWhen04
# 文件保留时间默认48小时测试环境建议设置120分钟
fileReservedTime120
# commitlog文件大小默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个文件默认存30w条根据业务情况调整
mapedFileSizeConsumeQueue300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio88#存储路径
storePathRootDir/root/store/master2
#commitLog 存储路径
storePathCommitLog/root/store/master2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue/root/store/master2/consumequeue
#消息索引存储路径
storePathIndex/root/store/master2/index
#checkpoint 文件存储路径
storeCheckpoint/root/store/master2/checkpoint
#abort 文件存储路径
abortFile/root/store/master2/abort# 限制消息的大小
maxMessageSize65536# Broker角色
brokerRoleSYNC_MASTER
# 刷盘方式
flushDiskTypeSYNC_FLUSH4slave1
服务器192.168.183.128
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties
# 所属集群名称
brokerClusterNamerocketmq-cluster
# broker名字注意不同的broker名字必须不同
brokerNamebroker-a
# brokerId brokerId0表示Master大于0表示Slave
brokerId1
brokerIP1rocketmq-slave1
# nameserver地址分号分隔
namesrvAddrrocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
defalutTopicQueueNums4
# 是否允许自动创建Topic建议线下环境开启线上环境关闭
autoCreateTopicEnabletrue
# 是否允许自动创建订阅组建议线下环境开启线上环境关闭
autoCreateSubscriptionGrouptrue
# Broker 对外暴露端口
listenPort11011
# 删除文件时间点默认凌晨4点
deleteWhen04
# 文件保留时间默认48小时测试环境建议设置120分钟
fileReservedTime120
# commitlog文件大小默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个文件默认存30w条根据业务情况调整
mapedFileSizeConsumeQueue300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio88#存储路径
storePathRootDir/root/store/slave1
#commitLog 存储路径
storePathCommitLog/root/store/slave1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue/root/store/slave1/consumequeue
#消息索引存储路径
storePathIndex/root/store/slave1/index
#checkpoint 文件存储路径
storeCheckpoint/root/store/slave1/checkpoint
#abort 文件存储路径
abortFile/root/store/slave1/abort# 限制消息的大小
maxMessageSize65536# Broker角色
brokerRoleSLAVE
# 刷盘方式
flushDiskTypeASYNC_FLUSH4. 集群启动
1启动NameServe集群
分别在192.168.183.132和192.168.183.128启动NameServer
nohup sh mqnamesrv 启动成功。 2启动broker集群
在192.168.183.132上启动master1和slave2
# master1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties
# slave2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties 启动成功。
在192.168.183.128上启动master2和slave1
# slave1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties
# master2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties 3日志查看
# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log 5. RocketMQ-DashBoard搭建
github上拉取项目后修改yml的namesrvAddrs即可。 6. 不同类型消息发送
pom.xml dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.4.0/version/dependency
1.同步消息
SyncProducer.java
/*** 发送同步消息*/
public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(group1);// 设置NameServer地址producer.setNamesrvAddr(192.168.183.132:9876;192.168.183.128:9876);// 启动Producer实例producer.start();for (int i 0; i 10; i) {// 创建消息并指定TopicTag消息体Message msg new Message(TopicTest,TagA,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息到brokerSendResult sendResult producer.send(msg);// 发送状态SendStatus status sendResult.getSendStatus();// 消息IDString msgId sendResult.getMsgId();// 消息接收队列IDint queueId sendResult.getMessageQueue().getQueueId();System.out.println(发送状态: status 消息ID: msgId 消息接收队列ID: queueId);TimeUnit.SECONDS.sleep(1);}// 如果不再发送消息关闭Producer实例producer.shutdown();}
} 2. 异步消息发送 AsyncProducer1.java public class AsyncProducer1 {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(group1);// 设置NameServer地址producer.setNamesrvAddr(192.168.183.132:9876;192.168.183.128:9876);// 启动Producer实例producer.start();// 设置重试次数producer.setRetryTimesWhenSendAsyncFailed(0);for (int i 0; i 10; i) {// 创建消息并指定TopicTag消息体Message msg new Message(TopicTest,TagB,(Hello World i).getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback 接收异步返回结果的回调int finalI i;producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(发送结果 sendResult);}Overridepublic void onException(Throwable throwable) {System.out.println(发送异常 throwable);}});TimeUnit.SECONDS.sleep(1);}producer.shutdown();}
}3. 单向发送消息
不关心结果比如日志发送
public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// 设置NameServer的地址producer.setNamesrvAddr(192.168.183.132:9876;192.168.183.128:9876);// 启动Producer实例producer.start();for (int i 0; i 100; i) {// 创建消息并指定TopicTag和消息体Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息关闭Producer实例。producer.shutdown();}
} 7. 消费消息 public class Consumer1 {public static void main(String[] args) throws MQClientException {// 实例化消费者指定组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group1);// 指定NameServer地址信息consumer.setNamesrvAddr(192.168.183.132:9876;192.168.183.128:9876);// 订阅Topicconsumer.subscribe(TopicTest, *);// 负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING); // 广播模式 MessageModel.BROADCASTING// 注册回调函数处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println(Consumer Started.);}
}负载均衡模式消费多个消费者共同处理broker消息队列的消息。
广播模式消费每个消费者都会收到订阅的Topic的消息。 持续更新中......