网站建设与规划实验心得,广告设计哪里可以学,大学生水果预定配送网站建设的项目规划书,网络平台怎么搭建网站关于RabbitMQ你了解多少#xff1f; 文章目录 关于RabbitMQ你了解多少#xff1f;基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性…关于RabbitMQ你了解多少 文章目录 关于RabbitMQ你了解多少基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性生产者重连生产者确认SpringAMQP实现生产者确认 RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件 基础篇
同步和异步
同步调用
优势时效性强等待到结果后才返回不足拓展性差、性能下降、级联失败问题
异步调用
异步调用方式其实就是基于消息通知的方式一般包含三个角色 消息发送者投递消息的人就是原来的调用方消息代理管理、暂存、转发消息消息接收者接收和处理消息的人就是原来的服务提供方 优势解除耦合拓展性强、无需等待性能好、故障隔离、缓存消息流量削峰填谷不足不能立刻得到调用结果时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性
MQ技术选型
MQ(MessageQueue)中文是消息队列字面来看就是存放消息的队列。也就是异步调用中的Broker。
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
介绍和安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件官网地址
同样基于Docker来安装RabbitMQ使用下面的命令即可
docker run \-e RABBITMQ_DEFAULT_USERadmin \-e RABBITMQ_DEFAULT_PASSadmin123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management15672RabbitMQ提供的管理控制台的端口 5672RabbitMQ的消息发送处理接口 安装完成后访问管理控制台。首次访问需要登录默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面
RabbitMQ对应的整体架构核心概念
publisher生产者也就是发送消息的一方consumer消费者也就是消费消息的一方queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理exchange交换机负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列无存储能力virtual host虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue 数据隔离
每个Virtual Host提供了一个独立的消息代理它们之间完全隔离相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host可以实现数据的隔离。每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样消息只能在同一个Virtual Host内进行路由和传递不会跨越Virtual Host。通过使用Virtual Hosts可以将不同的应用程序或服务的消息进行逻辑隔离确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用可以确保不同的租户之间的消息数据完全隔离提高安全性和隐私性。
SpringAMQP
SpringAMQP的官方地址
AMQPAdvanced Message Queuing Protocol是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关更符合微服务中独立性的要求。Spring AMQPSpring AMQP是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。包含两部分其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。
快速入门 引入spring-amqp依赖这样publisher和consumer服务都可以使用 !——AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency配置RabbitMQ服务端信息 spring:rabbitmq:host: 192.168.100.101 #主机名port: 5672 # 端口virtual-host: /liner #虚拟主机username: admin #用户名password: admin123 #密码发送消息SpringAMQP提供了RabbitTemplate工具类方便我们发送消息。 Autowired
private RabbitTemplate rabbitTemplate;
Test
public void testSimpleQueue() {//队列名称String queueName simple.queue;//消息String message hello,spring amqp!;//发送消息rabbitTemplate.convertAndSend(queueName,message);
}接收消息SpringAMQP提供声明式的消息监听只需通过注解在方法上声明要监听的队列名称将来SpringAMQP就会把消息传递给当前方法 slf4j
Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info(spring消费者接收到消息:【 msg 】);if (true) {throw new MessageconversionException(故意的);}log.info(消息处理完成);}
}Work queues
Work queues任务模型。让多个消费者绑定到一个队列共同消费队列中的消息。 #mermaid-svg-nOQA822JtEftp628 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-nOQA822JtEftp628 .error-icon{fill:#552222;}#mermaid-svg-nOQA822JtEftp628 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-nOQA822JtEftp628 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-nOQA822JtEftp628 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-nOQA822JtEftp628 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-nOQA822JtEftp628 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-nOQA822JtEftp628 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-nOQA822JtEftp628 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-nOQA822JtEftp628 .marker.cross{stroke:#333333;}#mermaid-svg-nOQA822JtEftp628 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-nOQA822JtEftp628 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-nOQA822JtEftp628 .cluster-label text{fill:#333;}#mermaid-svg-nOQA822JtEftp628 .cluster-label span{color:#333;}#mermaid-svg-nOQA822JtEftp628 .label text,#mermaid-svg-nOQA822JtEftp628 span{fill:#333;color:#333;}#mermaid-svg-nOQA822JtEftp628 .node rect,#mermaid-svg-nOQA822JtEftp628 .node circle,#mermaid-svg-nOQA822JtEftp628 .node ellipse,#mermaid-svg-nOQA822JtEftp628 .node polygon,#mermaid-svg-nOQA822JtEftp628 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-nOQA822JtEftp628 .node .label{text-align:center;}#mermaid-svg-nOQA822JtEftp628 .node.clickable{cursor:pointer;}#mermaid-svg-nOQA822JtEftp628 .arrowheadPath{fill:#333333;}#mermaid-svg-nOQA822JtEftp628 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-nOQA822JtEftp628 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-nOQA822JtEftp628 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-nOQA822JtEftp628 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-nOQA822JtEftp628 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-nOQA822JtEftp628 .cluster text{fill:#333;}#mermaid-svg-nOQA822JtEftp628 .cluster span{color:#333;}#mermaid-svg-nOQA822JtEftp628 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-nOQA822JtEftp628 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} publisher queue consumer1 consumer2 消费者消息推送限制
默认情况下RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息可能出现消息堆积。 因此需要修改application.yml设置preFetch值为1确保同一时刻最多投递给消费者1条消息
spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息处理完成才能获取下一个消息work模型的使用
多个消费者绑定到一个队列可以加快消息处理速度同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量处理完一条再处理下一条实现能者多劳
交换机
真正生产环境都会经过exchange来发送消息而不是直接发送到队列交换机的类型有以下三种
Fanout广播Direct定向Topic话题 #mermaid-svg-fQm8dIvuo1FgdCJK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .error-icon{fill:#552222;}#mermaid-svg-fQm8dIvuo1FgdCJK .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-fQm8dIvuo1FgdCJK .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-fQm8dIvuo1FgdCJK .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-fQm8dIvuo1FgdCJK .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-fQm8dIvuo1FgdCJK .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-fQm8dIvuo1FgdCJK .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-fQm8dIvuo1FgdCJK .marker{fill:#333333;stroke:#333333;}#mermaid-svg-fQm8dIvuo1FgdCJK .marker.cross{stroke:#333333;}#mermaid-svg-fQm8dIvuo1FgdCJK svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-fQm8dIvuo1FgdCJK .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .cluster-label text{fill:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .cluster-label span{color:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .label text,#mermaid-svg-fQm8dIvuo1FgdCJK span{fill:#333;color:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .node rect,#mermaid-svg-fQm8dIvuo1FgdCJK .node circle,#mermaid-svg-fQm8dIvuo1FgdCJK .node ellipse,#mermaid-svg-fQm8dIvuo1FgdCJK .node polygon,#mermaid-svg-fQm8dIvuo1FgdCJK .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-fQm8dIvuo1FgdCJK .node .label{text-align:center;}#mermaid-svg-fQm8dIvuo1FgdCJK .node.clickable{cursor:pointer;}#mermaid-svg-fQm8dIvuo1FgdCJK .arrowheadPath{fill:#333333;}#mermaid-svg-fQm8dIvuo1FgdCJK .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-fQm8dIvuo1FgdCJK .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-fQm8dIvuo1FgdCJK .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-fQm8dIvuo1FgdCJK .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-fQm8dIvuo1FgdCJK .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-fQm8dIvuo1FgdCJK .cluster text{fill:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK .cluster span{color:#333;}#mermaid-svg-fQm8dIvuo1FgdCJK div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-fQm8dIvuo1FgdCJK :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} publisher exchange queue1 queue2 consumer1 consumer2 consumer3 交换机的作用
接收publisher发送的消息将消息按照规则路由到与之绑定的队列
Fanout交换机
Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue所以也叫广播模式。 Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue因此称为定向路由。
每一个Queue都与Exchange设置一个BindingKey发布者发送消息时指定消息的RoutingKeyExchange将消息路由到BindingKey与消息RoutingKey一致的队列 Topic交换机
TopicExchange与DirectExchange类似区别在于routingKey可以是多个单词的列表并且以 “.” 分割。
Queue与Exchange指定BIndingKey时可以使用通配符
#代指0个或多个单词*代指一个单词 声明队列和交换机
SpringAMQP提供了几个类用来声明队列、交换机及其绑定关系
Queue用于声明队列可以用工厂类QueueBuilder构建Exchange用于声明交换机可以用工厂类ExchangeBuilder构建Binding用于声明队列和交换机的绑定关系可以用工厂类BindingBuilder构建 Configuration
public class FanoutConfiguration{Beanpublic FanoutExchange fanoutExchange(){//ExchangeBuilder.fanoutExchange().build();return new FanoutExchange(liner.fanout);}Beanpublic Queue fanoutQueue(){//QueueBuilder.durable().build();return new Queue(fanout.queue);}Beanpublic Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}SpringAMQP还提供了基于 RabbitListener 注解来声明队列和交换机的方式
RabbitListener(bindings QueueBinding(value Queue (name direct.queue,durable true),exchange Exchange(name liner.direct,type ExchangeTypes.DIRECT),key {red,blue}
))
public void listenDirectQueue(String msg){System.out.println(消费者1接收到Direct消息:【msg】);
}MQ消息转换器
Spring的消息发送代码接收的消息体是一个Object
而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题数据体积过大、有安全漏洞、可读性差 因此建议采用JSON序列化代替默认的JDK序列化
在publisher和consumer两个服务中都引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。
配置消息转换器在publisher和consumer两个服务的启动类中添加Bean
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}消息转换器中添加的messageId可以便于我们将来做幂等性判断 高级篇
消息可靠性问题
发送者的可靠性
生产者重连
有的时候由于网络波动可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数下次等待时长 initial-interval * multipliermax-attempts: 3 #最大重试次数注意当网络不稳定的时候利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试也就是说多次重试等待的过程中当前线程是被阻塞的会影响业务性能。如果对于业务性能有要求建议禁用重试机制。如果一定要使用请合理配置等待时长和重试次数当然也可以考虑使用异步线程来执行发送消息的代码。
生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况
消息投递到了MQ但是路由失败。此时会通过PublisherReturn返回路由异常原因然后返回ACK告知投递成功临时消息投递到了MQ并且入队成功返回ACK告知投递成功持久消息投递到了MQ并且入队完成持久化返回ACK告知投递成功其它情况都会返回NACK告知投递失败
SpringAMQP实现生产者确认 在publisher这个微服务的application.yml中添加配置 spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制并设置confirm类型publisher-returns: true #开启publisher return机制#这里publisher-confirm-type有三种模式可选:
# none: 关闭confirm机制
# simple: 同步阻塞等待MQ的回执消息
# correlated: MQ异步回调方式返回回执消息每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置 Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) - {log.info(消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{},replyCode,replyText,exchange,routingKey,message.toString());});}
}发送消息指定消息ID、消息ConfirmCallback Test
void testPublisherConfirm() throws InterruptedException {//1.创建CorrelationDataCorrelationData cd new CorrelationData();//2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑基本不会触发log.error(handle message ack fail, ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑参数中的result就是回执内容if(result.isAck()){ // result.isAck(), boolean类型, true代表ack回执,false代表nack回执log.debug(发送消息成功,收到ack!);}else{ // result.getReason(), String类型,返回nack时的异常描述log.error(发送消息失败,收到nack,reason:{},result.getReason());}}});//3.发送消息rabbitTemplate.convertAndSend(liner.direct,red,hello,cd);
}