当前位置: 首页 > news >正文

装修设计网站排名全屏网站帮助

装修设计网站排名,全屏网站帮助,安卓app开发培训,网站打印模板制作背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线#xff0c;比如这条特殊的记录代表一个完整记录的结束等#xff0c;本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线比如这条特殊的记录代表一个完整记录的结束等本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecordsbyte[], byte[] records handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionStateT, TopicPartition partition :subscribedPartitionStates()) {ListConsumerRecordbyte[], byte[] partitionRecords records.records(partition.getKafkaPartitionHandle()); // 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线 protected void emitRecordsWithTimestamps(QueueT records,KafkaTopicPartitionStateT, KPH partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record records.poll()) ! null) {long timestamp partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线记录这个分区的水位线并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}3.处理每个分区的水位线javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next wms.checkAndGetNextWatermark(event, eventTimestamp);if (next ! null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp watermark.getTimestamp();// 更新每个分区对应的水位线并且更新boolean wasUpdated state.setWatermark(timestamp);// if its higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线也就是算子任务级别的水位线而不是分区级别的了if (wasUpdated timestamp combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle false;final boolean updated watermark this.watermark;this.watermark Math.max(watermark, this.watermark);return updated;} 4.最后是发送算子任务级别的水位线的方法 private void updateCombinedWatermark() {long minimumOverAllOutputs Long.MAX_VALUE;boolean hasOutputs false;boolean allIdle true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle false;}hasOutputs true;}// if we dont have any outputs minimumOverAllOutputs is not valid, its still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs combinedWatermark) {combinedWatermark minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}你可以看这个流程是不是意味着如果使用Punctuated的方式是不支持Idle空闲时间的–答案是的
http://www.hkea.cn/news/14267572/

相关文章:

  • 佛山专业网站建设的公司国外采购商联系方式
  • 信息化和网站建设管理工作情况用单页做网站 文章直接写上去 百度收录关键词吗
  • 男生跟男生做口视频网站wordpress域名 文件
  • 虚拟商品购物网站源码wordpress当前分类热门调用
  • 购物网站支付页面制作有什么免费的wordpress
  • 无锡网站营销公司哪家好广告设计与制作工资
  • 百度网站ip地址广州建网站哪家最好
  • 企业的vi设计都包括哪几种seo tdk
  • 在线教育网站开发经验简历填写广西桂林自驾游攻略
  • 安徽网站优化价格咨询东莞常平镇
  • 网站建站基础linux删除WordPress
  • 网站前端设计培训自己网站建设的流程是什么
  • 新网站 不稳定企业网站源码带支付
  • 北京 网站 建设免费网站模板怎么做网站
  • 百度网站怎样做推广网站公司怎么做运营商
  • aspcms网站打开慢网络营销和传统营销的区别和联系
  • 做外贸做网站电子商务网站建设与维护试卷答案
  • 工作室网站一个人看片免费高清
  • 网站建设合同标准范本值得玩的网页游戏
  • 仿36氪wordpressseo数据监控平台
  • 该网站未在腾讯云备案google下载官网
  • 柳市外贸网站建设数据查询网站
  • 渭南哪家公司可以做网站全国安装平台有哪些
  • 宽带专家网站2345官网
  • 京东网站建设评估公司网站建设 做账
  • 免费下载网站软件网站模板源码免费下载
  • 永嘉哪里有做网站唐山公司网站建设
  • 食堂承包技术支持 东莞网站建设集团网站建设 中企动力
  • 手机网站版面设计中卫网站定制开发价格
  • 怎么做营销型网站设计项目网络计划图怎么画