登陆网站取消备案,永久网站域名注册,廊坊住房和城乡建设厅网站,手机app与手机网站的区别RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器#xff0c;基于 AMQP#xff08;高级消息队列协议#xff09;标准#xff0c;使用 Erlang 编程语言构建。它是消息队列#xff08;MQ#xff09;的一种#xff0c;广泛应用于分布式系统中#x… RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器基于 AMQP高级消息队列协议标准使用 Erlang 编程语言构建。它是消息队列MQ的一种广泛应用于分布式系统中用于实现应用程序之间的异步消息传递。RabbitMQ 具有高可靠性、易扩展、高可用和功能丰富的特点支持多种编程语言客户端如 Java、Python、Ruby、C# 等。 RabbitMQ 的核心概念 Producer生产者消息的生产者负责将消息发送到 RabbitMQ 中的 Exchange。Consumer消费者消息的消费者负责从队列中获取并处理消息。Connection生产者/消费者和 Broker 之间的 TCP 连接。Channel在 Connection 内部建立的逻辑连接用于减少操作系统建立 TCP 连接的开销。Broker接收和分发消息的应用RabbitMQ Server 就是 Message Broker。Virtual Host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中。Exchange消息到达 Broker 的第一站根据分发规则匹配查询表中的 routing key分发消息到队列中去。常用的类型有 direct、topic 和 fanout。Queue消息最终被送到这里等待消费者取走。 RabbitMq的交换机类型
1. Direct Exchange
描述Direct 交换机是最简单的交换机类型。它根据消息的 routing key 将消息路由到一个特定的队列。如果队列的 binding key 与消息的 routing key 完全匹配则消息会被路由到该队列。特点 一对一匹配消息的 routing key 必须与队列的 binding key 完全相同。简单直接适用于一对一的消息传递场景。示例 生产者发送消息时指定 routing key 为 info。队列 A 绑定到交换机时binding key 也为 info。消息将被路由到队列 A。
2. Topic Exchange
描述Topic 交换机允许更复杂的路由模式。消息的 routing key 和队列的 binding key 可以包含通配符从而实现更灵活的路由规则。特点 模式匹配支持通配符 *匹配一个单词和 #匹配多个单词。灵活多变适用于多对多的消息传递场景可以实现复杂的路由逻辑。示例 生产者发送消息时指定 routing key 为 user.info。队列 A 绑定到交换机时binding key 为 user.*。队列 B 绑定到交换机时binding key 为 *.info。消息将被路由到队列 A 和队列 B。
3. Fanout Exchange
描述Fanout 交换机是最简单的广播交换机。它不关心消息的 routing key将消息广播到所有绑定到该交换机的队列。特点 广播消息消息会被发送到所有绑定的队列无论队列的 binding key 是什么。简单高效适用于需要将消息广播到多个消费者的情况。示例 生产者发送消息时不指定 routing key。队列 A、队列 B 和队列 C 都绑定到该交换机。消息将被路由到队列 A、队列 B 和队列 C。
RabbitMQ 的主要特点 可靠性使用消息确认机制确保消息的可靠传递。生产者在发送消息后会收到一个确认消费者在处理完消息后会发送一个确认。如果消息发送或处理失败RabbitMQ 会重新发送消息直到确认为止。灵活性支持多种消息传递模式包括点对点、发布/订阅和消息路由等。可扩展性可以通过添加更多的节点来实现水平扩展以处理更大的消息负载。它还支持集群和镜像队列提供高可用性和负载均衡。多语言支持提供了多种编程语言的客户端库包括 Java、Python、Ruby、C# 等。 MQ选型对比
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava可用性高基于主从架构实现高可用高基于主从架构实现高可用非常高分布式架构非常高分布式一个数据多个副本少数机器宕机不会丢失数据不会导致不可用单机吞吐量万级万级十万级十万级以上消息延迟微秒级毫秒级毫秒级毫秒级以内消息可靠性较高基本不丢较低有丢大概率经过参数优化配置可以做到 0 丢失经过参数优化配置可以做到 0 丢失
RabbitMQ安装 环境Centos7.9基于docker安装 1.使用docker run命令创建容器并安装mq
docker run \-e RABBITMQ_DEFAULT_USERmqadmin \-e RABBITMQ_DEFAULT_PASSmqadmin \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mq-net\-d \rabbitmq:3.8-management
2.开放端口或关闭防火墙如果访问不了
方法1开放端口
#1.开放mq端口
firewall-cmd --zonepublic --add-port15672/tcp --add-port5672/tcp --permanent
#2.重新加载防火墙配置
firewall-cmd --reload方法2临时关闭防火墙
systemctl stop firewalld
3.访问RabbitMQ控制台并登录
账号密码就是创建mq容器时指定的RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS 访问IP地址主机ip:15672 RabbitMQ控制台使用
1.收发消息
1.1创建消息队列 1.2创建一个交换机 1.3讲交换机与队列绑定 1.4发送消息 1.5查看消息 2.数据隔离
当我们只部署了一个mq的话当多个不同项目同时使用。这个时候为了避免互相干扰 我们会利用virtual host的隔离特性将不同项目隔离。
实现步骤
1.在我们的用户管理创建一个新用户 可以看到我们的用户创建成功但是刚创建的用户是没有虚拟主机的
2.登录新创建的用户配置虚拟主机 我们可以通过右上角选择自己的虚拟主机 可以看到在我们选择我们当前用户的虚拟机主机之后就看不到我们之前用/创建的队列了
SpringAMQP使用
将来我们开发业务功能的时候肯定不会在控制台收发消息而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配使用起来非常方便。
Spring AMQPSpringAmqp的官方地址Spring AMQP
SpringAMQP提供了三个功能 自动声明队列、交换机及其绑定关系 基于注解的监听器模式异步接收消息 封装了RabbitTemplate工具用于发送消息
1.创建一个Maven的mqDemo项目
2.创建两个子模块publisher(消息的发送者)、consumer(消息的消费者) 3.在父模块的pom.xml中导入以下配置 groupIdcn.mq.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.12/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependency/dependencies
4.在子模块pom.xml中分别导入以下配置
publisher parentartifactIdmq-demo/artifactIdgroupIdcn.mq.demo/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdpublisher/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/properties
consumer parentartifactIdmq-demo/artifactIdgroupIdcn.mq.demo/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdconsumer/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/properties
5.在两个子模块的application.yaml中加入以下配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.181.32 # 你的虚拟机IPport: 5672 # 端口virtual-host: /test # 虚拟主机username: testuser # 用户名password: testuser # 密码
6.创建交换机、队列并监听消息
方式1基于配置类创建交换机、队列并绑定
在consumer下创建一个configuration类
Configuration
public class FanoutConfiguration {Beanpublic FanoutExchange fanoutExchange() {//创建交换机return new FanoutExchange(test.fanout);}Beanpublic Queue fanoutQueue1() {//创建队列return new Queue(fanout.queue1);}Beanpublic Queue fanoutQueue2() {//创建队列return new Queue(fanout.queue2);}Beanpublic Binding binDingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}Beanpublic Binding binDingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
然后创建一个监听类监听消息
Slf4j
Component
public class SpringRabbitListener {RabbitListener(queues fanout.queue1)//监听的队列public void listenerFanoutQueue1(String message) throws InterruptedException {System.out.println(消费者1接收到test.fanout消息 message LocalTime.now());}RabbitListener(queues fanout.queue2)//监听的队列public void listenerFanoutQueue2(String message) throws InterruptedException {System.out.println(消费者2接收到test.fanout消息 message LocalTime.now());}
}方式2基于注解创建交换机、队列并绑定
Slf4j
Component
public class SpringRabbitListener {
RabbitListener(bindings QueueBinding(value Queue(name fanout.quque1),exchange Exchange(name test.fanout, type ExchangeTypes.FANOUT)))public void listenerFanoutQueue1(String message) throws InterruptedException {System.err.println(消费者1接收到test.fanout消息 message LocalTime.now());}RabbitListener(bindings QueueBinding(value Queue(value fanout.queue1),exchange Exchange(name test.fanout, type ExchangeTypes.FANOUT)))public void listenerFanoutQueue2(String message) throws InterruptedException {System.err.println(消费者2接收到test.fanout消息 message LocalTime.now());}
}启动ConsumerApplication类查看rabbitmq控制台查看是否已经创建交换机和队列成功并且正确绑定上面的方式实现一种即可 6.发送消息
在publisher创建测试类发送消息
Slf4j
SpringBootTest
class SpringAmqpTest {AutowiredRabbitTemplate rabbitTemplate;Testpublic void testFanoutQueue() {String exchangeName test.fanout;//交换机名称String message hello,everyone!;//发送的消息rabbitTemplate.convertAndSend(exchangeName, , message);}
}
8.测试
1.运行ConsumerApplication启动类保持运行状态
2.运行testFanoutQueue测试类的方法
3.查看控制台输出 正确接收到消息