网站服务器++免费,手机网站怎么dw做,重庆互联网公司排名,企业内部网站制作模板文章目录 一、RocketMQ实战1.1 批量消息发送1.2 消息发送队列自选择1.3 事务消息1.4 SpringCloud集成RocketMQ 二、最佳实践2.1 生产者2.1.1 发送消息注意事项2.1.2 消息发送失败处理方式 2.2 消费者2.2.1 消费过程幂等2.2.2 消费打印日志 2.3 Broker 三、相关问题3.1 为什么要… 文章目录 一、RocketMQ实战1.1 批量消息发送1.2 消息发送队列自选择1.3 事务消息1.4 SpringCloud集成RocketMQ 二、最佳实践2.1 生产者2.1.1 发送消息注意事项2.1.2 消息发送失败处理方式 2.2 消费者2.2.1 消费过程幂等2.2.2 消费打印日志 2.3 Broker 三、相关问题3.1 为什么要使用消息队列3.2 为什么要选择RocketMQ3.3 RocketMQ有什么优缺点3.4 消息队列有哪些消息模型3.5 【RocketMQ用什么消息模型】3.6 【消息的消费模式】6.7 RoctetMQ架构3.8 【如何保证消息的可靠性不丢失】3.9 【如何处理消息重复的问题】3.10 怎么处理消息积压3.11 顺序消息如何实现3.12 如何实现消息过滤3.13 延时消息3.14 【怎么实现分布式消息事务的】3.15 死信队列3.16 如何保证RocketMQ的高可用3.17 【RocketMQ的整体工作流程】3.18 为什么RocketMQ不使用Zookeeper作为注册中心3.19 Broker是怎么保存数据的3.20 RocketMQ怎么对文件进行读写的3.21 消息刷盘怎么实现的3.22 什么时候清理过期消息3.23 【RocketMQ的负载均衡是如何实现的】3.24 消息队列设计成推消息还是拉消息3.25 如何设计一个消息队列*3.26 RocketMQ消息体过大的解决方案3.27 【如何保证幂等性】 本系列文章 RocketMQ一消息的发送、存储与消费 RocketMQ二RocketMQ实战
一、RocketMQ实战
1.1 批量消息发送 RocketMQ批量消息发送是将同一主题的多条消息打包后一次性发送到消息服务端减少网络调用的次数和网络通信资源的损耗提高网络传输效率。示例 DefaultMQProducer producernew DefaultMQProducer(BatchProducerGroupName);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();String topic BatchTest;ListMessage messages new ArrayList();messages.add(new Message(topic, Tag, OrderID001,Hello world1.getBytes()));messages.add(new Message(topic, Tag, OrderID002,Hello world2.getBytes()));System.out.println(producer.send(messages));producer.shutdown();1.2 消息发送队列自选择 消息发送默认根据主题的路由信息主题消息队列进行负载均衡负载均衡机制为轮询策略。假设这样一个场景订单的状态变更消息发送到特定主题为了避免消息消费者同时消费同一订单不同状态的变更消息在开发过程中我们应该使用顺序消息。为了提高消息消费的并发度如果我们能根据某种负载算法将相同订单不同的消息统一发送到同一个消息消费队列上则可以避免引入分布式锁RocketMQ在消息发送时提供了消息队列选择器MessageQueueSelector。示例 String[] tags new String[] {TagA, TagB, TagC, TagD,TagE};for (int i 0; i 100; i) {int orderId i % 10;Message msg new Message(TopicTestjjj, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg, new MessageQueueSelector() {public MessageQueue select(ListMessageQueue mqs,Message msg, Object arg){Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf(%s%n, sendResult);}1.3 事务消息 事务消息是RocketMQ提供的一种高级消息类型支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息处理流程
生产者将消息发送至RocketMQ服务端。RocketMQ服务端将消息持久化成功之后向生产者返回Ack确认消息已经发送成功此时消息被标记为暂不能投递这种状态下的消息即为半事务消息。生产者开始执行本地事务逻辑。生产者根据本地事务执行结果向服务端提交二次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下 二次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。 二次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。 在断网或者是生产者应用重启的特殊情况下若服务端未收到发送者提交的二次确认结果或服务端收到的二次确认结果为Unknown未知状态经过固定时间后服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数请参见参数限制。生产者收到消息回查后需要检查对应消息的本地事务执行的最终结果。生产者根据检查到的本地事务的最终状态再次提交二次确认服务端仍按照步骤4对半事务消息进行处理。
使用限制 1、消息类型一致性。事务消息仅支持在 MessageType 为 Transaction 的主题内使用即事务消息只能发送至类型为事务消息的主题中发送的消息的类型必须和主题的类型一致。 2、消费事务性。RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理建议消费端做好消费重试如果有短暂失败可以利用重试机制保证最终处理成功。 3、中间状态可见性。RocketMQ 事务消息为最终一致性即在消息提交到下游消费端处理完成之前下游分支和上游事务之间的状态会不一致。因此事务消息仅适合接受异步执行的事务场景。 4、事务超时机制。RocketMQ 事务消息的生命周期存在超时机制即半事务消息被生产者发送服务端后如果在指定时间内服务端无法确认提交或者回滚状态则消息默认会被回滚。使用示例 事务消息相比普通消息发送时需要修改以下几点 发送事务消息前需要开启事务并关联本地的事务执行。 为保证事务一致性在构建生产者时必须设置事务检查器和预绑定事务消息发送的主题列表客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。 //演示demo模拟订单表查询服务用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider new ClientServiceProvider();MessageBuilder messageBuilder new MessageBuilderImpl();//构造事务生产者事务消息需要生产者构建一个事务检查器用于检查确认异常半事务的中间状态。Producer producer provider.newProducerBuilder().setTransactionChecker(messageView - {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚此处以订单ID属性为例。* 在订单表找到了这个订单说明本地事务插入订单的操作已经正确提交如果订单表没有订单说明本地事务已经回滚。*/final String orderId messageView.getProperties().get(OrderId);if (Strings.isNullOrEmpty(orderId)) {// 错误的消息直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败直接退出。return;}Message message messageBuilder.setTopic(topic)//设置消息索引键可根据关键字精确查找某条消息。.setKeys(messageKey)//设置消息Tag用于消费端根据指定Tag过滤消息。.setTag(messageTag)//一般事务消息都会设置一个本地事务关联的唯一ID用来做本地事务回查的校验。.addProperty(OrderId, xxx)//消息体。.setBody(messageBody.getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败事务可以直接退出并回滚。return;}/*** 执行本地事务并确定本地事务结果。* 1. 如果本地事务提交成功则提交消息事务。* 2. 如果本地事务提交失败则回滚消息事务。* 3. 如果本地事务未知异常则不处理等待事务消息回查。**/boolean localTransactionOk doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试如果放弃重试可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息回滚异常时可以无需重试依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}1.4 SpringCloud集成RocketMQ
1、添加依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion${rocketmq.version}/version
/dependency2、配置RocketMQ相关信息 示例
spring:rocketmq:namesrvaddr: localhost:29876producerGroup:TestProducerconsumerGroup:TestConsumer3、在Java代码中进行消息的发送和消费业务的开发
二、最佳实践
2.1 生产者
2.1.1 发送消息注意事项
Tags的使用 一个应用尽可能用一个Topic而消息子类型则可以用tags来标识。tags可以由应用自由设置只有生产者在发送消息设置了tags消费方在订阅消息时才可以利用tags通过broker做消息过滤如message.setTags(“TagA”)。Keys的使用 每个消息在业务层面的唯一标识码要设置到keys字段方便将来定位消息丢失问题。服务器会为每个消息创建索引哈希索引应用可以通过topic、key来查询这条消息内容以及消息被谁消费。由于是哈希索引请务必保证key尽可能唯一这样可以避免潜在的哈希冲突。示例 // 订单Id String orderId 20034568923546; message.setKeys(orderId); 日志打印 消息发送成功或者失败要打印消息日志务必要打印SendResult和key字段。send消息方法只要不抛异常就代表发送成功。发送成功会有多个状态在sendResult里定义即之前提过的SEND_OK、FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE。
2.1.2 消息发送失败处理方式 Producer的send方法本身支持内部重试重试逻辑 至多重试2次同步发送为2次异步发送为0次。 如果发送失败则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值默认10s。 如果本身向broker发送消息产生超时异常就不会再重试。 以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高建议应用增加相应的重试逻辑比如调用send同步方法发送失败时则尝试将消息存储到db然后由后台线程定时重试确保消息一定到达Broker。
2.2 消费者
2.2.1 消费过程幂等 RocketMQ无法避免消息重复所以如果业务对消费重复非常敏感务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键可以是msgId也可以是消息内容中的唯一标识字段例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入并消费否则跳过。 msgId一定是全局唯一标识符但是实际使用中可能会存在相同的消息有两个不同msgId的情况消费者主动重发、因客户端重投机制导致的重复等这种情况就需要使业务字段进行重复消费。
2.2.2 消费打印日志 如果消息量较少建议在消费入口方法打印消息消费耗时等方便后续排查问题。 public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {log.info(RECEIVE_MSG_BEGIN: msgs.toString());// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} 2.3 Broker
刷盘方式 SYNC_FLUSH同步刷新相比于ASYNC_FLUSH异步处理会损失很多性能但是也更可靠所以需要根据实际的业务场景做好权衡。
三、相关问题
3.1 为什么要使用消息队列 消息队列主要有三大用途解耦、异步、削峰。 以电商系统的下单为例
1、解耦 引入消息队列之前下单完成之后需要订单服务去调用库存服务减库存调用营销服务加营销数据……引入消息队列之后可以把订单完成的消息丢进队列里下游服务自己去调用就行了这样就完成了订单服务和其它服务的解耦合。2、异步 订单支付之后我们要扣减库存、增加积分、发送消息等等这样一来这个链路就长了链路一长响应时间就变长了。引入消息队列除了更新订单状态其它的都可以异步去做这样一来就来就能降低响应时间。3、削峰 例如秒杀系统平时流量很低但是要做秒杀活动流量突然增大Redis或数据库承受不了巨大的请求量。此时可以把请求扔到队列里面只放出我们服务能处理的流量这样就能抗住短时间的大流量了。4、数据同步 业务数据推送同步。5、日志处理 日志处理是指将消息队列用在日志处理中比如用Kafka传输日志 引入消息队列之后也会带来一些问题
1、系统可用性降低 此时就要保证消息队列的高可用。2、系统复杂度提高 此时要保证消息不重复消费、保证消息不丢失等。3、一致性问题 此时要保证数据在各个系统的一致性。
3.2 为什么要选择RocketMQ 市场上几大消息队列对比
RabbitMQActiveMQRocketMQKafka公司RabbitApache阿里Apache语言ErlangJavaJavaScalaJava协议支持AMPQOpenWire、STOMP、REST、 XMPP、AMQP自定义自定义协议社区封装了http协议支持客户端支持语言官方支持Erlang、Java、Ruby等社区产出多种API几乎支持所有语言Java、C、C、Python、PHP、Perl.net 等Java、C不成熟官方支持Java社区产出多种API如PHPPython等单击吞吐量万级万级十万级十万级消息延迟微秒级毫秒级毫秒级毫秒以内可用性高基于主从架构实现可用性高基于主从架构实现可用性非常高分布式架构非常高分布式架构一个数据多副本消息可靠性基本不丢有较低的概率丢失数据经过参数优化配置可以做到零丢失经过参数配置消息可以做到零丢失功能支持基于erlang开发所以并发性能极强性能极好延时低MQ领域的功能极其完备MQ功能较为完备分布式扩展性好功能较为简单主要支持加单MQ功能优势erlang语言开发性能极好、延时很低吞吐量万级、MQ功能完备管理界面非常好社区活跃互联网公司使用较多非常成熟功能强大在业内大量公司和项目中都有应用接口简单易用阿里出品有保障吞吐量大分布式扩展方便、社区比较活跃支持大规模的Topic、支持复杂的业务场景可以基于源码进行定制开发超高吞吐量ms级的时延极高的可用性和可靠性分布式扩展方便劣势吞吐量较低erlang语音开发不容易进行定制开发集群动态扩展麻烦偶尔有较低概率丟失消息社区活跃度不高接口不是按照标准JMSJava Message ServiceJava消息服务应用程序接口是一个Java平台中关于面向消息中间件的API用于在两个应用程序之间或分布式系统中发送消息进行异步通信。Java消息服务是一个与具体平台无关的API规范走的有的系统迁移要修改大量的代码技术有被抛弃的风险有可能进行消息的重复消费应用都有使用主要用于解耦和异步较少用在大规模吞吐的场景中用于大规模吞吐、复杂业务中在大数据的实时计算和日志采集中被大规模使用是业界的标准topic数量对吞吐量的影响topic 可以达到几百/几千的级别吞吐量会有较小幅度的下降这是 RocketMQ的一大优势在同等机器下可以支撑大量的 topictopic 从几十到几百个时候吞吐量会大幅度下降在同等机器下Kafka尽量保证 topic 数量不要过多如果要支撑大规模的topic需要增加更多的机器资源 总结选择中间件的可以从这些维度来考虑可靠性性能功能可运维行可拓展性社区活跃度。目前常用的几个中间件ActiveMQ作为“老古董”市面上用的已经不多。
RabbitMQ 优点轻量迅捷容易部署和使用拥有灵活的路由配置。 缺点性能和吞吐量不太理想不易进行二次开发。RocketMQ 优点性能好高吞吐量稳定可靠有活跃的中文社区。 缺点兼容性上不是太好。Kafka 优点拥有强大的性能及吞吐量兼容性很好。 缺点由于“攒一波再处理”导致延迟比较高。 一般的业务系统要引入MQ最早大家都用ActiveMQ但是现在确实大家用的不多了没经过大规模吞吐量场景的验证社区也不是很活跃不推荐。 如果是面向用户的C端系统具有一定的并发量对性能也有比较高的要求可以选择低延迟、吞吐量比较高可用性比较好的RocketMQ。 RabbitMQ基于erlang开发对消息堆积的支持并不好当大量消息积压的时候会导致RabbitMQ的性能急剧下降。每秒钟可以处理几万到十几万条消息。 RocketMQ基于Java开发面向互联网集群化功能丰富对在线业务的响应时延做了很多的优化大多数情况下可以做到毫秒级的响应每秒钟大概能处理几十万条消息。 Kafka基于Scala开发面向日志功能丰富性能最高。当你的业务场景中每秒钟消息数量没有那么多的时候Kafka 的时延反而会比较高。所以Kafka 不太适合在线业务场景。 ActiveMQ基于Java开发简单稳定性能不如前面三个。小型系统用也ok但是不推荐。 所以中小型公司技术实力较为一般技术挑战不是特别高用 RabbitMQ 是不错的选择大型公司基础架构研发实力较强用 RocketMQ 是很好的选择。 如果是大数据领域的实时计算、日志采集等场景用 Kafka 是业内标准的绝对没问题社区活跃度很高绝对不会黄何况几乎是全世界这个领域的事实性规范。
3.3 RocketMQ有什么优缺点
RocketMQ优点 单机吞吐量十万级。 可用性非常高分布式架构。 消息可靠性经过参数优化配置消息可以做到0丢失。 功能支持MQ功能较为完善还是分布式的扩展性好。 支持10亿级别的消息堆积不会因为堆积导致性能下降。 源码是Java方便结合公司自己的业务二次开发。 天生为金融互联网领域而生对于可靠性要求很高的场景尤其是电商里面的订单扣款以及业务削峰在大量交易涌入时后端可能无法及时处理的情况。 RoketMQ在稳定性上可能更值得信赖这些业务场景在阿里双11已经经历了多次考验如果你的业务有上述并发场景建议可以选择RocketMQ。RocketMQ缺点 支持的客户端语言不多目前是Java及c其中c不成熟。 没有在 MQ核心中去实现JMS等接口有些系统要迁移需要修改大量代码。
3.4 消息队列有哪些消息模型 消息队列有两种模型队列模型和发布/订阅模型。
队列模型 最初的一种消息队列模型对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息一个队列可以存储多个生产者的消息一个队列也可以有多个消费者但是消费者之间是竞争关系也就是说每条消息只能被一个消费者消费。 发布/订阅模型 如果需要将一份消息数据分发给多个消费者并且每个消费者都要求收到全量的消息。很显然队列模型无法满足这个需求。解决的方式就是发布/订阅模型。 在发布 - 订阅模型中消息的发送方称为发布者Publisher消息的接收方称为订阅者Subscriber服务端存放消息的容器称为主题Topic。发布者将消息发送到主题中订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作同时还可以认为是主题在消费时的一个逻辑副本每份订阅中订阅者都可以接收到主题的所有消息。 它和 “队列模式” 的异同生产者就是发布者队列就是主题消费者就是订阅者无本质区别。不同点在于一份消息数据是否可以被多次消费。
3.5 【RocketMQ用什么消息模型】 RocketMQ使用的消息模型是标准的发布-订阅模型在RocketMQ的术语表中生产者、消费者和主题与发布-订阅模型中的概念是完全一样的。 RocketMQ本身的消息是由下面几部分组成
Message Message消息就是要传输的信息。一条消息必须有一个主题Topic一条消息也可以拥有一个可选的标签Tag。Topic Topic主题可以看做消息的归类它是消息的第一级类型。比如一个电商系统可以分为交易消息、物流消息等一条消息必须有一个 Topic 。 Topic 与生产者和消费者的关系非常松散一个 Topic 可以有0个、1个、多个生产者向其发送消息一个生产者也可以同时向不同的 Topic 发送消息。 一个 Topic 也可以被 0个、1个、多个消费者订阅。Tag Tag标签可以看作子主题它是消息的第二级类型用于为用户提供额外的灵活性。使用标签同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为交易创建消息、交易完成消息等一条消息可以没有 Tag 。Group RocketMQ中订阅者的概念是通过消费组Consumer Group来体现的。每个消费组都消费主题中一份完整的消息不同消费组之间消费进度彼此不受影响也就是说一条消息被Consumer Group1消费过也会再给Consumer Group2消费。 消费组中包含多个消费者同一个组内的消费者是竞争消费的关系每个消费者负责消费组内的一部分消息。默认情况如果一条消息被消费者Consumer1消费了那同组的其他消费者就不会再收到这条消息。Message Queue Message Queue消息队列一个 Topic 下可以设置多个消息队列Topic 包括多个 Message Queue 如果一个 Consumer 需要获取 Topic下所有的消息就要遍历所有的 Message Queue。Offset 在Topic的消费过程中由于消息需要被不同的组进行多次消费所以消费完的消息并不会立即被删除这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置Consumer Offset这个位置之前的消息都被消费过之后的消息都没有被消费过每成功消费一条消息消费位置就加一。 也可以这么说Queue 是一个长度无限的数组Offset 就是下标。
3.6 【消息的消费模式】 消息消费模式有两种Clustering集群消费和Broadcasting广播消费。 默认情况下就是集群消费这种模式下一个消费者组共同消费一个主题的多个队列一个队列只会被一个消费者消费如果某个消费者挂掉分组内其它消费者会接替挂掉的消费者继续消费。 而广播消费消息会发给消费者组中的每一个消费者进行消费。
6.7 RoctetMQ架构 RocketMQ一共有四个部分组成NameServer、Broker、Producer、Consumer。
NameServer NameServer 是一个无状态的服务器角色类似于 Kafka使用的 Zookeeper但比 Zookeeper 更轻量。其特点 每个NameServer结点之间是相互独立彼此没有任何信息交互。Nameserver被设计成几乎是无状态的通过部署多个结点来标识自己是一个伪集群Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 BrokerConsumer 也会定时从 NameServer 获取 Topic 的路由信息Broker 在启动时会向 NameServer 注册并定时进行心跳连接且定时同步维护的 Topic 到 NameServer。 NameServer功能主要有两个 1、和Broker结点保持长连接。 2、维护Topic的路由信息。 Broker 消息存储和中转角色负责存储和转发消息。 Broker 内部维护着一个个 Consumer Queue用来存储消息的索引真正存储消息的地方是 CommitLog日志文件。 单个 Broker 与所有的 Nameserver 保持着长连接和心跳并会定时将 Topic 信息同步到 NameServer和 NameServer 的通信底层是通过 Netty 实现的。Producer 消息生产者Producer由用户进行分布式部署消息由Producer通过多种负载均衡模式发送到Broker集群发送低延时支持快速失败。 Producer的负载均衡是由MQFaultStratege.selectOneMessageQueue()来实现的。这个方法就是随机选择一个要发送消息的broker来达到负载均衡的效果选择的标准尽量不选刚刚选过的broker尽量不选发送上条消息延迟过高或没有响应的broker也就是找到一个可用的broker。 RocketMQ 提供了三种方式发送消息同步、异步和单向 同步发送同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息例如重要通知邮件、营销短信。异步发送异步发送指发送方发出数据后不等接收方发回响应接着发送下个数据包一般用于可能链路耗时较长而对响应时间敏感的业务场景例如用户视频上传后通知启动转码服务。单向发送单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发适用于某些耗时非常短但对可靠性要求并不高的场景例如日志收集。 Consumer 消息消费者负责消费消息一般是后台系统负责异步消费。 Consumer也由用户部署支持PUSH和PULL两种消费模式支持集群消费和广播消费提供实时的消息订阅机制。 Pull拉取型消费者Pull Consumer主动从消息服务器拉取信息只要批量拉取到消息用户应用就会启动消费过程所以 Pull 称为主动消费型。 Push推送型消费者Push Consumer封装了消息的拉取、消费进度和其他的内部维护工作将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型但其实从实现上看还是从消息服务器中拉取消息不同于 Pull 的是 Push 首先要注册消费监听器当监听器处触发后才开始消费消息。 RocketMQ是拉模式。
3.8 【如何保证消息的可靠性不丢失】 消息可能会在这三个阶段发生丢失生产阶段、存储阶段、消费阶段。所以要从这三个阶段考虑
生产阶段 在生产阶段主要通过请求确认机制来保证消息的可靠传递。 1、同步发送的时候要注意处理响应结果和异常。如果返回响应OK表示消息成功发送到了Broker如果响应失败或者发生其它异常都应该重试。 2、异步发送的时候应该在回调方法里检查如果发送失败或者异常都应该进行重试。 3、如果发生超时的情况也可以通过查询日志的API来检查是否在Broker存储成功。存储阶段 可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息简单说就是可靠性优先的场景都应该使用同步。 1、消息只要持久化到CommitLog日志文件中即使Broker宕机未消费的消息也能重新恢复再消费。 2、Broker的刷盘机制同步刷盘和异步刷盘不管哪种刷盘都可以保证消息一定存储在pagecache中内存中但是同步刷盘更可靠它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。 3、Broker通过主从模式来保证高可用Broker支持Master和Slave同步复制、Master和Slave异步复制模式生产者的消息都是发送给Master但是消费既可以从Master消费也可以从Slave消费。同步复制模式可以保证即使Master宕机消息肯定在Slave中有备份保证了消息不会丢失。 消费阶段 Consumer保证消息成功消费的关键在于确认的时机不要在收到消息后就立即发送消费确认而是应该在执行完所有消费业务逻辑之后再发送消费确认。因为消息队列维护了消费的位置逻辑执行失败了没有确认再去队列拉取消息就还是之前的一条。
3.9 【如何处理消息重复的问题】 对分布式消息队列来说同时做到确保一定投递和不重复投递是很难的就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递保证消息不丢失但有可能造成消息重复。 处理消息重复问题主要由业务端自己保证主要的方式有两种业务幂等和消息去重。 业务幂等 第一种是保证消费逻辑的幂等性也就是多次调用和一次调用的效果是一样的。这样一来不管消息消费多少次对业务都没有影响。常用的解决方案有 1、唯一索引防止新增脏数据。比如支付宝的资金账户支付宝也有用户账户每个用户只能有一个资金账户怎么防止给用户创建资金账户多个那么给资金账户表中的用户ID加唯一索引所以一个用户新增成功一个资金账户记录。要点唯一索引或唯一组合索引来防止新增数据存在脏数据当表存在唯一索引并发时新增报错时再查询一次就可以了数据应该已经存在了返回结果即可。 2、token机制防止页面重复提交。业务要求 页面的数据只能被点击提交一次发生原因 由于重复点击或者网络重发或者nginx重发等情况会导致数据被重复提交解决办法 集群环境采用token加redis(redis单线程的处理需要排队)单JVM环境采用token加redis或token加jvm内存。处理流程1数据提交前要向服务的申请tokentoken放到redis或jvm内存token有效时间2提交后后台校验token同时删除token生成新的token返回。 3、分布式锁——还是拿插入数据的例子如果是分布是系统构建全局唯一索引比较困难例如唯一性的字段没法确定这时候可以引入分布式锁通过第三方的系统(redis或zookeeper)在业务系统插入数据或者更新数据获取分布式锁然后做操作之后释放锁这样其实是把多线程并发的锁的思路引入多多个系统也就是分布式系统中得解决思路。 消息去重 第二种是业务端对重复的消息就不再消费了。这种方法需要保证每条消息都有一个唯一的编号通常是业务相关的比如订单号消费的记录需要落库而且需要保证和消息确认这一步的原子性。 具体做法是可以建立一个消费记录表拿到消息后先检查记录表里是否已经有消费过的记录没有的再消费。这个消费记录表里要有一个唯一的ID比如订单ID等。
3.10 怎么处理消息积压 发生了消息积压这时候就得想办法赶紧把积压的消息消费完就得考虑提高消费能力一般有两种办法 消费者扩容如果当前Topic的Message Queue的数量大于消费者数量就可以对消费者进行扩容增加消费者来提高消费能力尽快把积压的消息消费玩。 消息迁移Queue扩容如果当前Topic的Message Queue的数量小于或者等于消费者数量这种情况再扩容消费者就没什么用就得考虑扩容Message Queue。可以新建一个临时的Topic临时的Topic多设置一些Message Queue然后先用一些消费者把消费的数据丢到临时的Topic因为不用业务处理只是转发一下消息还是很快的。接下来用扩容的消费者去消费新的Topic里的数据消费完了之后恢复原状。
3.11 顺序消息如何实现 顺序消息是指消息的消费顺序和产生顺序相同在有些业务逻辑下必须保证顺序比如订单的生成、付款、发货这个消息必须按顺序处理才行。 顺序消息分为全局顺序消息和部分顺序消息 全局顺序消息:某个Topic下的所有消息都要保证顺序 部分顺序消息:只要保证每一组消息被顺序消费即可比如订单消息只要保证同一个订单 ID 个消息能按顺序消费即可。 部分顺序消息 部分顺序消息相对比较好实现生产端需要做到把同 ID 的消息发送到同一个 Message Queue 在消费过程中要做到从同一个Message Queue读取的消息顺序处理——消费端不能并发处理顺序消息这样才能达到部分有序。 由producer发送到broker的消息队列是满足FIFO的所以发送是顺序的单个queue里的消息是顺序的。多个Queue同时消费是无法绝对保证消息的有序性的。所以同一个topic同一个queue发消息的时候一个线程发送消息消费的时候一个线程去消费一个queue里的消息。 全局顺序消息 RocketMQ 默认情况下不保证顺序比如创建一个 Topic 默认八个写队列八个读队列这时候一条消息可能被写入任意一个队列里在数据的读取过程中可能有多个 Consumer 每个 Consumer 也可能启动多个线程并行处理所以消息被哪个 Consumer 消费被消费的顺序和写人的顺序是否一致是不确定的。 要保证全局顺序消息 需要先把 Topic 的读写队列数设置为 一然后Producer Consumer 的并发设置也要是一。简单来说为了保证整个 Topic全局消息有序只能消除所有的并发处理各部分都设置成单线程处理 这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了。
3.12 如何实现消息过滤 有两种方案 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤这样做的好处是避免了无用的消息传输到 Consumer 端缺点是加重了 Broker 的负担实现起来相对复杂。 另一种是在 Consumer 端过滤比如按照消息设置的 tag 去重这样的好处是实现起来简单缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。 一般采用Cosumer端过滤如果希望提高吞吐量可以采用Broker过滤。 对消息的过滤有三种方式 1、根据Tag过滤这是最常见的一种用起来高效简单。
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CID_EXAMPLE);
consumer.subscribe(TOPIC, TAGA || TAGB || TAGC);2、SQL 表达式过滤SQL表达式过滤更加灵活。
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_4);
// 只有订阅的消息有这个属性a, a 0 and a 3
consumer.subscribe(TopicTest, MessageSelector.bySql(a between 0 and 3);
consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();3、Filter Server 方式最灵活也是最复杂的一种方式允许用户自定义函数进行过滤。
3.13 延时消息 电商的订单超时自动取消就是一个典型的利用延时消息的例子用户提交了一个订单就可以发送一个延时消息1h后去检查这个订单的状态如果还是未付款就取消订单释放库存。 RocketMQ是支持延时消息的只需要在生产消息的时候设置消息的延时级别
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer new DefaultMQProducer(ExampleProducerGroup);
// 启动生产者
producer.start();
int totalMessagesToSend 100;
for (int i 0; i totalMessagesToSend; i) {Message message new Message(TestTopic, (Hello scheduled message i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);
}目前RocketMQ支持的延时级别是有限的
private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;RocketMQ怎么实现延时消息的 临时存储定时任务。 Broker收到延时消息了会先发送到主题SCHEDULE_TOPIC_XXXX的相应时间段的Message Queue中然后通过一个定时任务轮询这些队列到期后把消息投递到目标Topic的队列中然后消费者就可以正常消费这些消息。
3.14 【怎么实现分布式消息事务的】 半消息是指暂时还不能被 Consumer 消费的消息Producer 成功发送到 Broker 端的消息但是此消息被标记为 “暂不可投递” 状态只有等 Producer 端执行完本地事务后经过二次确认了之后Consumer 才能消费此条消息。 依赖半消息可以实现分布式消息事务其中的关键在于二次确认以及消息回查
1、Producer 向 broker 发送半消息。2、Producer 端收到响应消息发送成功此时消息是半消息标记为 “不可投递” 状态Consumer 消费不了。3、Producer 端执行本地事务。4、正常情况本地事务执行完成Producer 向 Broker 发送 Commit/Rollback如果是 CommitBroker 端将半消息标记为正常消息Consumer 可以消费如果是 RollbackBroker 丢弃此消息。5、异常情况Broker 端迟迟等不到二次确认。在一定时间后会查询所有的半消息然后到 Producer 端查询半消息的执行情况。6、Producer 端查询本地事务的状态。7、根据事务的状态提交 commit/rollback 到 broker 端。567 是消息回查。8、消费者段消费到消息之后执行本地事务执行本地事务。
3.15 死信队列 死信队列用于处理无法被正常消费的消息即死信消息。 当一条消息初次消费失败消息队列 RocketMQ 会自动进行消息重试达到最大重试次数后若消费依然失败则表明消费者在正常情况下无法正确地消费该消息此时消息队列 RocketMQ 不会立刻将消息丢弃而是将其发送到该消费者对应的特殊队列中该特殊队列称为死信队列。 死信消息的特点 不会再被消费者正常消费。 有效期与正常消息相同均为 3 天。3 天后会被自动删除。因此需要在死信消息产生后的 3 天内及时处理。 死信队列的特点 一个死信队列对应一个 Group ID 而不是对应单个消费者实例。 如果一个 Group ID 未产生死信消息消息队列 RocketMQ 不会为其创建相应的死信队列。 一个死信队列包含了对应 Group ID 产生的所有死信消息不论该消息属于哪个 Topic。 RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
3.16 如何保证RocketMQ的高可用 NameServer因为是无状态且不相互通信的所以只要集群部署就可以保证高可用。 RocketMQ的高可用主要是在体现在Broker的读和写的高可用Broker的高可用是通过集群和主从实现的。 Broker可以配置两种角色Master和SlaveMaster角色的Broker支持读和写Slave角色的Broker只支持读Master会向Slave同步消息。也就是说Producer只能向Master角色的Broker写入消息Cosumer可以从Master和Slave角色的Broker读取消息。 Consumer 的配置文件中并不需要设置是从 Master 读还是从 Slave读当 Master 不可用或者繁忙的时候 Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制当一个 Master 角色的机器出现故障后Consumer 仍然可以从 Slave 读取消息不影响 Consumer 读取消息这就实现了读的高可用。 如何达到发送端写的高可用性呢在创建 Topic 的时候把 Topic 的多个Message Queue 创建在多个 Broker 组上相同 Broker 名称不同 brokerId机器组成 Broker 组这样当 Broker 组的 Master 不可用后其他组Master 仍然可用 Producer 仍然可以发送消息 RocketMQ 目前还不支持把Slave自动转成 Master 如果机器资源不足需要把 Slave 转成 Master 则要手动停止 Slave 色的 Broker 更改配置文件用新的配置文件启动 Broker。
3.17 【RocketMQ的整体工作流程】 简单来说RocketMQ是一个分布式消息队列也就是消息队列分布式系统。 作为消息队列它是发-存-收的一个模型对应的就是Producer、Broker、Cosumer作为分布式系统它要有服务端、客户端、注册中心对应的就是Broker、Producer/Consumer、NameServer。
1、Broker在启动的时候去向所有的NameServer注册并保持长连接每30s发送一次心跳。2、Producer在发送消息的时候从NameServer获取Broker服务器地址根据负载均衡算法选择一台服务器来发送消息。3、Conusmer消费消息的时候同样从NameServer获取Broker地址然后主动拉取消息来消费。 3.18 为什么RocketMQ不使用Zookeeper作为注册中心 CAP理论指的是在一个分布式系统中Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性)不能同时成立。 Kafka采用Zookeeper作为注册中心——当然也开始逐渐去ZookeeperRocketMQ不使用Zookeeper其实主要可能从这几方面来考虑
1、基于可用性的考虑 根据CAP理论同时最多只能满足两个点。Zookeeper满足的是CP也就是说Zookeeper并不能保证服务的可用性Zookeeper在进行选举的时候整个选举的时间太长期间整个集群都处于不可用的状态而这对于一个注册中心来说肯定是不能接受的作为服务发现来说就应该是为可用性而设计。2、基于性能的考虑 NameServer本身的实现非常轻量而且可以通过增加机器的方式水平扩展增加集群的抗压能力而Zookeeper的写是不可扩展的Zookeeper要解决这个问题只能通过划分领域划分多个Zookeeper集群来解决首先操作起来太复杂其次这样还是又违反了CAP中的A的设计导致服务之间是不连通的。3、持久化的机制来带的问题 ZooKeeper 的 ZAB 协议对每一个写请求会在每个 ZooKeeper 节点上保持写一个事务日志同时再加上定期的将内存数据镜像Snapshot到磁盘来保证数据的一致性和持久性而对于一个简单的服务发现的场景来说这其实没有太大的必要这个实现方案太重了。而且本身存储的数据应该是高度定制化的。4、消息发送应该弱依赖注册中心 RocketMQ的设计理念也正是基于此生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地如果NameServer整个集群不可用短时间内对于生产者和消费者并不会产生太大影响。
3.19 Broker是怎么保存数据的 RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。
CommitLog 消息主体以及元数据的存储主体存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位左边补零剩余为起始偏移量比如00000000000000000000代表了第一个文件起始偏移量为0文件大小为1G1073741824当第一个文件写满了第二个文件为00000000001073741824起始偏移量为1073741824以此类推。消息主要是顺序写入日志文件当文件满了写入下一个文件。 CommitLog文件保存于${Rocket_Home}/store/commitlog目录中。ConsumeQueue 消息消费队列引入的目的主要是提高消息消费的性能由于RocketMQ是基于主题topic的订阅模式消息消费是针对主题进行的如果要遍历commitlog文件中根据topic检索消息是非常低效的。 Consumer即可根据ConsumeQueue来查找待消费的消息。其中ConsumeQueue逻辑消费队列作为消费消息的索引保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset消息大小size和消息Tag的HashCode值。 ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件故ConsumeQueue文件夹的组织方式如下topic/queue/file三层组织结构具体存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样ConsumeQueue文件采取定长设计每一个条目共20个字节分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode单个文件由30W个条目组成可以像数组一样随机访问每一个条目每个ConsumeQueue文件大小约5.72M。IndexFile IndexFile索引文件提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是{fileName}文件名fileName是以创建时的时间戳命名的固定的单个IndexFile文件大小约为400M一个IndexFile可以保存 2000W个索引IndexFile的底层存储设计为在文件系统中实现HashMap结构故RocketMQ的索引文件其底层实现为hash索引。 总结 RocketMQ采用的是混合型的存储结构即为Broker单个实例下所有的队列共用一个日志数据文件即为CommitLog来存储。 RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构Producer发送消息至Broker端然后Broker端使用同步或者异步的方式对消息刷盘持久化保存至CommitLog中。 只要消息被刷盘持久化至磁盘文件CommitLog中那么Producer发送的消息就不会丢失。正因为如此Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后可以等下一次消息拉取同时服务端也支持长轮询模式如果一个消息拉取请求未拉取到消息Broker允许等待30s的时间只要这段时间内有新消息到达将直接返回给消费端。 RocketMQ的具体做法是使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue逻辑消费队列和IndexFile索引文件数据。
3.20 RocketMQ怎么对文件进行读写的 RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache、顺序读写、零拷贝。
顺序读写 磁盘的顺序读写性能要远好于随机读写。因为每次从磁盘读数据时需要先寻址找到数据在磁盘上的物理位置。对于机械硬盘来说就是移动磁头会消耗时间。顺序读写相比于随机读写省去了大部分的寻址时间它只需要寻址一次就可以连续读写下去所以性能比随机读写好很多。 RocketMQ 利用了这个特性。它所有的消息数据都存放在一个无限增长的文件队列 CommitLog 中CommitLog 是由一组 1G 内存映射文件队列组成的。写入时就从一个固定位置一直写下去一个文件写满了就开启一个新文件顺序读写下去。页缓存 页缓存PageCache)是OS对文件的缓存用于加速对文件的读写。一般来说程序对文件进行顺序读写的速度几乎接近于内存的读写速度主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化将一部分的内存用作PageCache。对于数据的写入OS会先写入至Cache内随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取如果一次读取文件时出现未命中PageCache的情况OS从物理磁盘上访问读取文件的同时会顺序对其他相邻块的数据文件进行预读取。零拷贝 RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销将对文件的操作转化为直接对内存地址进行操作从而极大地提高了文件的读写效率正因为需要使用内存映射机制故RocketMQ的文件存储都使用定长结构来存储方便一次将整个文件映射至内存。 在操作系统中使用传统的方式数据需要经历几次拷贝还要经历用户态/内核态切换。 从磁盘复制数据到内核态内存从内核态内存复制到用户态内存然后从用户态内存复制到网络驱动的内核态内存最后是从网络驱动的内核态内存复制到网卡中进行传输。 可以通过零拷贝的方式减少用户态与内核态的上下文切换和内存拷贝的次数用来提升I/O的性能。零拷贝比较常见的实现方式是mmap这种机制在Java中是通过MappedByteBuffer实现的。
3.21 消息刷盘怎么实现的 RocketMQ提供了两种刷盘策略 同步刷盘在消息达到Broker的内存之后必须刷到commitLog日志文件中才算成功然后返回Producer数据已经发送成功。 异步刷盘异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功会唤醒一个线程去将数据持久化到CommitLog日志文件中。
3.22 什么时候清理过期消息 4.6版本默认48默认是72但是broker配置文件默认改成了48所以新版本都是48小时后会删除不再使用的CommitLog文件。 检查这个文件最后访问时间。 判断是否大于过期时间。 指定时间删除默认凌晨4点。 3.23 【RocketMQ的负载均衡是如何实现的】 RocketMQ是分布式消息服务负载均衡是在生产和消费的客户端完成的。
生产者的负载均衡 实质是在选择MessageQueue对象内部包含了brokerName和queueId第一种是默认策略从MessageQueue列表中随机选择一个算法时通过自增随机数对列表打下取余得到位置信息但获得的MessageQueue所在集群不能是上次失败集群。第二种是超时容忍策略先随机选择一个MessageQueue如果因为超时等异常发送失败会优先选择该broker集群下其他MessageQueue发送如果没找到就从之前发送失败的Broker集群中选一个进行发送若还没有找到才使用默认策略。消费者的负载均衡 可选的有六种算法。 1、平均分配算法。 2、环形算法。 3、指定机房算法。 4、就近机房算法。 5、统一哈希算法。使用一致性哈希算法进行负载每次负载都会重建一致性hash路由表获取本地客户端负责的所有队列信息。默认的hash算法为MD5假设有4个消费者客户端和2个消息队列mq1和mq2通过hash后分布在hash环的不同位置按照一致性hash的顺时针查找原则mq1被client2消费mq2被client3消费。 6、手动配置算法。
3.24 消息队列设计成推消息还是拉消息
推拉模式 推拉模式的时候指的是Comsumer和Broker之间的交互。 默认的认为Producer与Broker之间就是推的方式即Producer将消息推送给Broker而不是Broker主动去拉取消息。推模式 推模式指的是消息从Broker推向Consumer即Consumer被动的接收消息由Broker来主导消息的发送。 【推模式的优点】 1、消息实时性高 Broker 接受完消息之后可以立马推送给 Consumer。 2、对于消费者使用来说简单。 【推模式的缺点】 推送速率难以适应消费速率推模式的目标就是以最快的速度推送消息当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时随着时间的增长消费者那边可能就“爆仓”了因为根本消费不过来。 并且不同的消费者的消费速率还不一样身为Broker很难平衡每个消费者的推送速率如果要实现自适应的推送速率那就需要在推送的时候消费者告诉Broker 然后Broker需要维护每个消费者的状态进行推送速率的变更。这增加了Broker自身的复杂度。所以说推模式难以根据消费者的状态控制推送速率适用于消息量不大、消费能力强要求实时性高的情况。拉模式 拉模式指的是Consumer主动向Broker请求拉取消息即Broker被动的发送消息给Consumer。 拉模式主动权就在消费者身上了消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了它可以根据一定的策略停止拉取或者间隔拉取都行。 拉模式下Broker就相对轻松了它只管存生产者发来的消息至于消费的时候自然由消费者主动发起来一个请求就给它消息从哪开始拿消息拿多少消费者都告诉它。 拉模式可以更合适的进行消息的批量发送基于推模式可以来一个消息就推送也可以缓存一些消息之后再推送但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。 【拉模式的缺点】 1消息延迟。毕竟是消费者去拉取消息但是消费者怎么知道消息到了呢所以它只能不断地拉取但是又不能很频繁地请求太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率比如隔个2 秒请求一次你看着消息就很有可能延迟 2 秒了。 2消息忙请求忙请求就是比如消息隔了几个小时才有那么在几个小时之内消费者的请求都是无效的在做无用功。推拉模式如何选择 RocketMQ 和Kafka都选择了拉模式当然业界也有基于推模式的消息队列如 ActiveMQ。 拉模式可能适用性更好些因为现在的消息队列都有持久化消息的需求也就是说本身它就有个存储功能它的使命就是接受消息保存好消息使得消费者可以消费消息即可。
3.25 如何设计一个消息队列* 需要明确地提出消息中间件的几个重要角色分别是生产者、消费者、Broker、注册中心。 简述下消息中间件数据流转过程无非就是生产者生成消息发送至BrokerBroker可以暂缓消息然后消费者再从Broker获取消息用于消费。 注册中心用于服务的发现包括Broker的发现、生产者的发现、消费者的发现当然还包括下线可以说服务的高可用离不开注册中心。 然后开始简述实现要点可以同通信讲起各模块的通信可以基于Netty然后自定义协议来实现。注册中心可以利用zookeeper、consul、eureka、nacos等等也可以像RocketMQ自己实现简单的nameserver。 为了考虑扩容和整体的性能采用分布式的思想像Kafka一样采取分区理念一个Topic分为多个partition。并且为保证数据可靠性采取多副本存储即Leader和follower根据性能和数据可靠的权衡提供异步和同步的刷盘存储。 并且利用选举算法保证Leader挂了之后Follower可以顶上保证消息队列的高可用。 也同样为了提高消息队列的可靠性利用本地文件系统来存储消息并且采用顺序写的方式来提高性能。也可以根据消息队列的特性利用内存映射、零拷贝进一步的提升性能。
3.26 RocketMQ消息体过大的解决方案 官方定义消息体默认大小为 4MB普通顺序消息类型。事务、定时、延时类消息默认大小为64KB。如果超过限制则会抛出异常。
方案一消息压缩 通常我们都是传递json消息数据然后底层使用字节流进行传输。如果此时json数据超过4MB则可以考虑进行消息压缩。原理其实很好理解比如我们经常使用的压缩包可以把大文件进行压缩依次减小文件大小。那么我们这里需要使用到的就是字符压缩把json字符串进行压缩然后进行传输。 消息压缩示例 // 创建消息指定 Topic、Tag 和消息体Message msg new Message(CompressedTopic, TagA, Hello RocketMQ.getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置消息压缩级别可选值为 0~9其中 0 表示不压缩//级别越高压缩效果越好但压缩和解压缩的时间可能会更长msg.setProperty(MessageConst.PROPERTY_MESSAGE_COMPRESS_LEVEL, 4);方案二消息分割 简而言之就是把一个大消息体进行分割成多个小消息体进行传输。 一个大消息分割成多个小消息。 多个小消息拥有相同的消息标识如UUID。 分割后小消息需要有一些元数据来标识自己如 消息标识、一共分割了多少个、自己是第几个。 传输后消费者消费然后根据元数据进行数据聚合还原。 将还原后的消息走正常消费流程即可。 3.27 【如何保证幂等性】
1、全局唯一id 如果使用全局唯一ID就是根据业务的操作和内容生成一个全局ID在执行操作前先根据这个全局唯一ID是否存在来判断这个操作是否已经执行。如果不存在则把全局ID存储到存储系统中比如数据库、redis等。如果存在则表示该方法已经执行。 使用全局ID做幂等可以作为一个业务的基础的微服务存在在很多的微服务中都会用到这样的服务在每个微服务中都完成这样的功能会存在工作量重复。另外打造一个高可靠的幂等服务还需要考虑很多问题比如一台机器虽然把全局ID先写入了存储但是在写入之后挂了这就需要引入全局ID的超时机制。 使用全局唯一ID是一个通用方案可以支持插入、更新、删除业务操作。但是这个方案看起来很美但是实现起来比较麻烦下面的方案适用于特定的场景实现起来比较简单。2、去重表 这种方法适用于在业务中有唯一标的插入场景中比如在支付场景中如果一个订单只会支付一次所以订单ID可以作为唯一标识。这时我们就可以建一张去重表并且把唯一标识作为唯一索引在我们实现时把创建支付单据和写入去重表放在一个事务中如果重复创建数据库会抛出唯一约束异常操作就会回滚。3、插入或更新 这种方法插入并且有唯一索引的情况比如我们要关联商品类其中商品的ID和品类的ID可以构成唯一索引并且在数据表中也增加了唯一索引。这时就可以使用InsertOrUpdate操作。在mysql数据库中如下
insert into goods_category (goods_id,category_id,create_time,update_time)values(#{goodsId},#{categoryId},now(),now())on DUPLICATE KEY UPDATEupdate_timenow()4、多版本控制 这种方法适合在更新的场景中比如我们要更新商品的名字这时我们就可以在更新的接口中增加一个版本号来做幂等。
boolean updateGoodsName(int id,String newName,int version);在实现时可以如下
update goods set name#{newName},version#{version} where id#{id} and version${version}5、状态机控制 这种方法适合在有状态机流转的情况下比如就会订单的创建和付款订单的付款肯定是在之前这时我们可以通过在设计状态字段时使用int类型并且通过值类型的大小来做幂等比如订单的创建为0付款成功为100。付款失败为99。在设计状态字段时使用int类型并且通过值类型的大小来做幂等比如订单的创建为0付款成功为100。付款失败为99。在做状态机更新时这可以这样控制
update order set status#{status} where id#{id} and status#{status}