当前位置: 首页 > news >正文

招标代理网站建设深圳市做网站公司

招标代理网站建设,深圳市做网站公司,大型的建设工程类考试辅导网站,说明电子商务网站的建设流程1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置#xff0c;防止因为消费者程序宕机而引起断点消费数据丢失问题#xff0c;下一次可以按照相应的位置从kafka中找寻数据#xff0c;这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信…1. 偏移量的概念 消费者在消费数据的时候需要将消费的记录存储到一个位置防止因为消费者程序宕机而引起断点消费数据丢失问题下一次可以按照相应的位置从kafka中找寻数据这个消费位置记录称之为偏移量offset。 kafka0.9以前版本将偏移量信息记录到zookeeper中 新版本中偏移量信息记录在__consumer_offsets中这个topic是系统生成的不仅仅帮助管理偏移量信息还能分配consumer给哪个coordinator管理是一个非常重要的topic 它的记录方式和我们知道的记录方式一样 groupid topic partition offset 其中存储到__consumer_offsets中的数据格式也是按照k-v进行存储的其中k是groupid topic partition value值为offset的偏移量信息。 [hexuanhadoop106 ~]$ kafka-topics.sh --bootstrap-server hadoop106:9092 --list __consumer_offsets topic_a topic_b topic_c topic_e topic_f topic_g可以看到系统生成的topic 因为之前我们消费过很多数据现在可以查看一下记录在这个topic中的偏移量信息 其中存在一个kafka-consumer-groups.sh 命令 # 查看消费者组信息 kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --list # 查询具体信息 kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group my-group # 查看活跃信息 kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group my-group --members 查看消费者组信息 [hexuanhadoop106 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --list hainiu_group hainiu_group2当前使用组信息 [hexuanhadoop106 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group hainiu_groupGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID hainiu_group topic_c 0 0 0 0 consumer-hainiu_group-1-41a9ebd6-99a3-4d83-b1d7-88a2a9295054 /192.168.154.1 consumer-hainiu_group-1 hainiu_group topic_b 1 1438 1438 0 - - - hainiu_group topic_b 0 1440 1440 0 - - - hainiu_group topic_b 3 1417 1417 0 - - - hainiu_group topic_b 4 1473 1473 0 - - - hainiu_group topic_b 5 1440 1440 0 - - - hainiu_group topic_b 2 1407 1407 0 - - - hainiu_group topic_b 6 1391 1391 0 - 当前组消费偏移量信息 GROUP组名 TOPICtopic信息 PARTITION分区 CURRENT-OFFSET当前消费偏移量 LOG-END-OFFSET这个分区总共存在多少数据 LAG还差多少没消费 CONSUMER-ID随机消费者id HOST主机名 CLIENT-ID客户端id 同时我们也可以查询__consumer_offset中的原生数据 kafka-console-consumer.sh --bootstrap-server hadoop106:9092 \ --topic __consumer_offsets --from-beginning --formatter \ kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter 使用元数据格式化方式查看偏移量信息数据 key展示的是groupid,topic,partition ,  value值展示的是当前的偏移量信息 并且在这个topic中是追加形式一致往里面写入的 2. 偏移量的自动管理 那么我们已经看到了偏移量的存储但是偏移量究竟是怎么提交的呢 首先我们没有设置任何的偏移量提交的代码这个是默认开启的其中存在两个参数 pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //开启自动提交偏移量信息 pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); //默认提交间隔5s 官网的设置参数为两个true和5000。 所以我们在没有开启默认提交的时候已经自动提交了 为了演示自动提交的效果我们引入一个参数 auto.offset.reset 这个参数用于控制没有偏移量存储的时候应该从什么位置进行消费数据 因为偏移量自动提交默认是5秒一次如果数据在5秒内消费完毕则会造成偏移量并没有存储的情况 其中参数值官网中给出三个 [latest, earliest, none]latest:从最新位置消费earliest最早位置消费数据none如果不指定消费的偏移量直接报错 一定要记得一点如果有偏移量信息那么以上的设置是无效的. 官方文档显示给出的该参数的默认值为lastest即从最新位置开始消费。 现在我们设置读取位置为最早位置并且消费数据看看可不可以记录偏移量断点续传 思路 首先修改组id为一个新的组然后从最早位置消费数据如果记录了偏移量那么重新启动消费者会看到没有任何数据因为之前记录了消费数据的位置 整体代码如下 package com.hainiu.kafka;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.*;public class Consumer1 {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,nn1:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,new_group);pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_d,topic_e);consumer.subscribe(topics);while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());}}} } 运行完毕打印数据 这个时候我们需要在5s之内关闭应用然后重新启动因为提交的间隔时间是5s 再次启动 我们发现数据依旧被消费出来了证明之前的偏移量存储没有任何效果和作用因为间隔时间是5s 现在我们等待5s后在关闭应用 发现没有任何数据产生因为偏移量已经提交了 3. 偏移量的手动提交 如上的案例我们发现偏移量的管理如果交给系统自己管理我们没有办法及时的修改和管理偏移量信息这个时候我们需要手动来提交给管理偏移量更加及时和方便 这个时候引入两个方法 consumer.commitAsync(); consumer.commitSync(); commitAsync 异步提交方式只提交一次不管成功与否不会重试 commitSync 同步提交方式同步提交方式会一直提交到成功为止 一般我们都会选择异步提交方式他们的功能都是将拉取到的一整批数据的最大偏移量直接提交到__consumer_offsets中但是同步方式会很浪费资源异步方式虽然不能保证稳定性但是我们的偏移量是一直递增存储的所以偶尔提交不成功一个两个不影响我们的使用 pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //设定自动提交为false consumer.commitSync(); consumer.commitAsync(); //设定提交方式为手动提交 整体代码如下 package com.hainiu.kafka.consumer;/*** ClassName : consumer_offsets* Package : com.hainiu.kafka.consumer* Description** Author HeXua* Create 2024/11/5 21:30* Version 1.0*/import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.*;public class Consumer_CommitSync {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,hainiu_group2);pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_h);consumer.subscribe(topics);while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());}consumer.commitAsync(); // consumer.commitSync();}} } 现在先在topic中输入部分数据 然后启动消费者当存在数据打印的时候马上关闭掉应用在此启动会发现数据不会重新消费 topic_h-5-12-null-1 topic_h-5-13-null-2 topic_h-5-14-null-3 topic_h-5-15-null-4 topic_h-5-16-null-5 topic_h-5-17-null-6 偏移量已经提交不会重复消费数据 4. 断点消费数据 在没有偏移量的时候我们可以设定 auto.offset.reset进行数据的消费 可选参数有 latest earliest none等位置 但是如果存在偏移量以上的设定就不在好用了我们需要根据偏移量的位置进行断点消费数据 但是有的时候我们需要指定位置消费相应的数据 这个时候我们需要使用到 consumer.seek(); //可以指定位置进行数据的检索 但是我们不能随意的指定消费者消费数据的位置因为在启动消费者的时候一个组中会存在多个消费者每个人拿到的对应分区是不同的所以我们需要知道这个消费者能够获取的分区是哪个然后再指定相应的断点位置 这里我们就需要监控分区的方法展示出来所有订阅的分区信息 consumer.subscribe(topics, new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {}}); 为了演示效果我们使用生产者在topic_d中增加多个消息 package com.hainiu.kafka.consumer;/*** ClassName : Producer2* Package : com.hainiu.kafka.consumer* Description** Author HeXua* Create 2024/11/5 23:01* Version 1.0*/ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Producer2 {public static void main(String[] args) {Properties pro new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducerString, String producer new KafkaProducerString, String(pro);for (int i 0; i 1000; i) {ProducerRecordString, String record new ProducerRecordString, String(topic_d, i, messagei);producer.send(record);}producer.close();} } 随机发送数据到不同的节点使用随机key 然后使用断点消费数据 不设置任何的偏移量提交操作和断点位置 package com.hainiu.kafka.consumer;/*** ClassName : ConsumerWithUDOffset* Package : com.hainiu.kafka.consumer* Description** Author HeXua* Create 2024/11/5 23:03* Version 1.0*/ import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.*;public class ConsumerWithUDOffset {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,new1);pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest);pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_h);// range roundRobin sticky cooperativeStickyconsumer.subscribe(topics, new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition collection) {}Overridepublic void onPartitionsAssigned(CollectionTopicPartition collection) {for (TopicPartition topicPartition : collection) {consumer.seek(topicPartition,195);}}});while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());}consumer.commitAsync();}} }5. 时间断点 kafka没有给大家提供直接根据时间找到断点位置的方法我们需要根据时间找到偏移量然后根据偏移量进行数据消费 consumer.offsetsForTimes(); //通过这个方法找到对应时间的偏移量位置 consumer.seek(); //然后在通过这个方法根据断点进行消费数据 整体代码如下 package com.hainiu.kafka;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.*;public class Consumer1 {public static void main(String[] args) {Properties pro new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,nn1:9092);pro.put(ConsumerConfig.GROUP_ID_CONFIG,new_group221);pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String consumer new KafkaConsumerString, String(pro);ListString topics Arrays.asList(topic_e);consumer.subscribe(topics, new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {// no op}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {HashMapTopicPartition, Long map new HashMap();for (TopicPartition partition : partitions) {map.put(partition,1675076400000L);//将时间和分区绑定在一起然后合并在一起放入到检索方法中}MapTopicPartition, OffsetAndTimestamp offsets consumer.offsetsForTimes(map);//根据时间获取时间对应的偏移量位置for (Map.EntryTopicPartition, OffsetAndTimestamp en : offsets.entrySet()) {System.out.println(en.getKey()--en.getValue());if(en.getValue() ! null){consumer.seek(en.getKey(),en.getValue().offset());//获取每个分区的偏移量的位置使用seek进行找寻数据}}}});while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));IteratorConsumerRecordString, String it records.iterator();while(it.hasNext()){ConsumerRecordString, String record it.next();System.out.println(record.topic()-record.partition()- record.offset()-record.key()-record.value());} // consumer.commitAsync();}} }
http://www.hkea.cn/news/14399269/

