当前位置: 首页 > news >正文

网站底部备案字体设置仿163源码交易平台宽屏整站源码 网站模板交易平台源码

网站底部备案字体设置,仿163源码交易平台宽屏整站源码 网站模板交易平台源码,免费高清视频在线观看,网站的设计与维护摘要#作者#xff1a;猎人 文章目录 前言#xff1a;概念剖析kafka的两种位移消费位移消息的位移位移的提交自动提交手动提交 1、使用--to-earliest重置消费组消费指定topic进度2、使用--to-offset重置消费offset3、使用--to-datetime策略指定时间重置offset4、使用--to-current…#作者猎人 文章目录 前言概念剖析kafka的两种位移消费位移消息的位移位移的提交自动提交手动提交 1、使用--to-earliest重置消费组消费指定topic进度2、使用--to-offset重置消费offset3、使用--to-datetime策略指定时间重置offset4、使用--to-current 重置offset5、使用--shift-by 重置offset6、使用--by-duration 重置offset7、使用--from-file cvs文档重置offset8、删除偏移量--delete-offsets9、查询消费者成员信息--members10、查询消费者组状态信息--state11、查看topic指定分区offset的最大偏移量值最新offsets或最小值--time12、查询topic的offset的范围消费了多少条消息 前言 在Kafka Version为0.11.0.0之后Consumer的Offset信息不再默认保存在Zookeeper上而是选择用Topic的形式保存下来。在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。kafka把位移保存在一个叫做__consumer_offsets的内部主题中叫做位移主题。既然它也是主题那么离不开分区和副本这两个机制。我们并没有手动创建这个主题并且指定所以是kafka自动创建的分区的数量取决于Broker 端参数 offsets.topic.num.partitions默认是50个分区而副本参数取决于offsets.topic.replication.factor默认是3。 任何主题都会有消息会存在消息格式。肯定也会有删除策略否则消息会无限膨胀。但是位移主题的删除策略和其他主题删除策略又不太一样。我们知道普通主题的删除是可以通过配置删除时间或者大小的。而位移主题的删除叫做 Compaction。Kafka 使用Compact 策略来删除位移主题中的过期消息对于同一个 Key 的两条消息 M1 和 M2如果 M1 的发送时间早于 M2那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息剔除那些过期的消息然后把剩下的消息整理在一起。 概念剖析 设置位移偏移量可分java api方式和命令方式2种命令方式更方便简单。 kafka的两种位移 关于位移Offset其实在kafka里有两种位移 分区位移生产者向分区写入消息每条消息在分区中的位置信息由一个叫offset的数据来表征。假设一个生产者向一个空分区写入了 10 条消息那么这 10 条消息的位移依次是 0、1、…、9 消费位移消费者需要记录消费进度即消费到了哪个分区的哪个位置上这是消费者位移Consumer Offset。 注意这和上面所说的消息在分区上的位移完全不是一个概念。上面的“位移”表征的是分区内的消息位置它是不变的即一旦消息被成功写入到一个分区上它的位移值就是固定的了。而消费者位移则不同它可能是随时变化的毕竟它是消费者消费进度的指示器。 消费位移 消费位移记录的是 Consumer 要消费的下一条消息的位移切记是下一条消息的位移 而不是目前最新消费 消息的位移 假设一个分区中有 10 条消息位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息此时 Consumer 的位移是 5指向了下一条消息的位移。 至于为什么要有消费位移很好理解当 Consumer 发生故障重启之后就能够从 Kafka 中读取之前提交的位移值然后从相应的位移处继续消费从而避免整个消费过程重来一遍。就好像书签一样需要书签你才可以快速找到你上次读书的位置。 那么了解了位移是什么以及它的重要性我们自然而然会有一个疑问kafka是怎么记录、怎么保存、怎么管理位移的呢 位移的提交 Consumer 需要上报自己的位移数据这个汇报过程被称为位移提交。因为 Consumer 能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即Consumer 需要为分配给它的每个分区提交各自的位移数据。 鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响KafkaConsumer API提供了多种提交位移的方法每一种都有各自的用途这些都是本文将要谈到的方案。 位移提交分为自动提交和手动提交而手动提交又分为同步提交和异步提交。 自动提交 当消费配置enable.auto.committrue的时候代表自动提交位移。 自动提交位移是发生在什么时候呢auto.commit.interval.ms默认值是50000ms。即kafka每隔5s会帮你自动提交一次位移。自动位移提交的动作是在 poll方法的逻辑里完成的在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交如果可以那么就会提交上一次轮询的位移。假如消费数据量特别大可以设置的短一点。越简单的东西功能越不足自动提交位移省事的同时肯定会带来一些问题。自动提交带来重复消费和消息丢失的问题 重复消费 在默认情况下Consumer 每 5 秒自动提交一次位移。现在我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后所有 Consumer 从上一次提交的位移处继续消费但该位移已经是 3 秒前的位移数据了故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率但这么做只能缩小重复消费的时间窗口不可能完全消除它。这是自动提交机制的一个缺陷。 消息丢失 假设拉取了100条消息正在处理第50条消息的时候到达了自动提交窗口期自动提交线程将拉取到的每个分区的最大消息位移进行提交如果此时消费服务挂掉消息并未处理结束但却提交了最大位移下次重启就从100条那消费即发生了50-100条的消息丢失。 手动提交 当消费配置enable.auto.commitfalse的时候代表手动提交位移。用户必须在适当的时机一般是处理完业务逻辑后手动的调用相关api方法提交位移。 手动提交明显能解决消息丢失的问题因为你是处理完业务逻辑后再提交的假如此时消费服务挂掉消息并未处理结束那么重启的时候还会重新消费。 但是对于业务层面的失败导致消息未消费成功是无法处理的。因为业务层的逻辑千变万化、比如格式不正确你叫Kafka消费端程序怎么去处理应该要业务层面自己处理记录失败日志做好监控等。 另外手动提交不能解决消息重复的问题也很好理解假如消费0-100条消息50条时挂了重启后由于没有提交这一批消息的offset是会从0开始重新消费。 手动提交又分为异步提交和同步提交。这个涉及业务java代码略。 更新Offset的三个维度Topic的作用域重置策略执行方案。 Topic的作用域下面是kafka-consumer-groups.sh可携带的参数下面命令执行时必须带–group groupname –reset-offsets重置消费组的偏移量。后面可接 --to-earliest --to-latest --xxxxx 等等等-–reset-offsets-by-duration指定重置的时间从现在往前。-–reset-offsets-by-topic指定重置的topic和partition。-–reset-offsets-by-times指定重置的时间点。-–new-consumer使用新消费者API。-–topic指定要操作的topic。-–exclude-internal不列出.kafka/*的topic。–all-topics为consumer group下所有topic的所有分区调整位移。列出所有topic的所有消费组。–topic t1 --topic t2为指定的若干个topic的所有分区调整位移–topic t1:0,1,2为指定的topic分区调整位移可单个分区、多个分区调整–to-earliest把位移调整到分区当前最小位移。设置到最早位移处也就是0。分区中第一条消息。–to-latest把位移调整到分区当前最新位移。最后一个offset即主题分区HW的位置HW可理解为木桶原理的最低点。分区中最后一条消息–to-current把位移调整到分区当前位移。直接重置offset到当前的offset也就是LOE。–to-offset offset数值 把当前位移调整到指定位移处偏移量指定具体的位移位置。重置到指定的offset–shift-by N 把位移调整到当前位移 N处注意N可以是负数表示向前移动。 基于当前位移向前回退多少。–to-datetime将消费者的偏移量重置为大于或等于指定日期时间的最早偏移量。你需要提供符合yyyy-MM-ddTHH:mm:ss.xxx格式的日期时间。–to-datetime重置到指定时间的offset。把位移调整到大于给定时间的最早位移处datetime格式是yyyy-MM-ddTHH:mm:ss.xxx比如2017-08-04T00:00:00.000。DateTime 允许你指定一个时间然后将位移重置到该时间之后的最早位移处。–by-duration 将消费者的偏移量重置为当前时间减去指定时长的位置。例如P1D表示一天前PT2H表示两小时前PT3M表示3分钟前PT5S表示5秒前。–from-file 从CSV文件中读取调整策略。根据CVS文档来重置。从CSV文件中读取重置策略。CSV文件应包含消费者组、主题和分区信息以及相应的偏移量或时间戳。 执行方案 什么参数都不加只是打印出位移调整方案不具体执行 –execute执行真正的位移调整。–export把位移调整方案按照CSV格式打印方便用户成csv文件供后续直接使用。 注意事项 consumer group状态必须是inactive的即不能是处于正在工作中的状态不加执行方案默认是只做打印操作 kafka-consumer-groups.sh可携带的其他参数 command-configkafka的安全认证配置文件路径。group指定要操作的消费组。describe列出消费组的详情。delete删除消费组。dry-run仅输出要执行的操作不实际运行。 kafka-run-class.sh可携带的其他参数 运行kafka-run-class.sh脚本调用kafka.admin.TopicCommand类kafka.tools.DumpLogSegments类kafka.tools.GetOffsetShell 类等等同时接受一个操作指令参数。 该指令包括 create 创建topicalter 修改topiclist 列举topicdescribe 描述topicdelete 删除topictopictime //后接-1时表最大值含义是-1时用来请求分区最新的offset -- 每个分区最大的offset。后接-2时表最小值-2时用来请求分区最早有效的offset -- 例如有些offset 在7(设置)天删除数据, 最早的0,但是最早有效的不一定是0 。后面也可接具体的timestamp如–time 16423252638。 –broker-list //接kafka ip:9092 ,必须使用broker-list不能使用–bootstrap-server 1、使用–to-earliest重置消费组消费指定topic进度 注意设置offset前提是停止生产和消费的业务服务如果所有topic是–all-topics具体topic加 --topic 重置某topic的offset。本命令可更新到当前group最初的offset位置就是0 更新所有topic到当前最初的offset位置即最早位置就是CURRENT-OFFSET为0如果是具体topic加 --topic.再次查询消费组消费状态发现CURRENT-OFFSET列都变0了 –group参数必须携带指定消费组 ]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --to-earliest --execute案例二 更新offset位置为0后再次消费该topic全部消息然后再次查询消费组情况发现CURRENT-OFFSET恢复到正常消费位移状态了 2、使用–to-offset重置消费offset 简单查询消费组状态 [rootkafka18 ~]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.40.18:9092 --list --state GROUP STATE console-consumer-61774 Stable复杂查询消费组状态 snapshot]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --all-groups --describe --state在Kafka中消费者组的状态可以是以下几种之一只有Inactive才可以重置消费点位 Stable消费者组正在正常消费没有任何重平衡操作正在进行。 Rebalancing消费者组正在进行重平衡操作此时不能执行偏移量重置。 Joining新的消费者正在加入消费者组。 PreparingRebalance消费者组即将进行重平衡。 Syncing消费者组正在进行同步操作通常发生在重平衡之后。 需要先查询消费组等信息client id这列为‘-’才可执行重置命令即设置offset前提是停止生产、消费的业务服务。–to-offset offset新数值 把当前位移调整到指定位移处 如果是具体topic加 --topic。调具体分区加–topic topic_name:0,1,2可单分区、多分区调整 ]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --to-offset 500000 --execute往高调验证发现只有25这个执行成功了 往低调验证 如果往低调值都符合值的要求则都执行成功。 按照具体分区调整案例zgdx是topic名称0是分区号用冒号分开属于固定语法。可单个分区、多个分区调整 3、使用–to-datetime策略指定时间重置offset DateTime 允许你指定一个时间点然后将位移重置到该时间之后的最早位移处。 所有topic的offset设置到指定时刻开始如果指定时间太早发现当前位移是0当前CURRENT-OFFSET如果是具体topic加 --topic 需要注意这个–to-datetime使用UTC伦敦时间后必须接伦敦时间 命令案例 ]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --to-datetime 2024-06-14T09:30:00.000 --execute因为服务器使用纽约时间我们操作如纽约时间2025-01-6T01:35:00.000时下面偏移量为120分区的偏移量分别为688。纽约时间2025-01-6T01:37:00.000时偏移量为120分区的偏移量分别为799。 当我们使用伦敦时间2025-01-6T06:35:00.000也就是说纽约时间的06:35就是纽约时间的01:35所以设置06:35的偏移量为688 可以看到我们再次设置纽约时间01:37时偏移量确实为799 4、使用–to-current 重置offset 更新所有topic到当前offset位置。但执行后当前任何位移都不会变动 ]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --to-current --execute5、使用–shift-by 重置offset 所有topic的CURRENT-OFFSET的offset位置按设置的值进行位移向前移动10即减10。如果是具体topic加 --topic ]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --shift-by -10 --execute–shift-by接调整的位移数量接±值使用加时号可省略。-为减去CURRENT-OFFSET值。 向前移动案例 可以看出如果CURRENT-OFFSET值不够减CURRENT-OFFSET立刻归0。如果再次加位移值则再下面方框基础上全部加值 6、使用–by-duration 重置offset 将所有分区位移调整为多少时间之前的最早位可按照天、时、分、秒单位位移。例如P1D表示1天前PT2H表示两小时前PT3M表示3分钟前PT5S表示5秒前。 案例如将所有分区位移调整为3分钟之前的最早位移 ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --reset-offsets --all-topics --by-duration PT3M --execute7、使用–from-file cvs文档重置offset 通过cvs文档配置消费组”name”的”testTopic”上的所有分区的偏移量为10000 offsets.cvs格式为Topic,分区号,偏移量 testTopic,0,10000 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group name --reset-offsets --from-file offsets.cvs --execute8、删除偏移量–delete-offsets 能够执行成功的一个前提是 消费组这会是不可用状态; 偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费; 将消费组console-consumer-20733的topic上的所有分区的偏移量删除 shot]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --topic aaa --delete-offsets Request succeed for deleting offsets with topic aaa group console-consumer-20733 TOPIC PARTITION STATUS aaa 0 Successful aaa 1 Successful aaa 2 Successful 再次查询该消费组发现查不到了 [rootlocalhost local]# /usr/local/kafka_2.13-2.7.1/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --group console-consumer-20733 --describe Error: Consumer group console-consumer-20733 does not exist.操作案例二 9、查询消费者成员信息–members 查询消费者成员 snapshot]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --all-groups --describe --members Consumer group console-consumer-20733 has no active members. GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS test-consumer-group consumer-test-consumer-group-1-bc47e46d-1a55-4e85-a383-41aa345d13e4 /192.168.40.14 consumer-test-consumer-group-1 3 10、查询消费者组状态信息–state 参数解释 STATE消费者组的状态。常见的状态有 Stable所有成员都已经成功加入了该组并且正在进行正常的消费。 PreparingRebalance组正在准备进行再平衡操作此时消费者可能暂时停止消费。 CompletingRebalance再平衡过程即将完成消费者正重新分配分区。 Dead消费者组不再活跃可能是因为所有成员都已离开或崩溃。 Empty组存在但没有任何活动成员。 MEMBERS当前属于该消费者组的成员数量。 简单查询 [rootkafka18 ~]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.40.18:9092 --list --state GROUP STATE console-consumer-61774 Stable 复杂查询 snapshot]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.40.11:9092 --all-groups --describe --state Consumer group ‘console-consumer-20733’ has no active members. GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS console-consumer-20733 192.168.40.18:9092 (18) Empty 0 GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS test-consumer-group 192.168.40.11:9092 (11) range Stable 1 11、查看topic指定分区offset的最大偏移量值最新offsets或最小值–time 必须用–broker-list 参数指定任何ip输出结果一样。 time为-1时表最大偏移量值最新offsets为-2时表偏移量最小值即起始偏移量一般都为0。 –time 1589300000000表示时间戳数字为具体时间戳。 bin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --topic aaa --time -1 --broker-list 192.168.40.11:9092 aaa:0:13 aaa:1:9 aaa:2:11]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --topic aaa --time -1 --broker-list 192.168.40.11:9092 --partitions 0 aaa:0:13经查询消费组消费情况证实查看偏移量最大offset确实是13 补充 12、查询topic的offset的范围消费了多少条消息 查询offset最小值即起始偏移量-2代表最小值 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic --time -2查询offset最大值-1代表最大值 aaa为topic名称中间列为该topic分区号有3个分区。最后一列代表该topic的该分区最新偏移量即LOG-END不是消费了多少的偏移量。从查询结果看符合上述截图内容 ]# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic aaa --time -1 --broker-list 192.168.40.14:9092 aaa:0:11 aaa:1:9 aaa:2:10
http://www.hkea.cn/news/14504456/

