当前位置: 首页 > news >正文

吉林省公司注册网站做侵权视频网站

吉林省公司注册网站,做侵权视频网站,app设计制作软件,公司职务名称大全目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗#xff1f;三、如果我想手动提交偏移量#xff0c;该怎么做#xff1f; 一、为什么需要带有 subscribe 的 group.id 消费概念#xff1a; Kafka 使用消费者组的概念来实现主题的… 目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗三、如果我想手动提交偏移量该怎么做 一、为什么需要带有 subscribe 的 group.id 消费概念 Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次无论该组中实际有多少个消费者。所以 group 参数是强制性的如果没有组Kafka 将不知道如何对待订阅同一主题的其他消费者。偏移量 每当我们启动一个消费者时它都会加入一个消费者组然后根据该消费者组中的其他消费者数量为其分配要读取的分区。对于这些分区它会检查列表读取偏移量是否已知如果找到它将从这一点开始读取消息。如果没有找到偏移量则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。 二、我们需要使用commitSync手动提交偏移量吗 是否需要手动提交偏移 是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下此设置为 true这意味着消费者将定期自动提交其偏移量由auto.commit.interval.ms 决定提交的频率。如果将其设置为 false那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因由于偏移量是自动提交的因此它将使用该偏移量。 有没有办法从头开始重播消息 如果想每次都从头开始读取可以调用seekToBeginning如果不带参数调用它将重置为所有订阅分区中的第一条消息或者仅重置您传入的那些分区。 seekToBeginning 查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区则查找所有当前分配的分区的第一个偏移量。 public class MyListener implements ConsumerSeekAware {...Overridepublic void onPartitionsAssigned(MapTopicPartition, Long assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}有没有办法从最后开始重播消息 有的可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。 public class MyListener extends AbstractConsumerSeekAware {KafkaListener(...)void listn(...) {...} }public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}三、如果我想手动提交偏移量该怎么做 1、禁用自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);提交方法 对于手动提交KafkaConsumers提供了两种方法即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用在偏移量成功提交后返回commitAsync()则立即返回。如果想知道提交是否成功可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意在两次提交调用中消费者都会提交最新poll()调用的偏移量。 举个例子假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时偏移量 6 将被提交因为这是消费者客户端跟踪的最新偏移量。 同时commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量如果你使用允许你指定的相应重载那么MapTopicPartition, OffsetAndMetadata消费者将仅提交指定的偏移量即映射可以包含分配的分区的任何子集 并且指定的偏移量可以为任意值。 同步提交 阻塞线程直到提交成功或遇到不可恢复的错误在这种情况下它被抛出给调用者 while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s, record.offset(), record.key(), record.value());consumer.commitSync();} }对于 for 循环中的每次迭代只有在consumer.commitSync()成功返回或因抛出异常而中断后代码才会移至下一次迭代。 异步提交 是一种非阻塞方法。调用它不会阻塞线程。相反它将继续处理以下指令无论最终是成功还是失败。 while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s, record.offset(), record.key(), record.value());consumer.commitAsync(callback);} }对于 for 循环中的每次迭代无论consumer.commitAsync()最终会发生什么代码都会移至下一次迭代。并且提交的结果将由定义的回调函数处理。 权衡延迟与数据一致性 1、如果必须确保数据一致性请选择commitSync()因为它将确保在执行任何进一步操作之前你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的你将花费更多的时间来等待提交完成这会导致高延迟。 2、如果可以接受某些数据不一致并希望具有低延迟请选择commitAsync()因为它不会等待完成。相反它只会发出提交请求并稍后处理来自 Kafka 的响应成功或失败同时代码将继续执行。
http://www.hkea.cn/news/14480715/

相关文章:

  • 购物网站开发教程 视频做服装到哪个网站拿货品质好
  • 山东省住房与建设厅网站北京建设门户网站
  • 深圳龙岗网站建设公司哪家好给视频做特效的网站
  • 开封+网站建设+网络推广红河网站建设设计
  • 静态网页建站网页设计图片剧中
  • wordpress是哪家公司的建站程序出口跨境电商有哪些平台
  • 西安做企业网站安庆seo
  • 免费下载app软件网站js特效网站模板
  • wordpress 访问权限好的seo
  • 网站建设流程咨询百度小程序怎么进入
  • 电子商务网站的建设和维护晋城做网站的公司
  • 手机网站下拉列表酒水包装设计公司
  • 门户网站流程图南京装修公司做网站
  • 网站建设的讲话要求家在临深业主论坛家在深圳
  • 黄页大全18勿看2000网站免费在线观看网址入口
  • 做商城购物网站如何开始做网站
  • 成武县建设局网站响水专业做网站的公司
  • wordpress建站位置网页脚本语言有哪些
  • 郑州七彩网站建设公司溧阳做网站价格
  • 网站后台模板怎样使用设计工作室 网站
  • 输变电壹级电力建设公司网站建设网站语言选择
  • 网站动图怎么做的建立网站线上营销
  • 网站平台建设wordpress怎么加标题
  • 做液氮冰淇淋店网站网页建站怎么设置
  • 成都分类信息网站开发长沙建设网站制作
  • 中诺建设集团有限公司网站白石桥做网站公司
  • 在郑州建设网站这么做平面作品集展示图片
  • 深圳专业做网站多少钱python3.5 做网站
  • 购物网站建设教程站酷网怎么接单赚钱
  • 重庆网站制作权威乐云践新无网站做百度推广