网络文化有限公司网站建设策划书,长春公司网站模板建站,app和网站的关系,湖南网站建设案例RocketMQ 存储基础回顾#xff1a; 源码分析RocketMQ之CommitLog消息存储机制
本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构#xff0c;同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储…RocketMQ 存储基础回顾 源码分析RocketMQ之CommitLog消息存储机制
本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储机制是所有的主题消息都存储在 CommitLog 文件中也就是消息发送是完全的顺序 IO 操作加上利用内存文件映射机制极大的提供的 IO 性能。消息的全量信息存放在 commitlog 文件中并且每条消息的长度是不一样的消息的具体存储格式如下 如果消费者直接基于commitlog 进行消费的话简直就是一个恶梦因为不同的主题的消息完全顺序的存储在 commitlog 文件中根据主题去查询消息不得不遍历整个 commitlog 文件显然作为一款消息中间件这是绝不允许的。RocketMQ 的ConsumeQueue 文件就是来解决消息消费的。首先我们知道一个主题在 broker 上可以分成多个消费对列默认为4个也就是消费队列是基于主题broker。那 ConsumeQueue 中当然不会再存储全量消息了而是存储为定长20字节8字节commitlog 偏移量4字节消息长度8字节tag hashcode,消息消费时首先根据 commitlog offset 去 commitlog 文件组commitlog每个文件1G填满了另外创建一个文件找到消息的起始位置然后根据消息长度读取整条消息。但问题又来了如果我们需要根据消息ID来查找消息consumequeue 中没有存储消息ID,如果不采取其他措施又得遍历 commitlog文件了为了解决这个问题rocketmq 的 index 文件又派上了用场。
接下来本文重点关注 ConsumeQueue、Index 文件是如何基于 Commitlog 构建的并且根据 ConsumeQueue、Index 文件如何查找消息。
根据 commitlog 文件生成 consumequeue、index 文件主要同运作于两种情况
1、运行中发送端发送消息到 commitlog文件此时如何及时传达到 consume文件、Index文件呢
2、broker 启动时检测 commitlog 文件与 consumequeue、index 文件中信息是否一致如果不一致需要根据 commitlog 文件重新恢复 consumequeue 文件和 index 文件。
1、commitlog、consumequeue、index 文件同步问题
RocketMQ 采用专门的线程来根据 comitlog offset 来将 commitlog 转发给ConsumeQueue、Index。其线程为DefaultMessageStore$ReputMessageService
1.1 核心属性
private volatile long reputFromOffset 0 reputFromOffset 从 commitlog 开始拉取的初始偏移量。
1.2 run方法 每处理一次 doReput 方法休眠1毫秒基本上是马不停蹄的在转发 commitlog 中的内容到 consumequeue、index。
接下来重点查看 doReput 方法。
private void doReput() {for (boolean doNext true; this.isCommitLogAvailable() doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() this.reputFromOffset DefaultMessageStore.this.getConfirmOffset()) {break;}SelectMappedBufferResult result DefaultMessageStore.this.commitLog.getData(reputFromOffset); // 1if (result ! null) {try {this.reputFromOffset result.getStartOffset();for (int readSize 0; readSize result.getSize() doNext; ) {DispatchRequest dispatchRequest DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // 2 int size dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size 0) {DefaultMessageStore.this.doDispatch(dispatchRequest); // 3 if (BrokerRole.SLAVE ! DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset size;readSize size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size 0) {this.reputFromOffset DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size 0) {log.error([BUG]read total count not equals msg total size. reputFromOffset{}, reputFromOffset);this.reputFromOffset size;} else {doNext false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() MixAll.MASTER_ID) {log.error([BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {},this.reputFromOffset);this.reputFromOffset result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext false;}}
代码1根据 offset 从 commitlog 找到一条消息如果找不到退出此次循环doReput方法跳出此处从 commitlog 文件中取出消息的逻辑在下文会重点分析故在此暂时跳过。
先浏览一下 SelectMappedBufferResult 代码2尝试构建转发请求对象 DispatchRequest 我大概浏览了一下 commitLog.checkMessageAndReturnSize主要是从Nio ByteBuffer中根据 commitlog 消息存储格式解析出消息的核心属性
// 消息主题
private final String topic;
// 消息队列
private final int queueId;
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp;
//消息在消费队列的offset
private final long consumeQueueOffset;
// 存放在消息属性中的keys: PROPERTY_KEYS KEYS
private final String keys;
// 是否成功
private final boolean success;
// 消息唯一键 UNIQ_KEY
private final String uniqKey;
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset;
// 属性
private final MapString, String propertiesMap; 代码3转发DistpachRequest。 根据实现类consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。