相关文章:

  • 怎么套模板 网站学网站制作多少钱
  • 湛江免费建站公司电商网站seo
  • html5移动端网站建设上海建设网站公司
  • 学校校园网站使用中国制造货源网一件代发
  • 网站 站长统计代码谷歌优化技术
  • 有做销售产品的网站有哪些wordpress游戏主题下载
  • 旅游网站建设的重要性wordpress怎么修改中文
  • 杭州网站公司WordPress页面怎么html
  • 静态网站 插件天津seo排名扣费
  • 合肥网站推广培训手机网站开发库
  • 网站开发图书管理系统重庆网站建设索q479185700
  • wordpress一键仿站大连网站开发哪家好
  • 做网站需要哪些人手网站注销流程
  • 网站手机客户端制作软件万网网站加速
  • 淘宝客网站都用什么做常州市城乡建设局网站
  • 深圳 公司网站建设软件开发平台合同
  • 网站设计西安网站建设苏州有什么好玩的景点景区
  • 江阴市做网站的建设工程信息网重庆
  • 网站代码查看口碑营销的主要手段有哪些
  • 北京网站建设网站建设WordPress会员积分插件
  • 网店代运营需要多少钱seo舆情优化
  • 公司信息网站建设目标域名备案接入商查询
  • 京东怎么做轮播图链接网站哪个网站帮别人做ppt
  • 做网站的企业是什么行业网站策划案模板
  • 网站开发属于程序员吗手机短视频网站的建设
  • 保定医疗网站建设公司返利商城网站怎么做
  • 洛阳市涧西区建设局网站百度新闻
  • 做网站软件frontpage哪家网络推广好
  • 怎么做流量网站怎样给自己的店做网站
  • wap网站怎么发布秦皇岛做网站公司排名