建设银行网站怎么短信转账,江苏建设集团公司官网,凡科网站为什么免费做网站,销售管理软件新技术目录一、Spring 整合 RocketMQ1.1 消息生产者1.2 消息消费者1.3 Spring 配置文件1.4 运行实例程序二、参考链接一、Spring 整合 RocketMQ
不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件#xff0c;Spring 社区已经通过多种方式提供了对这些中间件产品集成#xff0c;例如通…
目录一、Spring 整合 RocketMQ1.1 消息生产者1.2 消息消费者1.3 Spring 配置文件1.4 运行实例程序二、参考链接一、Spring 整合 RocketMQ
不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件Spring 社区已经通过多种方式提供了对这些中间件产品集成例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka 通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理二是使用 RocketMQ 社区的外部项目 rocketmq-jmshttps://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms然后通过 spring-jms 方式集成使用三是如果你的应用是基于 spring-boot 的可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starterhttps://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter比较方便的收发消息。 总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。
1.1 消息生产者
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName producerGroupName;this.nameServerAddr nameServerAddr;}public void init() throws Exception {logger.info(开始启动消息生产者服务...);//创建一个消息生产者并设置一个消息生产者组producer new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer整个应用生命周期内只需要初始化一次producer.start();logger.info(消息生产者服务启动成功.);}public void destroy() {logger.info(开始关闭消息生产者服务...);producer.shutdown();logger.info(消息生产者服务已关闭.);}public DefaultMQProducer getProducer() {return producer;}
}消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象destroy 方法用于生产者对象销毁时清理资源。
1.2 消息消费者
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName consumerGroupName;this.nameServerAddr nameServerAddr;this.topicName topicName;this.messageListener messageListener;}public void init() throws Exception {logger.info(开始启动消息消费者服务...);//创建一个消息消费者并设置一个消息消费者组consumer new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, *);//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info(消息消费者服务启动成功.);}public void destroy(){logger.info(开始关闭消息消费者服务...);consumer.shutdown();logger.info(消息消费者服务已关闭.);}public DefaultMQPushConsumer getConsumer() {return consumer;}}同消息生产者类似消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了不再赘述。当然有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list ! null) {for (MessageExt ext : list) {try {logger.info(监听到消息 : new String(ext.getBody(), UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。
1.3 Spring 配置文件
因为只使用 Spring 框架集成所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件这样能更好的演示收发消息的效果。
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsdbean idproducer classorg.study.mq.rocketMQ.spring.SpringProducer init-methodinit destroy-methoddestroyconstructor-arg namenameServerAddr valuelocalhost:9876/constructor-arg nameproducerGroupName valuespring_producer_group//bean
/beans消息生产者配置很简单定义了一个消息生产者对象该对象初始化时调用 init 方法对象销毁前执行 destroy 方法将 Name Server 地址和生产者组配置好。
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsdbean idmessageListener classorg.study.mq.rocketMQ.spring.MessageListener /bean idconsumer classorg.study.mq.rocketMQ.spring.SpringConsumer init-methodinit destroy-methoddestroyconstructor-arg namenameServerAddr valuelocalhost:9876/constructor-arg nameconsumerGroupName valuespring_consumer_group/constructor-arg nametopicName valuespring-rocketMQ-topic /constructor-arg namemessageListener refmessageListener //bean/beans消息消费者同消息生产者配置类似多了一个消息监听器对象的定义和绑定。
1.4 运行实例程序
按前述步骤 启动 Name Server 和 Broker接着运行消息生产者和消息消费者程序简化起见我们用两个单元测试类模拟这两个程序
package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;Beforepublic void setup() {container new ClassPathXmlApplicationContext(classpath:spring-producer.xml);}Testpublic void sendMessage() throws Exception {SpringProducer producer container.getBean(SpringProducer.class);for (int i 0; i 20; i) {//创建一条消息对象指定其主题、标签和消息内容Message msg new Message(spring-rocketMQ-topic,null,(Spring RocketMQ demo i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult producer.getProducer().send(msg);System.out.printf(%s%n, sendResult);}}
}SpringProducerTest 类模拟消息生产者发送消息。
package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;Beforepublic void setup() {container new ClassPathXmlApplicationContext(classpath:spring-consumer.xml);}Testpublic void consume() throws Exception {SpringConsumer consumer container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}SpringConsumerTest 类模拟消息消费者者接收消息在 consume 方法返回之前需要让当前线程睡眠一段时间使消费者程序继续存活才能监听到生产者发送的消息。
分别运行 SpringProducerTest 类 和 SpringConsumerTest 类在 SpringConsumerTest 的控制台能看到接收的消息 假如启动两个 SpringConsumerTest 类进程因为它们属于同一消费者组在 SpringConsumerTest 的控制台能看到它们均摊到了消息
二、参考链接
[01] 消息队列之 RocketMQ