希音电商网站,蛇口网站建设,荆门网站建设服务,深圳自己的网站建设文章目录 RabbitMQ 消息确认机制背景消费者消息确认机制概述手动确认#xff08;RabbitMQ 原生 SDK#xff09;手动确认#xff08;Spring-AMQP 封装 RabbitMQ SDK#xff09;AcknowledgeMode.NONEAcknowledgeMode.AUTO#xff08;默认#xff09;AcknowledgeMode.MANUAL… 文章目录 RabbitMQ 消息确认机制背景消费者消息确认机制概述手动确认RabbitMQ 原生 SDK手动确认Spring-AMQP 封装 RabbitMQ SDKAcknowledgeMode.NONEAcknowledgeMode.AUTO默认AcknowledgeMode.MANUALMANUAL 可能会引发的问题 RabbitMQ 消息确认机制 背景 上图中可以看出从生产者发送消息到消费者接收到消息并正确处理这些里路线都可能会出现问题那么为了保证这些消息最后能被正确处理RabbitMQ 就提供了消息确认机制.
消费者消息确认机制
概述 为了保证消息从 队列 到 消费者正确消费那么就引入了消费者消息确认机制.
a消费者在订阅队列时可以指定 autoAck 参数根据这个参数设置消息确认机制分为以下两种以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK非 Spring 提供.
自动确认当 autoAck true 时RabbitMQ 会自动把发送出去的消息置为确认然后不管消费者是否真正的消费这些消息都会从内存中删除.适合对消息可靠性要求不高的场景.手动确认当 autoAck false 时RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令波安排时间哦且确认消息然后才会从 内存或磁盘 中删除消息.适合对消息可靠性要求高的场景. Ps可靠性高了性能也就下降了所以请综合考虑. b对于 MQ队列 中的消息在 MQ管理平台上可以看到以下两种类别 Ready队列已经准备好消息随时准备发送给消费者 的消息数量只要消费者来要就立刻发送. Unacked消息已经发送给消费者但是消费者没有返回消息确认 的消息数量消息确认包括 ack肯定确认 和 nack否定确认
手动确认RabbitMQ 原生 SDK
消费者在收到消息之后可以选择确认也可以选择拒绝或者跳过RabbitMQ因此提供了不同的确认应答方式消费者客户端可通过调用 channel 的相关方法实现.
a肯定确认消费者已经接收到消息并且成功处理消息可以将其丢弃了.
Channel.basicAck(long deliveryTag, boolean multiple)deliveryTag消息的唯一标识单调递增的 long特点如下 deliveryTag 是每个 Channel 通道独立维护所以每个通道上的都是唯一的生产者 和 Broker 建立一个 channel 会生成一个 deliverTag消费者 和 Broker 建立一个 channel 会生成一个 deliverTag这俩 deliverTag 是不同的.当消费者 ack确认 一条消息时必须使用对应的通道上 deliveryTag 进行确认. multiple是否批量确认. 如果值为 true那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息大大减少了网络开销 假设 deliveryTag 8multiple true那么 deliveryTag 8 的消息都会被确认.假设 deliveryTag 8multiple false只确认 8. PsdeliveryTag 确保了消息传递的可靠性和顺序性. b否定确认单个用来拒绝这个消息. 被拒绝的消息如何处理具体要看 requeue 参数.
Channel.basicReject(long deliveryTag, boolean requeue)requeue标识拒绝后这条消息如何处理. requeue true消息会重新存入队列将来会发送给下一个订阅的消费者.requeue false消息会从队列中移除因此不会发送给消费者.
c否定确认批量Channel.basicReject 只能拒绝一条消息如果要批量拒绝消息就可以使用 Channel.basicNack.
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)multiple参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.
dMQ 的管理平台上也提供了几种确认方式.
手动确认Spring-AMQP 封装 RabbitMQ SDK
Spring-AMQP 对消息确认提供了三种策略
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}这里根 RabbitMQ 原生 SDK 是有些不同的.
AcknowledgeMode.NONE
不管消费者是否成功处理了消息RabbitMQ 都会自动确认消息然后从 队列 中移除消息.
a配置手动确认
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: noneb生产者接口
RestController
RequestMapping(/mq)
class MQApi(val rabbitTemplate: RabbitTemplate
) {RequestMapping(/ack)fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, ack msg 1)return ok}}c消费者
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, ${message.messageProperties.deliveryTag})//业务处理...println(业务逻辑处理完成)}} d效果演示 触发接口之后回到 MQ 管理平台可以看到队列中消息已经被删除.
AcknowledgeMode.AUTO默认
分为以下情况
消费者处理消息过程中没有抛出异常则自动确认消息然后从 队列 中移除消息.消费者处理消息过程中抛出异常则不会确认消息消息会重返队列并且不断重试MQ 管理平台中 Unacked 1.
a配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: autob生产者接口
RestController
RequestMapping(/mq)
class MQApi(val rabbitTemplate: RabbitTemplate
) {RequestMapping(/ack)fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, ack msg 1)return ok}}c消费者正常处理消息
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, ${message.messageProperties.deliveryTag})//业务处理...println(业务逻辑处理完成)}}效果如下 d消费者异常处理消息
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, ${message.messageProperties.deliveryTag})//业务处理...val a 1 / 0println(业务逻辑处理完成)}}效果如下 消息未被确认会不断重返队列进行重试因此 IDEA 中会循环报错输出.
AcknowledgeMode.MANUAL
分为以下情况
消费者在处理完消息后显示调用 basicAck 方法 来确认消息然后从 队列 中移除消息.消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息是否从队列中移除消息需要看 requeue 参数的值 requeue true重返队列不断重试.requeue false丢弃消息. 消费者在处理完消息后什么都不做则不会确认消息消息会重返队列并且不断重试MQ 管理平台中 Unacked 1.
a配置文件
spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manualb生产者接口
RestController
RequestMapping(/mq)
class MQApi(val rabbitTemplate: RabbitTemplate
) {RequestMapping(/ack)fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, ack msg 1)return ok}}c消费者异常处理消息requeue true
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, $deliveryTag)//业务处理...val a 1 / 0println(业务逻辑处理完成)channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, true) //requeue: true}}}由于消息处理异常发送 nack并且 requeue true因此消息会重返队列不断重试.
d消费者异常处理消息requeue false
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, $deliveryTag)//业务处理...val a 1 / 0println(业务逻辑处理完成)channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}由于消息处理异常发送 nack并且 requeue false因此消息不会重返队列消息被丢弃. d消费者正常处理消息
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.CharsetComponent
class AckListener {RabbitListener(queues [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag message.messageProperties.deliveryTagtry {println(接收到消息: ${String(message.body, Charset.forName(UTF-8))}, $deliveryTag)//业务处理...println(业务逻辑处理完成)channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}消息被正常处理返回 ack.
MANUAL 可能会引发的问题 如果这里捕获的不是 Exception 异常那么消费者处理消息的时候可能会引发一些不会被捕获的异常就会导致没有返回 nack. 也就意味着没有进行确认应答那么 mq管理平台 上就会显示 Unacked 数值 1. Ps具体还是需要根据业务场景而定