当前位置: 首页 > news >正文

河北省住房和城乡建设厅网站官网北京代建网站

河北省住房和城乡建设厅网站官网,北京代建网站,网站建设工资一月多少,深圳福田区怎么样Flink写入Kafka两阶段提交 端到端的 exactly-once#xff08;精准一次#xff09; kafka - Flink - kafka 1#xff09;输入端 输入数据源端的 Kafka 可以对数据进行持久化保存#xff0c;并可以重置偏移量#xff08;offset#xff09; 2#xff09;Flink内…Flink写入Kafka两阶段提交 端到端的 exactly-once精准一次 kafka - Flink - kafka 1输入端 输入数据源端的 Kafka 可以对数据进行持久化保存并可以重置偏移量offset 2Flink内部 Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义 3输出端 两阶段提交2PC。 写入 Kafka 的过程实际上是一个两段式的提交处理完毕得到结果写入 Kafka 时是基于事务的“预提交”等到检查点保存完毕才会提交事务进行“正式提交”。 如果中间出现故障事务进行回滚预提交就会被放弃恢复状态之后也只能恢复所有已经确认提交的操作。 必须的配置 1必须启用检查点 2指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE 3配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交2PC第一阶段预提交数据也会被读到下游消费者需要设置为读已提交】 4事务超时配置 【配置的事务超时时间 transaction.timeout.ms 默认是1小时而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时有可能出现 Kafka 已经认为事务超时了丢弃了预提交的数据而Sink任务认为还可以继续等待。如果接下来检查点保存成功发生故障后回滚到这个检查点的状态这部分数据就被真正丢掉了。因此checkpoint 间隔 事务超时时间 max的15分钟】 代码实战 kafka - Flink - kafka【Flink处理kafka来源数据再输出到kafka】 public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092).setGroupId(default).setTopics(topic_1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkasource env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource);/*3.写出到 Kafka精准一次 写入 Kafka需要满足以下条件【缺一不可】1、开启 checkpoint2、sink 设置保证级别为 精准一次3、sink 设置事务前缀4、sink 设置事务超时时间 checkpoint 间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092)// 指定序列化器指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次必须设置 事务的前缀.setTransactionalIdPrefix(li-)// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();kafkasource.sinkTo(kafkaSink);env.execute();} }后续读取“ws”这个 topic 的消费者要设置事务的隔离级别为“读已提交” public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092).setGroupId(default).setTopics(ws).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, read_committed).build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource).print();env.execute();} }处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。
http://www.hkea.cn/news/14399964/

相关文章:

  • 海南定安建设局网站石城网站建设
  • 网站开发工程师工资hangq网站建设上海诏业
  • 基于.net音乐网站开发新浪短网址链接生成器
  • 什么网站做电子章做得好无锡网页建站
  • 怎么做优惠券网站电子商务网站建设技巧
  • 岳阳网站优化公司安陆做网站公司
  • 湖南做网站 就问磐石网络专业交互做的好网站
  • 我想做卖鱼苗网站怎样做中信建设有限责任公司待遇
  • 怎么样申请网站域名百度指数是搜索量吗
  • 做网站接电话一般要会什么大连高端模板建站
  • 织梦模板网站源码wordpress 所属分类
  • 西安网站快速优化泰安人才网最新招聘
  • 福州企业网站推广定制wordpress php 模板修改
  • 长沙公司制作网站费用多少在互联网公司上班都做啥的
  • 打鱼网站建设中国企业网地址
  • 卖东西的网站有哪些杭州市建设工程造价管理协会网站
  • 中山做网站比较好网站建设的大公司有哪些
  • 南宁怎么做网站响应式网站高度如何计算
  • 中国免费素材网站如何诊断网站seo
  • 福建泉州做淘宝的拿货什么网站搬家
  • 湛江专业自助建站详细解读实时热搜
  • 星宿网站建设行业门户网站是什么
  • 湘潭响应式网站建设 速来磐石网络WordPress用户聊天功能
  • 网站移动端是什么情况wordpress 多语言版
  • 企业网站建设应该怎么做免费翻国外墙的浏览器
  • 如何建设一个外卖订餐平台网站wordpress跳转二级域名
  • 免费外链网站seo发布菏泽市建设局网站电话
  • 东莞中企动力做网站怎样做动漫照片下载网站
  • 外贸建站 宁波与小学生一起做网站
  • 做护理简历的网站学做网站视频