相关文章:

  • 淘宝上做的网站可以优化吗宝安专业网站设计公司
  • 推荐微网站建设绍兴建设开发有限公司网站
  • 律师网站建设公司源码下载网站cms
  • 基层建设期刊上什么网站查询文章网站建设首页
  • 7黄页网站建设淄博物联网app开发公司
  • 网站快慢由什么决定网站建设 预付款
  • wordpress 地图html长沙seo
  • 公司做网站多泉州网站建设有哪些
  • 有哪些做伦敦金的网站做网站凡科如何
  • 建设网站地图素材企业为什么要交税
  • 化工厂网站建设wordpress 网络公司
  • 怀宁县住房与城乡建设局网站恒大房地产最新消息
  • 怎么样做美术招生信息网站120平米装修实用图
  • 100个万能网站广告设计网站排行榜前十名有哪些
  • 江门北京网站建设邹平网站建设公司报价
  • 知名企业网站截图做网站asp
  • 天津建站模板衡量一个网站的指标
  • 要做一个网站得怎么做嘉兴手机模板建站
  • 网站建设价格标准新闻江西今天发生的重大新闻事件
  • 吴桥县做网站建设云个人证件查询系统
  • 网站icp备案证明文件wordpress 整站模板
  • 自己建网站可以赚钱吗东莞旅游网站建设
  • 网站服务器如何搭建广东营销型网站
  • 泉州网站建设方案优化免费建电子商务网站
  • 手机便宜电商网站建设驻马店市住房和城乡建设局网站首页
  • 沧州市做网站wordpress自建邮箱
  • 赣州网站建设策划wordpress淘宝客主题下载
  • 天津企业网站制作中国室内设计公司
  • 那些做兼职的小网站seo网站诊断方案
  • 做网站赔了8万wordpress一定是主页吗