巩义旅游网站设计公司,设计网站源代码,广西住房和城乡建设门户网站,金戈西地那非片能延时多久目录
一#xff1a;RocketMq 整体文件存储介绍
二#xff1a;ConsumeQueue 的文件结构
三#xff1a;ConsumeQueue 写入和查询流程 一#xff1a;RocketMq 整体文件存储介绍
存储⽂件主要分为三个部分#xff1a; CommitLog#xff1a;存储消息的元数据。所有消息都会…目录
一RocketMq 整体文件存储介绍
二ConsumeQueue 的文件结构
三ConsumeQueue 写入和查询流程 一RocketMq 整体文件存储介绍
存储⽂件主要分为三个部分 CommitLog存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。 ConsumerQueue存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。 IndexFile为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍ConsumeQueue的研究以rocketmq5.3.0版本作为研究。 二ConsumeQueue 的文件结构
ConsumeQueue 的文件格式 每个 ConsumeQueue 条目占用 20 字节包含以下三个字段
字段名长度字节说明CommitLog Offset8消息在 CommitLog 文件中的物理偏移量。Message Length4消息的长度。Tag HashCode8消息 Tag 的哈希值用于快速查找具有相同 Tag 的消息。
这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问极大地提高了读取性能 三ConsumeQueue 写入和查询流程
1. ConsumeQueue 写入流程图
---------------------
| 消息写入 CommitLog |
---------------------|v
---------------------
| 触发 Reput 操作 |
| - ReputMessageService |
---------------------|v
---------------------
| 获取 ConsumeQueue |
| - 根据 Topic 和 QueueId |
---------------------|v
---------------------
| 构建索引条目 |
| - CommitLog Offset |
| - Message Length |
| - Tag HashCode |
---------------------|v
---------------------
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节 |
| - 文件大小 30 万条目 |
---------------------|v
---------------------
| 刷盘操作 |
| - 定期刷盘到磁盘 |
---------------------
写入流程 消息写入 CommitLog Broker 接收到消息后将其顺序写入 CommitLog 文件 触发 Reput 操作 Broker 中的 ReputMessageService 线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件 获取 ConsumeQueue 根据消息的 Topic 和 QueueId从 consumeQueueTable 中获取对应的 ConsumeQueue。如果不存在则创建一个新的 ConsumeQueue 构建索引条目 为每条消息构建一个索引条目包含以下信息 CommitLog Offset消息在 CommitLog 中的物理偏移量。 Message Length消息的长度。 Tag HashCode消息 Tag 的哈希值 写入 ConsumeQueue 文件 将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节文件大小固定为 30 万个条目 刷盘操作 定期将 ConsumeQueue 文件中的数据刷盘确保数据持久化
consumequeue写入代码org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo 2. ConsumeQueue 查询流程图
---------------------
| 消费者拉取消息 |
| - Topic, QueueId |
| - 起始逻辑偏移量 |
---------------------|v
---------------------
| 查询 ConsumeQueue |
| - 根据逻辑偏移量计算 |
| ConsumeQueue 文件位置 |
---------------------|v
---------------------
| 读取索引条目 |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode |
---------------------|v
---------------------
| 定位 CommitLog 文件 |
| - 根据物理偏移量计算 |
| CommitLog 文件位置 |
---------------------|v
---------------------
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取 |
| - 校验消息完整性 |
---------------------|v
---------------------
| 返回消息给消费者 |
---------------------
查询流程 消费者拉取消息 消费者指定 Topic、QueueId 和起始逻辑偏移量向 Broker 发起拉取消息请求 查询 ConsumeQueue Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置读取对应的索引条目 读取索引条目 从 ConsumeQueue 文件中读取索引条目获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值 定位 CommitLog 文件 根据物理偏移量计算 CommitLog 文件的位置读取对应的消息内容 从 CommitLog 读取消息 从 CommitLog 文件中读取消息内容并校验消息的完整性 返回消息给消费者 Broker 将读取到的消息内容返回给消费者
查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)