可以浏览国外网站,展馆,建设银行车贷网站,企业做网站的好处有哪些#x1f3af; 导读#xff1a;本文档详细介绍了如何在Spring Boot应用中集成Apache RocketMQ#xff0c;并实现消息生产和消费功能。首先通过创建消息生产者项目#xff0c;配置POM文件引入RocketMQ依赖#xff0c;实现同步消息发送#xff0c;并展示了如何发送普通字符串… 导读本文档详细介绍了如何在Spring Boot应用中集成Apache RocketMQ并实现消息生产和消费功能。首先通过创建消息生产者项目配置POM文件引入RocketMQ依赖实现同步消息发送并展示了如何发送普通字符串消息、对象消息以及集合消息。接着文档讲解了如何搭建消息消费者项目包括配置RocketMQ监听器以消费不同类型的RocketMQ消息。此外还探讨了RocketMQ的不同消息类型如延迟消息、顺序消息的发送方式以及消息消费模式的选择与实现负载均衡模式与广播模式。 文章目录 RocketMQ集成SpringBoot入门案例搭建rocketmq-producer消息生产者创建项目完整的pom.xml修改配置文件application.yml在测试类里面测试发送消息 搭建rocketmq-consumer消息消费者创建项目完整的pom.xml修改配置文件application.yml监听器SimpleMsgListener启动rocketmq-consumer 发送对象消息和集合消息发送对象消息消息内容为对象生产者消费者 发送集合消息消息内容为集合 发送不同模式的消息发送同步消息发送异步消息发送单向消息发送延迟消息发送顺序消息生产者ObjMsgListener 发送事务消息事务消息的处理逻辑 消息过滤tag过滤常在消费者端过滤生产者TagMsgListenersql92表达式 Key过滤可以在事务监听的类里面区分生产者断点发送这个消息查看事务里面消息头 两种消费模式消费者1消费者2启动两个消费者在生产者里面添加一个单元测试并且运行查看两个消费者的控制台BROADCASTING 广播模式 RocketMQ集成SpringBoot入门案例
搭建rocketmq-producer消息生产者 创建项目完整的pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.3/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.test/groupIdartifactId01-rocketmq-producer/artifactIdversion0.0.1-SNAPSHOT/versionnamerocketmq-producer/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- rocketmq的依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.2/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build/project如果想用RocketMQ 5.x版本可以用2.3.0
修改配置文件application.yml
注意rocketmq不是在spring层级下面的很容易搞错以为和SpringBoot整合就是在spring层级下面了
spring:application:name: rocketmq-producer
rocketmq:# rocketMq的nameServer地址name-server: 127.0.0.1:9876 producer:# 生产者组别group: test-group# 消息发送的超时时间send-message-timeout: 3000# 异步消息发送失败重试次数retry-times-when-send-async-failed: 2# 发送消息的最大大小单位字节这里等于4Mmax-message-size: 4194304 在测试类里面测试发送消息
往test主题里面发送一个简单的字符串消息
/*** 注入rocketMQTemplate我们使用它来操作mq*/
Autowired
private RocketMQTemplate rocketMQTemplate;/*** 测试发送简单的消息** throws Exception*/
Test
public void testSimpleMsg() throws Exception {// 往test的主题里面发送一个简单的字符串消息// syncSend同步消息// asyncSend异步// 参数1:topic名 参数2Object数据SendResult sendResult rocketMQTemplate.syncSend(test, 我是一个简单的消息);// 拿到消息的发送状态System.out.println(sendResult.getSendStatus());// 拿到消息的idSystem.out.println(sendResult.getMsgId());
}运行后查看控制台 搭建rocketmq-consumer消息消费者 创建项目完整的pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.3/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.test/groupIdartifactId02-rocketmq-consumer/artifactIdversion0.0.1-SNAPSHOT/versionnamerocketmq-consumer/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- rocketmq的依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.0.2/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build/project修改配置文件application.yml
spring:application:name: rocketmq-consumer
rocketmq:name-server: 127.0.0.1:9876
# consumer
# group: aaa-group 不需要写一个项目一般有很多消费者组监听器SimpleMsgListener
消费者要消费消息就添加一个监听器SpringBoot一启动监听器就开始持续工作 1、类上添加注解 Component 和 RocketMQMessageListener 。topic指定消费的主题consumerGroup 指定消费组一个主题可以有多个消费者组一个消息可以被多个不同的组的消费者都消费 2、实现 RocketMQListener 接口泛型可以为具体的数据类型如果想拿到消息的其他参数如消息头、消息体例如key之类的泛型用MessageExt
package com.test.listener;import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;Component
RocketMQMessageListener(topic test, consumerGroup test-group, messageModel MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListenerString {/*** 消费消息的方法* * param message 消息内容类型和上面的泛型一致。如果泛型指定了固定的类型消息体就是我们的参数*/Overridepublic void onMessage(String message) {System.out.println(message);}
}泛型使用MessageExt
Component
RocketMQMessageListener(topic test, consumerGroup test-group, messageModel MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListenerMessageExt {/*** 消费消息的方法* * param message 消息内容类型和上面的泛型一致。如果泛型指定了固定的类型消息体就是我们的参数*/Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}onMessage方法没有返回值要表示消息签收
方法报错就拒收方法不报错就签收
启动rocketmq-consumer
查看控制台发现我们已经监听到消息了 发送对象消息和集合消息
发送对象消息消息内容为对象
消费者监听的时候泛型中写对象的类型即可
生产者
rocketmq-producer 添加一个Order类
package com.test.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;/*** 订单对象*/
Data
AllArgsConstructor
NoArgsConstructor
public class Order {/*** 订单号*/private String orderId;/*** 订单名称*/private String orderName;/*** 订单价格*/private Double price;/*** 订单号创建时间*/private Date createTime;/*** 订单描述*/private String desc;}rocketmq-producer 添加一个单元测试
/*** 测试发送对象消息** throws Exception*/
Test
public void testObjectMsg() throws Exception {Order order new Order();order.setOrderId(UUID.randomUUID().toString());order.setOrderName(我的订单);order.setPrice(998D);order.setCreateTime(new Date());order.setDesc(加急配送);// 往test-obj主题发送一个订单对象rocketMQTemplate.syncSend(test-obj, order);
}发送此消息
消费者
rocketmq-consumer 将Order类拷贝过来
rocketmq-consumer 添加一个监听器
package com.test.listener;import com.test.domain.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 创建一个对象消息的监听* 1.类上添加注解Component和RocketMQMessageListener* 2.实现RocketMQListener接口注意泛型的使用*/
Component
RocketMQMessageListener(topic test-obj, consumerGroup test-obj-group)
public class ObjMsgListener implements RocketMQListenerOrder {/*** 消费消息的方法** param message*/Overridepublic void onMessage(Order message) {System.out.println(message);}
}重启rocketmq-consumer后查看控制台监听的对象消息如下 发送集合消息消息内容为集合
生产者创建一个Order的集合发送消息监听方修改泛型中的类型为 Object 接收到消息之后再做类型强转 发送不同模式的消息
发送同步消息
同步消息消息由生产者发送到 broker 后会得到一个确认具有强可靠性。用于重要的消息通知、短信通知等
入门案例演示发送的就是同步消息。下面三种发送消息的方法底层都是调用syncSend发送的都是同步消息
rocketMQTemplate.syncSend()rocketMQTemplate.send()rocketMQTemplate.convertAndSend()
发送异步消息
rocketMQTemplate.asyncSend()
/*** 测试异步发送消息** throws Exception*/
Test
public void testAsyncSend() throws Exception {// 发送异步消息发送完以后会有一个异步通知rocketMQTemplate.asyncSend(test, 发送一个异步消息, new SendCallback() {/*** 成功的回调* param sendResult*/Overridepublic void onSuccess(SendResult sendResult) {System.out.println(发送成功);}/*** 失败的回调* param throwable*/Overridepublic void onException(Throwable throwable) {System.out.println(发送失败);}});// 测试一下异步的效果System.out.println(谁先执行);// 挂起jvm 不让方法结束System.in.read();
}测试发现。“谁先执行”打印在前面“发送成功” 打印在后面
发送单向消息
单向消息用在不关心发送结果的场景吞吐量很大存在消息丢失的风险可用于日志信息的发送
/*** 测试单向消息** throws Exception*/
Test
public void testOnWay() throws Exception {// 发送单向消息没有返回值和结果rocketMQTemplate.sendOneWay(test, 这是一个单向消息);
}发送延迟消息
延迟等级从1级开始分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
/*** 测试延迟消息** throws Exception*/
Test
public void testDelay() throws Exception {// 构建消息对象MessageString message MessageBuilder.withPayload(我是一个延迟消息).build();// 发送一个延时消息延迟等级为4级也就是30s后被监听消费// 参数3连接MQ的超时时间 参数4延迟等级SendResult sendResult rocketMQTemplate.syncSend(test, message, 2000, 4);System.out.println(sendResult.getSendStatus());
}运行后查看消费者端过了30s才被消费
发送顺序消息
上面消息的消费者都一样的实现方法。但顺序消息的消费不太一样消费者需要单线程消费
生产者
/*** 测试顺序消费* mq会根据hash的值来存放到一个队列里面去** throws Exception*/
Test
public void testOrderly() throws Exception {ListOrder orders Arrays.asList(new Order(UUID.randomUUID().toString().substring(0, 5), 张三的下订单, null, null, null, 1),new Order(UUID.randomUUID().toString().substring(0, 5), 张三的发短信, null, null, null, 1),new Order(UUID.randomUUID().toString().substring(0, 5), 张三的物流, null, null, null, 1),new Order(UUID.randomUUID().toString().substring(0, 5), 张三的签收, null, null, null, 1),new Order(UUID.randomUUID().toString().substring(0, 5), 李四的下订单, null, null, null, 2),new Order(UUID.randomUUID().toString().substring(0, 5), 李四的发短信, null, null, null, 2),new Order(UUID.randomUUID().toString().substring(0, 5), 李四的物流, null, null, null, 2),new Order(UUID.randomUUID().toString().substring(0, 5), 李四的签收, null, null, null, 2));// 我们控制流程为 下订单-发短信-物流-签收 hash的值为seq也就是说 seq相同的会放在同一个队列里面顺序消费orders.forEach(order - {rocketMQTemplate.syncSendOrderly(test-obj, order, String.valueOf(order.getSeq()));});
}运行发送消息
ObjMsgListener
consumeMode指定消费类型
CONCURRENTLY 并发消费不按照顺序ORDERLY 顺序消费消息放到一个队列用一个线程来消费
package com.test.listener;import com.test.domain.Order;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;Component
RocketMQMessageListener(topic test-obj,consumerGroup test-obj-group,// 修改为顺序消费模式单线程consumeMode ConsumeMode.ORDERLY
)
public class ObjMsgListener implements RocketMQListenerOrder {/*** 消费消息的方法* param message*/Overridepublic void onMessage(Order message) {System.out.println(message);}
}重启rocketmq-consumer查看控制台消息按照我们的放入顺序进行消费了 发送事务消息
rocketmq-producer 添加一个单元测试
/*** 测试事务消息* 默认是sync同步的* 事务消息会有确认和回查机制* 事务消息都会走到同一个监听回调里面所以我们需要使用tag或者key来区分过滤** throws Exception*/
Test
public void testTrans() throws Exception {// 构建消息体MessageString message MessageBuilder.withPayload(这是一个事务消息).build();// 发送事务消息同步的 最后一个参数才是消息主题TransactionSendResult transaction rocketMQTemplate.sendMessageInTransaction(test, message, 消息的参数);// 拿到本地事务状态System.out.println(transaction.getLocalTransactionState());// 挂起jvm因为事务的回查需要一些时间System.in.read();
}rocketmq-producer 添加一个本地事务消息的监听半消息
/*** 事务消息的监听与回查* 类上添加注解RocketMQTransactionListener 表示这个类是本地事务消息的监听类* 实现RocketMQLocalTransactionListener接口* 两个方法为执行本地事务与回查本地事务*/
Component
RocketMQTransactionListener(corePoolSize 4,maximumPoolSize 8)
public class TmMsgListener implements RocketMQLocalTransactionListener {/*** 执行本地事务这里可以执行想做的业务比如操作数据库* 操作成功就 return RocketMQLocalTransactionState.COMMIT;* 操作失败就 return RocketMQLocalTransactionState.UNKNOWN;* 可以使用try catch来控制成功或者失败* param msg* param arg* return*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 拿到消息参数System.out.println(arg);// 拿到消息头System.out.println(msg.getHeaders());// 执行业务// 返回状态 COMMIT 或者 UNKNOWNif (成功){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;} }/*** 此方法为回查方法执行需要等待一会 * 回查本地事务只有上面的执行方法返回UNKNOWN时才执行下面的方法 默认是1min回查* 这里可以执行一些检查的方法* 如果返回COMMIT那么本地事务就算是提交成功了消息就会被消费者看到** param msg* return*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {System.out.println(msg);return RocketMQLocalTransactionState.COMMIT;}
}事务消息的处理逻辑
消息会先到事务监听类的执行方法如果返回状态为COMMIT则消费者可以直接监听到如果返回状态为ROLLBACK则消息发送失败直接回滚如果返回状态为UNKNOW则过一段时间走回查方法如果回查方法返回状态为UNKNOW或者ROLLBACK则消息发送失败直接回滚如果回查方法返回状态为COMMIT则消费者可以直接监听到
消息过滤
tag过滤常在消费者端过滤
从源码注释得知tag带在主题后面用:来携带tag 在org.apache.rocketmq.spring.support.RocketMQUtil 的getAndWrapMessage方法里面看到了具体细节keys要在消息头里面携带 生产者
/*** 发送一个带tag的消息** throws Exception*/
Test
public void testTagMsg() throws Exception {// 发送一个tag为java的数据rocketMQTemplate.syncSend(test-tag:java, 我是一个带tag的消息);
}TagMsgListener
1、类上添加注解Component和RocketMQMessageListener。selectorType SelectorType.TAG, 指定使用tag过滤。 (也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFiltertrue)
2、selectorExpression “java” 表达式默认是*支持tag1 || tag2 || tag3 监听多个标签 3、实现RocketMQListener接口注意泛型的使用
注意一个标签用一个消费者组
package com.test.listener;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;Component
RocketMQMessageListener(topic test-tag,consumerGroup test-tag-group,// 注明使用tag模式selectorType SelectorType.TAG,selectorExpression java||C
)
public class TagMsgListener implements RocketMQListenerString {/*** 消费消息的方法** param message*/Overridepublic void onMessage(String message) {System.out.println(message);}
}sql92表达式
一般不用这种模式
发送消息的时候携带数字标签 监听的消息的时候通过数字来过滤消息 表达式可用语法
AND, OR, , , , BETWEEN A AND B, equals to A AND BNOT BETWEEN A AND B, equals to B OR AIN (‘a’, ‘b’), equals to ‘a’ OR ‘b’, this operation only support String type.IS NULL, IS NOT NULL, check parameter whether is null, or not.TRUE, FALSE, check parameter whether is true, or false.
Example: (a 10 AND a 100) OR (b IS NOT NULL AND bTRUE)
Key过滤可以在事务监听的类里面区分
生产者
key需要放在消息头上面
/*** 发送一个带 key 的消息我们使用事务消息打断点查看消息头** throws Exception*/
Test
public void testKeyMsg() throws Exception {// 发送一个key为spring的事务消息MessageString message MessageBuilder.withPayload(我是一个带key的消息).setHeader(RocketMQHeaders.KEYS, spring).build();rocketMQTemplate.sendMessageInTransaction(test, message, 我是一个带key的消息);
}断点发送这个消息查看事务里面消息头 我们在mq的控制台也可以看到 两种消费模式
RocketMQ消息消费的模式分为两种负载均衡模式和广播模式
CLUSTERING负载均衡模式多个消费者交替消费同一个主题里面的消息BROADCASTING广播模式每个消费者都消费一遍订阅的主题的消息
消费者1
再搭建一个消费者rocketmq-consumer-b依赖和配置文件和rocketmq-consumer一致记住端口修改一下避免占用。rocketmq-consumer-b添加一个监听
package com.test.listener;import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** messageModel 指定消息消费的模式* CLUSTERING 为负载均衡模式* BROADCASTING 为广播模式*/
Component
RocketMQMessageListener(topic test,consumerGroup test-group,// 集群模式messageModel MessageModel.CLUSTERING
)
public class ConsumerBListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(message);}
}消费者2
修改rocketmq-consumer的SimpleMsgListener
/*** 创建一个简单消息的监听* 1.类上添加注解Component和RocketMQMessageListener** RocketMQMessageListener(topic test, consumerGroup test-group)* topic指定消费的主题consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费* 2.实现RocketMQListener接口注意泛型的使用*/
Component
RocketMQMessageListener(topic test, consumerGroup test-group,messageModel MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(new Date());System.out.println(message);}
}启动两个消费者
两个消费者是一个集群且是同一个消费者组。当然两个消费者可以写在一个项目中写两个监听器就行
在生产者里面添加一个单元测试并且运行
/*** 测试消息消费的模式** throws Exception*/
Test
public void testMsgModel() throws Exception {for (int i 0; i 10; i) {rocketMQTemplate.syncSend(test, 我是消息 i);}
}查看两个消费者的控制台
发现是负载均衡的模式消息被负载均衡到两个消费者上负载均衡不是说数量一定一样 BROADCASTING 广播模式
RocketMQMessageListener(topic test,consumerGroup test-group,// 集群模式messageModel MessageModel.BROADCASTING
)重启测试广播模式下每个消费者都消费了这些消息
项目中一般部署多台机器消费者部署 2 到 3 个根据业务可以选择具体的模式来配置。广播模式是不会更新消费者位点的它在乎消费失败也不会重试就广播一次