当前位置: 首页 > news >正文

网站描述和关键词怎么写安徽建设厅网站网址

网站描述和关键词怎么写,安徽建设厅网站网址,wordpress的统计,合肥市建设局网站在Kafka数据写入流程中#xff0c;Broker端负责接收客户端发送的消息#xff0c;并将其持久化存储#xff0c;是整个流程的关键环节。本文将深入Kafka Broker的源码#xff0c;详细解析消息接收、处理和存储的具体实现。 一、网络请求接收与解析 Broker通过Processor线程…在Kafka数据写入流程中Broker端负责接收客户端发送的消息并将其持久化存储是整个流程的关键环节。本文将深入Kafka Broker的源码详细解析消息接收、处理和存储的具体实现。 一、网络请求接收与解析 Broker通过Processor线程池接收来自客户端的网络请求Processor线程基于Java NIO的Selector实现非阻塞I/O负责监听网络连接和读取数据。其核心处理逻辑如下 public class Processor implements Runnable {private final Selector selector;private final KafkaApis kafkaApis;public Processor(Selector selector, KafkaApis kafkaApis) {this.selector selector;this.kafkaApis kafkaApis;}Overridepublic void run() {while (!stopped) {try {// 轮询获取就绪的网络事件selector.poll(POLL_TIMEOUT);SetSelectionKey keys selector.selectedKeys();for (SelectionKey key : keys) {if (key.isReadable()) {// 读取网络数据NetworkReceive receive selector.read(key);if (receive ! null) {// 处理接收到的请求kafkaApis.handle(receive);}}}} catch (Exception e) {log.error(Processor failed to process requests, e);}}} }当Selector检测到有可读事件时会从对应的SocketChannel中读取数据并封装成NetworkReceive对象然后传递给KafkaApis进行进一步处理。 KafkaApis是Broker处理请求的核心组件它根据请求类型调用相应的处理器 public class KafkaApis {private final MapApiKeys, RequestHandler requestHandlers;public KafkaApis(MapApiKeys, RequestHandler requestHandlers) {this.requestHandlers requestHandlers;}public void handle(NetworkReceive receive) {try {// 解析请求头RequestHeader header RequestHeader.parse(receive.payload());ApiKeys apiKey ApiKeys.forId(header.apiKey());// 获取对应的请求处理器RequestHandler handler requestHandlers.get(apiKey);if (handler ! null) {// 处理请求handler.handle(receive);} else {// 处理未知请求类型handleUnknownRequest(header, receive);}} catch (Exception e) {// 处理请求解析和处理过程中的异常handleException(receive, e);}} }对于生产者发送的消息写入请求ApiKeys.PRODUCE会由ProduceRequestHandler进行处理。 二、消息写入处理与验证 ProduceRequestHandler负责处理生产者发送的消息写入请求其核心职责包括验证请求合法性、将消息写入对应分区日志以及生成响应。关键处理逻辑如下 public class ProduceRequestHandler implements RequestHandler {private final LogManager logManager;private final ReplicaManager replicaManager;public ProduceRequestHandler(LogManager logManager, ReplicaManager replicaManager) {this.logManager logManager;this.replicaManager replicaManager;}Overridepublic void handle(NetworkReceive receive) {try {// 解析ProduceRequestProduceRequest request ProduceRequest.parse(receive.payload());// 验证请求版本和元数据validateRequest(request);// 处理每个分区的消息MapTopicPartition, PartitionData partitionDataMap new HashMap();for (Map.EntryTopicPartition, MemoryRecords entry : request.data().entrySet()) {TopicPartition tp entry.getKey();MemoryRecords records entry.getValue();// 获取分区日志Log log logManager.getLog(tp);if (log ! null) {// 将消息追加到日志LogAppendInfo appendInfo log.append(records);// 记录分区数据信息partitionDataMap.put(tp, new PartitionData(appendInfo.offset(), appendInfo.logAppendTime()));} else {// 处理分区不存在的情况partitionDataMap.put(tp, new PartitionData(RecordBatch.NO_OFFSET, -1L));}}// 构建响应ProduceResponse response new ProduceResponse(request.version(), request.correlationId(), partitionDataMap);// 发送响应sendResponse(response, receive);} catch (Exception e) {// 处理请求处理过程中的异常handleException(receive, e);}} }在上述代码中validateRequest方法会对请求的版本、主题和分区的合法性进行检查log.append方法将消息追加到对应分区的日志文件中最后根据处理结果构建ProduceResponse响应并发送回给生产者。 三、消息持久化存储 Kafka使用日志Log来持久化存储消息每个分区对应一个日志实例。Log类负责管理日志文件、分段以及消息的读写操作其核心的消息追加方法如下 public class Log {private final LogSegmentManager segmentManager;// 省略其他成员变量public LogAppendInfo append(MemoryRecords records) throws IOException {try {// 获取当前活跃的日志分段LogSegment segment segmentManager.activeSegment();long offset segment.sizeInBytes();long baseOffset segment.baseOffset();// 将消息追加到日志分段long appended segment.append(records);// 更新日志元数据updateHighWatermark(segment);// 返回追加信息return new LogAppendInfo(baseOffset offset, time.milliseconds());} catch (Exception e) {// 处理写入异常handleWriteException(e);throw e;}} }LogSegment类表示一个日志分段它包含了日志文件、索引文件等具体的消息写入操作在LogSegment的append方法中完成 public class LogSegment {private final FileMessageSet fileMessageSet;// 省略其他成员变量public long append(MemoryRecords records) throws IOException {// 计算写入位置long position fileMessageSet.sizeInBytes();// 将消息写入文件long written fileMessageSet.append(records);// 更新索引updateIndex(records.sizeInBytes(), position);return written;} }FileMessageSet类负责实际的文件I/O操作它利用Java NIO的FileChannel实现高效的磁盘写入并且支持零拷贝技术进一步提升写入性能 public class FileMessageSet {private final FileChannel fileChannel;// 省略其他成员变量public long append(MemoryRecords records) throws IOException {try (FileLock lock fileChannel.lock()) {// 使用零拷贝技术写入数据long written fileChannel.transferFrom(new ReadOnlyByteBufferChannel(records.buffer()), sizeInBytes(), records.sizeInBytes());sizeInBytes written;return written;}} }通过上述一系列操作Kafka将接收到的消息高效、可靠地持久化存储到磁盘中保证了数据的安全性和一致性。 通过对Kafka Broker端数据写入流程的源码剖析我们全面了解了从网络请求接收到消息持久化存储的完整过程。各组件通过严谨的设计和高效的实现确保了Kafka在高并发场景下能够稳定、快速地处理大量消息写入请求为整个消息系统的可靠运行提供了坚实保障。
http://www.hkea.cn/news/14293864/

