当前位置: 首页 > news >正文

做珠宝首饰网站深圳网站定制公司

做珠宝首饰网站,深圳网站定制公司,企业网站建设费用定金怎么做账,四川省建设厅官方网站联系电话概述 Kafka源码包含多个模块#xff0c;每个模块负责不同的功能。以下是一些核心模块及其功能的概述#xff1a; 服务端源码 #xff1a;实现Kafka Broker的核心功能#xff0c;包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络…概述 Kafka源码包含多个模块每个模块负责不同的功能。以下是一些核心模块及其功能的概述 服务端源码 实现Kafka Broker的核心功能包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。 Java客户端源码 实现了Producer和Consumer与Broker的交互机制以及通用组件支撑代码。 Connect源码 用来构建异构数据双向流式同步服务。 Stream源码 用来实现实时流处理相关功能。 Raft源码 实现了Raft一致性协议。 Admin模块 Kafka的管理员模块操作和管理其topicpartition相关包含创建删除topic或者拓展分区等。 Api模块 负责数据交互客户端与服务端交互数据的编码与解码。 Client模块 包含Producer读取Kafka Broker元数据信息的类如topic和分区以及leader。 Cluster模块 包含Broker、Cluster、Partition、Replica等实体类。 Common模块 包含各种异常类以及错误验证。 Consumer模块 消费者处理模块负责客户端消费者数据和逻辑处理。 Controller模块 负责中央控制器的选举分区的Leader选举Replica的分配或重新分配分区和副本的扩容等。 Coordinator模块 负责管理部分consumer group和他们的offset。 Javaapi模块 提供Java语言的Producer和Consumer的API接口。 Log模块 负责Kafka文件存储读写所有Topic消息数据。 Message模块 封装多条数据组成数据集或压缩数据集。 Metrics模块 负责内部状态监控。 Network模块 处理客户端连接网络事件模块。 Producer模块 生产者细节实现包括同步和异步消息发送。 Security模块 负责Kafka的安全验证和管理。 Serializer模块 序列化和反序列化消息内容。 Server模块 涉及Leader和Offset的checkpoint动态配置延时创建和删除TopicLeader选举Admin和Replica管理等。 Tools模块 包含多种工具如导出consumer offset值LogSegments信息Topic的log位置信息Zookeeper上的offset值等。 Utils模块 包含各种工具类如JsonZkUtils线程池工具类KafkaScheduler公共调度器类等。 这些模块共同构成了Kafka的整体架构使其能够提供高吞吐量、高可用性的消息队列服务。 kafka源码分支为1.0.2 分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。 kafkaController初始化时会启动replicaStateMachine和partitionStateMachine //在 KafkaController 中//有两个状态机分区状态机和副本状态机//一个管理器Channel 管理器负责管理所有的 Broker 通信//相关缓存Partition 信息、Topic 信息、broker id 信息等//四种 leader 选举机制分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发//启动副本状态机初始化所有 Replica 的状态信息如果 Replica 所在节点是 alive 的那么状态更新为 OnlineReplica, 否则更新为 ReplicaDeletionIneligiblereplicaStateMachine.startup()//启动分区状态机初始化所有 Partition 的状态信息如果 leader 所在 broker 是 alive 的那么状态更新为 OnlinePartition否则更新为 OfflinePartitionpartitionStateMachine.startup()ReplicaStateMachine类相关方法 /*** Invoked on successful controller election. First registers a broker change listener since that triggers all* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.* Then triggers the OnlineReplica state change for all replicas.*/def startup() {// //初始化所有副本的状态信息initializeReplicaState()//将online的replica状态转变为OnlineReplicahandleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)info(Started replica state machine with initial state - replicaState.toString())}/*** Invoked on startup of the replicas state machine to set the initial state for replicas of all existing partitions* in zookeeper*///初始化所有副本的状态信息// 这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState 中并没有真正进行状态转移的操作。private def initializeReplicaState() {for((topicPartition, assignedReplicas) - controllerContext.partitionReplicaAssignment) {val topic topicPartition.topicval partition topicPartition.partitionassignedReplicas.foreach { replicaId val partitionAndReplica PartitionAndReplica(topic, partition, replicaId)//如果 Replica 所在机器是 alive 的那么将其状态设置为 OnlineReplica//replicaId即brokerIdif (controllerContext.isReplicaOnline(replicaId, topicPartition))replicaState.put(partitionAndReplica, OnlineReplica)else {// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.// This is required during controller failover since during controller failover a broker can go down,// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.//否则设置为 ReplicaDeletionIneligible 状态replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)}}}}/*** This API is invoked by the broker change controller callbacks and the startup API of the state machine* param replicas The list of replicas (brokers) that need to be transitioned to the target state* param targetState The state that the replicas should be moved to* The controllers allLeaders cache should have been updated before this*///用于处理 Replica 状态的变化def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,callbacks: Callbacks (new CallbackBuilder).build) {if (replicas.nonEmpty) {info(Invoking state change to %s for replicas %s.format(targetState, replicas.mkString(,)))try {brokerRequestBatch.newBatch()//状态转变replicas.foreach(r handleStateChange(r, targetState, callbacks))//向 broker 发送相应请求brokerRequestBatch.sendRequestsToBrokers(controller.epoch)} catch {case e: Throwable error(Error while moving some replicas to %s state.format(targetState), e)}}}/*** This API exercises the replicas state machine. It ensures that every state transition happens from a legal* previous state to the target state. Valid state transitions are:* NonExistentReplica -- NewReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the* partition to every live broker** NewReplica - OnlineReplica* --add the new replica to the assigned replica list if needed** OnlineReplica,OfflineReplica - OnlineReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the* partition to every live broker** NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible - OfflineReplica* --send StopReplicaRequest to the replica (w/o deletion)* --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and* UpdateMetadata request for the partition to every live broker.** OfflineReplica - ReplicaDeletionStarted* --send StopReplicaRequest to the replica (with deletion)** ReplicaDeletionStarted - ReplicaDeletionSuccessful* -- mark the state of the replica in the state machine** ReplicaDeletionStarted - ReplicaDeletionIneligible* -- mark the state of the replica in the state machine** ReplicaDeletionSuccessful - NonExistentReplica* -- remove the replica from the in memory partition replica assignment cache* param partitionAndReplica The replica for which the state transition is invoked* param targetState The end state that the replica should be moved to*/def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,callbacks: Callbacks) {val topic partitionAndReplica.topicval partition partitionAndReplica.partitionval replicaId partitionAndReplica.replicaval topicAndPartition TopicAndPartition(topic, partition)// Replica 不存在的话,状态初始化为 NonExistentReplicaval currState replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)val stateChangeLog stateChangeLogger.withControllerEpoch(controller.epoch)try {def logStateChange(): Unit stateChangeLog.trace(sChanged state of replica $replicaId for partition $topicAndPartition from s$currState to $targetState)val replicaAssignment controllerContext.partitionReplicaAssignment(topicAndPartition)//校验状态转变是否符合要求assertValidTransition(partitionAndReplica, targetState)targetState match {case NewReplica 其前置状态只能为 NonExistentReplica// start replica as a follower to the current leader for its partition//从 zk 获取 Partition 的 leaderAndIsr 信息val leaderIsrAndControllerEpochOpt ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)leaderIsrAndControllerEpochOpt match {case Some(leaderIsrAndControllerEpoch) //若是leader的replica状态不能变为NewReplicaif(leaderIsrAndControllerEpoch.leaderAndIsr.leader replicaId)throw new StateChangeFailedException(sReplica $replicaId for partition $topicAndPartition cannot sbe moved to NewReplica state as it is being requested to become leader)//向该 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),topic, partition, leaderIsrAndControllerEpoch,replicaAssignment, isNew true)//对于新建的 Partition处于这个状态时该 Partition 是没有相应的 LeaderAndIsr 信息的case None // new leader request will be sent to this replica when one gets elected}//将该 Replica 的状态转移成 NewReplica然后结束流程。replicaState.put(partitionAndReplica, NewReplica)logStateChange()case ReplicaDeletionStarted //其前置状态只能为 OfflineReplica//更新向该 Replica 的状态为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionStarted)// send stop replica command//发送 StopReplica 请求给该副本,并设置 deletePartitiontrue//broker收到这请求后会从物理存储上删除这个 Replica 的数据内容brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition true,callbacks.stopReplicaResponseCallback)logStateChange()case ReplicaDeletionIneligible //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionIneligible)logStateChange()case ReplicaDeletionSuccessful //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)logStateChange()case NonExistentReplica //其前置状态只能为 ReplicaDeletionSuccessful。// NonExistentReplica 是副本完全删除、不存在这个副本的状态// remove this replica from the assigned replicas list for its partition//在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息val currentAssignedReplicas controllerContext.partitionReplicaAssignment(topicAndPartition)controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ replicaId))//将这个 Topic 从缓存中删除。replicaState.remove(partitionAndReplica)logStateChange()case OnlineReplica //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible//副本正常工作时的状态此时的 Replica 既可以作为 leader 也可以作为 followerreplicaState(partitionAndReplica) match {case NewReplica //其前置状态如果为 NewReplica// add this replica to the assigned replicas list for its partition//从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 ARval currentAssignedReplicas controllerContext.partitionReplicaAssignment(topicAndPartition)//如果 Replica 不在 AR 中的话那么就将其添加到 Partition 的 AR 中if(!currentAssignedReplicas.contains(replicaId))controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas : replicaId)logStateChange()case _ //其前置状态如果为OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// check if the leader for this partition ever existed//如果该 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求//否则不做任何处理controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(leaderIsrAndControllerEpoch) brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,replicaAssignment)replicaState.put(partitionAndReplica, OnlineReplica)logStateChange()case None // that means the partition was never in OnlinePartition state, this means the broker never// started a log for that partition and does not have a high watermark value for this partition}}//最后将 Replica 的状态设置为 OnlineReplica 状态。replicaState.put(partitionAndReplica, OnlineReplica)case OfflineReplica //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// send stop replica command to the replica so that it stops fetching from the leader//发送 StopReplica 请求给该副本,先停止副本同步 deletePartition falsebrokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition false)// As an optimization, the controller removes dead replicas from the ISRval leaderAndIsrIsEmpty: Boolean controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(_) //将该 replica 从 Partition 的 isr 移除这个 replica前提 isr 中还有其他有效副本controller.removeReplicaFromIsr(topic, partition, replicaId) match {case Some(updatedLeaderIsrAndControllerEpoch) // send the shrunk ISR state change request to all the remaining alive replicas of the partition.val currentAssignedReplicas controllerContext.partitionReplicaAssignment(topicAndPartition)if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {// 发送 LeaderAndIsr 请求给剩余的其他副本,因为 ISR 变动了brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ replicaId),topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)}//更新这个 Replica 的状态为 OfflineReplicareplicaState.put(partitionAndReplica, OfflineReplica)logStateChange()falsecase None true}case None true}if (leaderAndIsrIsEmpty !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))throw new StateChangeFailedException(sFailed to change state of replica $replicaId for partition $topicAndPartition since the leader sand isr path in zookeeper is empty)}}catch {case t: Throwable stateChangeLog.error(sInitiated state change of replica $replicaId for partition $topicAndPartition from s$currState to $targetState failed, t)}}上面 Replica 各种转移的触发的条件
http://www.hkea.cn/news/14384551/

