苏州网站建设哪家便宜,网站创建价格,成都时代装饰工程有限公司,搜索引擎优化 简历RabbitMq安装
类型概念
租户 RabbitMQ 中有一个概念叫做多租户#xff0c;每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器#xff0c;这些虚拟的消息服务器就是我们所说的虚拟主机#xff08;virtual host#xff09;#xff0c;一般简称为 vhost。 每一个 vhos…RabbitMq安装
类型概念
租户 RabbitMQ 中有一个概念叫做多租户每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器这些虚拟的消息服务器就是我们所说的虚拟主机virtual host一般简称为 vhost。 每一个 vhost 都是一个独立的小型 RabbitMQ 服务器这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等并且拥有自己独立的权限不同的 vhost 中的队列和交换机不能互相绑定这样技能保证运行安全又能避免命名冲突 交换机
交换机属性意义值意义type类型direct默认的直接交换机 根据交换机下队列绑定的routingKey直接匹配fanout扇形交换机简单来说就是发布订阅队列直接绑定在交换机下统一发布消息headers头部交换机通过message header头部信息进行比对可以根据定义全匹配、部分匹配等规则topic主题交换机通过绑定routingKey进行模糊匹配Durability耐用持久化durable持久化数据存放于硬盘transient瞬态数据存放于内存Auto delete自动删除Yes没有绑定队列时自动删除针对的是曾经有过但后来没有的事物No不自动删除Internal内部使用Yes该路由绑定的队列不会被用户消费No不自动删除
队列 队列属性意义值意义type类型Default for virtual host租户配置的默认选项下列三种其一默认Classic无需设置Classic传统的队列类型数据存储在单个节点上不具备quorum队列的高可用性和数据保护特性ps:单机时使用Quorum高可用性队列数据会被复制到多个节点提供更好的数据可靠性和持久性ps:部署多节点时使用Stream特殊类型的队列用于支持事件流处理event streaming具有类似于Kafka的流式处理特性ps:听说不成熟暂时用不上Durability耐用持久化durable持久化数据存放于硬盘transient瞬态数据存放于内存
参数
显示参数实际参数作用Auto expirex-expires设置队列的过期时间单位为毫秒。当队列在指定时间内未被使用将会被自动删除Message TTLx-message-ttl设置队列中消息的过期时间Time-To-Live单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除Overflow behaviourx-overflow设置队列溢出行为可选值为 drop-head删除最旧的消息或 reject-publish拒绝发布新消息Single active consumerx-single-active-consumer配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时表示队列只允许有一个消费者活跃地消费消息其他消费者将被阻塞直到当前的消费者停止消费或断开连接Dead letter exchangex-dead-letter-exchange设置队列中的死信消息转发到的交换机名称。当消息成为死信时将会被转发到指定的交换机Dead letter routing keyx-dead-letter-routing-key设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机Max lengthx-max-length设置队列的最大长度即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后新消息将无法入队Max length bytesx-max-length-bytes设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后新消息将无法入队Leader locatorx-queue-leader-locator配置队列的领导者Leader定位器集群中使用
SpringBoot整合
引入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.6.3/version
/dependency配置数据源
spring: rabbitmq:addresses: xxx.xxx.xx.xx:5672username: adminpassword: xxxxxxvirtual-host: /配置交换机和队列
Component
public class RabbitMqConfig {// 定义交换机名称public static final String FANOUT_EXCHANGE fanout.test;Bean(name FANOUT_EXCHANGE)public FanoutExchange fanoutExchange() {// 交换机类型按需创建这里用的是Fanout发布订阅绑定在该交换机下的队列都会收到消息// 参数2是否持久化// 参数3是否自动删除return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 定义队列public static final String FANOUT_QUEUE1 queue1;Bean(name FANOUT_QUEUE1)public Queue fanoutQueue1() {// 后三个不写也行这是默认值// 参数2是否持久化数据到磁盘防止意外关闭数据丢失// 参数3是否具有排他性// 参数4队列不再使用时是否自动删除return new Queue(FANOUT_QUEUE1, true, false, false);}public static final String FANOUT_QUEUE2 queue2;Bean(name FANOUT_QUEUE2)public Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}Beanpublic Binding bindingSimpleQueue1(Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}Beanpublic Binding bindingSimpleQueue2(Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}测试发一条消息到队列
SpringBootTest(classes TemplateApplication.class)
public class RabbitMQTest {AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;Testpublic void testSent(){//指定交换机-指定队列因为创建的交换机是FanoutExchange所以绑定该交换机的队列都会收到一条消息rabbitMessagingTemplate.convertAndSend(fanout.test,发送数据到FanoutExchange);// 如果创建队列不绑定交换机和路由键那么实际上会有默认的交换机和路由键均为空直接将消息发送给队列队列名则和路由键保持一致仍然可以成功发送消息。}
}测试接收队列消息
写个监听类接收消息
Component
public class RabbitMqListenter {RabbitListener(queues {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})public void reciveLogAll(String msg) throws Exception {System.out.println(消费到数据 msg);}
}-------------基础的使用到这里就结束了-------------
拓展事项
rabbitMqPusher
自己封装一个更加方便使用的发送工具可有可无其中可以使用RabbitMessagingTemplate和RabbitTemplateRabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用那么RabbitMessagingTemplate就够用了如果需要更精细的控制可以选择使用RabbitTemplate。
但它们在使用方式和功能上有一些不同点
RabbitMessagingTemplate
RabbitMessagingTemplate是MessagingTemplate的子类用于在Spring应用程序中发送和接收消息。 它提供了一种更高级别的抽象使得在Spring框架中更容易使用消息发送和接收的功能。 可以直接与Spring的消息通道MessageChannel集成方便进行消息的发送和接收。
RabbitTemplate
RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类提供了丰富的方法来发送和接收消息。 它是一个强大而灵活的工具可以直接与RabbitMQ的交互进行细粒度的控制。 可以设置消息的属性、监听发送确认、接收确认等功能更加灵活地处理消息发送和接收的细节。
public interface RabbitMqPublish {void send(String quene, String message);void send(String exchange, String routingKey, String message);void send(String quene, String message, Integer expiration);void send(String exchange, String routingKey, String message, Integer expiration);}package com.template.rabbitmq.producer.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;Component
Slf4j
public class RabbitMqPublishImpl implements RabbitMqPublish {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* param quene 队列名称 或 交换机名称* param message 消息内容*/public void send(String quene, String message) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());log.info(发送消息--- quene:{} --- message:{}, message, quene);}/*** 直接发送消息到队列* 超过有效期丢弃** param quene 队列名称* param message 消息内容* param expiration 有效期(毫秒)*/public void send(String quene, String message, Integer expiration) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info(发送消息--- quene:{} --- message:{} --- expiration:{}, quene, message, expiration);}/*** 发送消息* 超过有效期丢弃** param exchange 交换机名称* param routingKey 路由键* param message 消息内容* param expiration 有效期(毫秒)*/public void send(String exchange, String routingKey, String message, Integer expiration) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info(发送消息--- exchange:{} --- routingKey:{} --- message:{} --- expiration:{}, exchange, routingKey, message, expiration);}/*** 发送消息** param exchange 交换机名称* param routingKey 路由键* param message 消息内容*/public void send(String exchange, String routingKey, String message) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());log.info(发送消息--- exchange:{} --- routingKey:{} --- message:{}, exchange, routingKey, message);}} 在RabbitMQ中如果队列没有设置过期时间即没有声明x-message-ttl属性那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。 实测以上列代码的方式直接对消息设置有效期是生效的。
死信队列
和普通队列一样只不过是对其他队列进行配置将过期的消息路由到死信队列中。 创建死信交换机和死信路由 // 配置交换机的文件中继续增加配置public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE DIRECT_GP_DEAD_LETTER_EXCHANGE;public static final String DIRECT_GP_DEAD_LETTER_QUEUE DIRECT_GP_DEAD_LETTER_QUEUE;Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)public DirectExchangedirectDeadLetterExchange() {return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap());}Bean(DIRECT_GP_DEAD_LETTER_QUEUE)public Queue directDeadLetterQueue() {return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap());}设置队列消息有效期并绑定死信队列 Bean(name DIRECT_QUEUE1)public Queue directQueue1() {HashMapString, Object headers new HashMap();// 配置消息有效期消息发送到队列10秒后如果未被消费者消费则过期headers.put(x-message-ttl,10000);// 配置超期交换机消息过期后会发送到此交换机headers.put(x-dead-letter-exchange,DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey消息过期后转移消息时指定的routingKeyheaders.put(x-dead-letter-routing-key,DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期未配置交换机和routingKey则消息会被直接丢弃return new Queue(DIRECT_QUEUE1, true, false, false,headers);}配置完成后尝试向DIRECT_QUEUE1发送一条消息不启动消费者10秒后消息会自动转移到死信队列中可在可视化管理界面进行验证。
延时队列
延时队列场景举例
预定一个会议室两个小时后开始要求提前十分钟通知参会人员进行开会。 如果不使用延时队列那么就需要不断轮询查看是否到达需要通知的时间进行消息通知。
延时队列的实现方式
死信队列消息有效期 预定时间到提前十分钟通知中间有110分钟那么创建一条通知消息设置有效期110分钟丢入队列不用消费者去监听等待消息过期后路由到指定的死信队列再去消费死信队列中的消息即可。 所以延时队列实际上是一种实现方案而不是一种特定的队列类型。