怎么做简单网站,国外云服务器厂商,江苏盐城网站建设,腾讯地图如何标注自己店铺位置目录
一、引入依赖
二、配置rabbitmq的连接信息等
1、生产者配置
2、消费者配置
三、设置消息转换器
四、生产者代码示例 1、配置交换机和队列信息
2、生产消息代码
五、消费者代码示例
1、消费层代码
2、业务层代码 在分布式系统中#xff0c;消息队列是一种重要…
目录
一、引入依赖
二、配置rabbitmq的连接信息等
1、生产者配置
2、消费者配置
三、设置消息转换器
四、生产者代码示例 1、配置交换机和队列信息
2、生产消息代码
五、消费者代码示例
1、消费层代码
2、业务层代码 在分布式系统中消息队列是一种重要的通信方式它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ实现消息的发送和接收。 交换机 主要负责接收生产者发送的消息并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有 Fanout Exchange扇出交换机 Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况例如广播系统通知或有多个服务需要消费同一份数据的场景。
Direct Exchange直连交换机 Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时消息才会被路由到相应的队列。适合于精确控制消息投递的场景如特定的服务或功能模块只关心特定类型的消息。
Topic Exchange主题交换机 Topic交换机允许更复杂的匹配规则通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串可以包含特殊字符如“#”和“*”来实现模糊匹配。*用于匹配一个单词而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统如日志处理系统可以根据日志等级或来源将日志消息分发到不同的队列。
Headers Exchange头交换机 Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景例如某些高级的路由策略或复杂的事件处理系统。
队列主要用于存储消息实现先进先出FIFO的特性。
一、引入依赖
这里引入了两个依赖。一个是rabbitmq的依赖另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。 dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency dependency groupIdcom.fasterxml.jackson.dataformat/groupId artifactIdjackson-dataformat-xml/artifactId /dependency 二、配置rabbitmq的连接信息等
1、生产者配置 rabbitmq: host: 170.40.20.16 port: 5672 username: zhuoye password: zy521 virtual-host: / 2、消费者配置 rabbitmq: host: 170.40.20.16 port: 5672 username: zhuoye password: zy521 virtual-host: / listener: simple: prefetch: 1 #每次只能处理一个处理完成才能获取下一个消息 三、设置消息转换器 默认情况下Spring采用的序列化方式是JDK序列化而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以这里我们一般使用Jackson的序列化代替JDk的序列化。
在生产者和消费者的启动类上加上如下代码
SpringBootApplication
EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {public static void main( String[] args ) {SpringApplication.run(ConsumerApp.class, args);}//使用的是Jackson库中的Jackson2JsonMessageConverter类代替使用jdk自带的序列化Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
}
四、生产者代码示例 1、配置交换机和队列信息
Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAMEamq.topic;private static String QUEUE_NAMEalarm.data.topic.queue;private static String CONFIRM_ALARM_QUEUE_NAMEalarm.confirm.data.topic.queue;/*** 声明交换机*/Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除当没有生产者或者消费者使用此交换机该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* return*/Bean(alarmQueue)public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* return*/Bean(confirmAlarmQueue)public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* param queue* param topicExchange* return*/Beanpublic Binding alarmBinding(Qualifier(alarmQueue) Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(server.event.#);}/*** 声明确认告警队列绑定关系* param queue* param topicExchange* return*/Beanpublic Binding confirmAlarmBinding(Qualifier(confirmAlarmQueue) Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(server.event_confirm.#);}
2、生产消息代码 Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAMEamq.topic;private static String CONFIRM_ALARM_QUEUE_NAMEalarm.confirm.data.topic.queue;Testvoid producerAlarmMsg() {String msg 发送一条告警消息;rabbitTemplate.convertAndSend(EXCHANGE_NAME, server.event.#,msg);System.out.println(msg msg);}Testvoid producerConfirmAlarmMsg() {String msg 发送一条确认告警消息;rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, server.event_confirm.#,msg);System.out.println(msg msg);}
五、消费者代码示例
1、消费层代码
Component
public class AlarmConsumer {Autowiredprivate IAlarmService alarmService;RabbitListener(queues alarm.data.topic.queue,concurrency 5)public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}RabbitListener(queues alarm.confirm.data.topic.queue,concurrency 5)public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
2、业务层代码
Service
public class IAlarmServiceImpl implements IAlarmService {Overridepublic void dealAlarmData(String data) {EquipAlarmResp equipAlarmResp JSON.parseObject(result,EquipAlarmResp.class);ListString alarmIdsOld dceEquipAlarmMapper.queryAllAlarmIds();DceEquipAlarmDto dceEquipAlarmDto CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);dceEquipAlarmDto.setCreateTime(new Date());dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);//查询出需要新增或者更新的数据Boolean flagalarmIdsOld.stream().filter(a-a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();//开启事务保证新增、更新、删除的原子性TransactionStatus transaction transactionManager.getTransaction(transactionDefinition);ListDceEquipAlarmDto listnew ArrayList();list.add(dceEquipAlarmDto);try {//新增if (!flag) {dceEquipAlarmMapper.insertBatch(list);}//更新if (flag) {dceEquipAlarmMapper.updateBatch(list);}//提交事务transactionManager.commit(transaction);} catch (Exception e) {//回滚transactionManager.rollback(transaction);log.error(DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败, e);}}Overridepublic void dealConfirmAlarmData(String data) {EquipConfirmAlarmResp alarmResp JSON.parseObject(data,EquipConfirmAlarmResp.class);Integer confirmTime Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));alarmResp.setConfirmTime(confirmTime);dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());}}注以上代码为对接告警信息和对接告警确认消息的示例。