相关文章:

  • 郑州做音响网站的公司把网站扒下来以后怎么做
  • 怎么让网站页面自适应利用淘宝联盟做网站
  • 网站的类型和特色虚拟展馆官方网站建设
  • 品牌网站开发设计wordpress后台默认登录地址
  • 网站架构图一般包括什么做网站在经营范围内属于什么
  • 做微商货源网站赚钱吗wordpress 内容管理系统
  • 手机个人网站制作教程秦皇岛在建工程项目
  • 国外网站 模板画册设计印刷
  • 接做网站需要问什么条件wordpress 联系表单
  • 增长超人网站建设价格番禺网站开发公司电话
  • 网站建设及推广枣强台州网站建设制作
  • 食品网站app建设软件项目过程
  • 中国旅游电子商务网站建设情况怎样将自己做的网页加入网站
  • 可以浏览国外网站展馆
  • 建网站需多少钱南通网站排名
  • 商务网站欣赏提供建立网站服务的公司
  • 双鸭山市建设局网站花生壳域名还免费吗
  • 可视化网站设计工具wordpress的登陆地址修改密码
  • 广东住房城乡建设厅网站微信公众平台小程序二维码怎么生成
  • 网站建设三方合同范本法与家国建设征文网站
  • 建设网站大全网站设计导航
  • 网站代运营价格建筑库
  • 建立网站要钱吗?网站建设费放什么科目
  • 怎么制作婚介网站网站没有问题但是一直做不上首页
  • 怎么查看一个网站的浏览量小企业网站建设论文
  • 世界之窗附近做网站公司电子商务网站推广策略
  • 企业网站模板 免费个人网站html模板下载
  • 提供网站建设教学视频福州优化搜索引擎
  • 广西建设部投诉网站襄阳住房和城乡建设局网站首页
  • 东营网站建设电话wordpress的网址