相关文章:

  • html手机网站开发教程电商网站平台建设资金预算
  • 成都户外网站建设招聘网站哪个好用
  • 网站开发资金预算wordpress文章数据包
  • wordpress 企业站模板个人网站建设模板
  • 制作网站的步骤有哪些百度基木鱼建站
  • 杭州高端网站制作工控人如何做自己的网站
  • 济南优化推广网站seo网页制作公司的小客户有哪些
  • 购物网站源代码官网搭建
  • 南昌市会做网站有哪几家phpcms 后台修改修改网站备案号
  • 企业做网站的目的怎么样提高网站排名
  • 做网站用的书免费网站代码大全
  • 做网站后台怎么弄最新网页制作资料
  • 黄山公司做网站搜索关键词的软件
  • 宁夏建设工程招标投标信息网站网上商城电商项目
  • 苏州 网站制作公司做网站优化的弊端
  • 艺客网站首页长沙专业做网站公司有哪些
  • 闲鱼网站做交易是先付款吗班级优化大师app下载学生版
  • 汽车销售网站模板 cms公司网站维护建设费入什么科目
  • 百度描述 网站互联网大厂有哪些
  • 哪些网站可以做外贸网页传奇手游排行榜前十名
  • 手机网站demo网站建设要程序员吗
  • idc销售网站源码建筑论坛
  • 中国网站制作企业排行榜企业门户网站模板
  • 做circrna的网站怎么做网站的rss
  • 不让网站开发公司进入后台seo技术顾问
  • 论坛网站策划福州网站关键词
  • 大学生一个人做网站杭州手机网站制作电脑公司
  • 怎么做qq网站宛城区建网站
  • 重庆搜索引擎优化佛山网站优化效果
  • 网站优化标准wordpress $wp