福州市建设工程工料机信息网站,黄页88网是什么性质的网站,制作wordpress模板教程,如何做网站资讯前言 2023-12-02-20:05#xff0c;终于写完啦#xff0c;最近状态不错。刚写完又收到了她的消息哈哈哈哈#xff0c;开心。 再去全力打拼一次#xff0c;奋战一场#xff0c;就算最后打了败仗也无所谓#xff0c;至少你留下了足迹。 《解忧杂货店》 1、时间语义 …前言 2023-12-02-20:05终于写完啦最近状态不错。刚写完又收到了她的消息哈哈哈哈开心。 再去全力打拼一次奋战一场就算最后打了败仗也无所谓至少你留下了足迹。 《解忧杂货店》 1、时间语义 Flink 中的时间语义有两个事件时间和处理时间。事件时间也就是数据产生的时间通常都是数据自带的一个属性。处理时间则是指数据传输到我们集群被处理的时间。然而由于在我们分布式系统中数据在网络中有延迟以及不同机器的时钟可能不一致所以处理时间通常都要比事件时间滞后一些。 比如我们在 8:59:59 产生了一条数据只考虑网络延迟为 2s窗口的起始时间为 [8:00:00,9:00:00)。如果以事件时间作为默认的时间语义的话那么我们的集群一定得等到数据在 9:00:01 才会开始计算输出而如果以处理时间作为默认的时间语义的话那么当集群机器的时间达达 9:00:00 时立即进行计算输出。所以不难发现使用事件时间会牺牲一定的实时性而使用处理时间则会失去一定的准确性。 在实际应用中事件时间更加常见。一般情况下业务日志数据都会记录数据生成的时间戳它就可以作为事件时间的判断基础。 在 Flink 的早期版本中是以处理时间作为基本语义的但在 Flink 1.12 之后考虑到事件时间在实际中更加广泛所以 Flink 就以事件时间作为默认的时间语义了。
2、水位线Watermark
2.1、事件时间和窗口
我们的水位线正是基于事件时间提出来的所以先梳理一下事件时间和窗口的关系。 在这个窗口的处理过程中我们是基于数据的时间戳数据自带时间戳属性自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝它的时间进展就是靠着新到数据的时间戳来推动的。 事件时间完全依赖数据本身这样可以保证数据的结果绝对准确。也就是说不管机器时间是多少我们只以新来数据的时间戳更新时钟。一般的流处理场景中事件时间可以基本与处理时间保持同步只是略微有点延迟。
2.2、水位线概念 在事件时间语义下我们不依赖系统时间而是基于数据自带的时间戳去定义了一个时钟用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟它的前进是靠数据的时间戳来驱动的。 具体实现上水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。而它插入流中的位置是在某个数据到来之后这样就可以把这个数据的时间戳抽出来作为当前水位线的时间戳了。 上图是理想状态下数据量小数据按照有序的状态进入流中每条数据产生一个水位线。
1有序流中的水位线
然而实际应用中数据量非常大并且数据之间的时间差非常小几毫秒如果依然在每条数据后面标记一个水位线这样的代价是非常大的。所以为了提高效率一般会每隔一段时间生产一个水位线。这时的水位线就像是一个周期性出现的时间标记。 2无序流中的水位线
我们知道在分布式系统中数据在节点间传输会因为网络传输延迟的不确定性导致顺序发生改变比如我们多个 Source 的情况下数据通过不同的节点发送给下游而由于不同节点网络性能或硬件的差异3s 产生的数据可能在 1s 产生的数据之前被发送给下游被处理这就是所谓的“乱序数据”。 上图中很明显有很多乱序的数据所以有可能新的时间戳比之前的还小如果直接将这个时间的水位线再插入我们的“时钟”就回退了。所以当我们插入新的水位线时要先判断一下时间戳是否比之前的大否则就不再生成新的水位线。也就是说只有数据的时间戳比当前时钟大才能推动时钟前进这时才插入水位线。但是这样的代价就是每来一条数据就去判断一下事件时间是否大于当前水位线时间。 如果考虑到大量数据同时到来的处理效率明显每个数据比较一次是不可行的。我们同样可以周期性地生成水位线。这时只需要周期性地保存一下该周期内所有数据中的最大时间戳需要插入水位线时就直接以它作为时间戳生成新的水位线 。 但是上面的这种方法依然存在问题我们无法正确处理“迟到”的数据。在上面的例子中当 9 秒产生的数据到来之后我们就直接将时钟推进到了9 秒如果有一个窗口结束时间就是 9 秒比如要统计 0-9 秒的所有数据那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上由于数据是乱序的还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来这就是“迟到数据”late data。它们本来也应该属于 0~9 秒这个窗口但此时窗口已经关闭于是这些数据就被遗漏了这会导致统计结果不正确。而解决这种问题的方法也比较简单就是等一下也就是说为了让窗口能够正确的收集迟到的数据我们可以让窗口等上一段时间比如 2s。 同样我们一般都是周期性地生成水位线 这里需要特别注意的是一个窗口所收集的数据并不是之前所有已经到达的数据而是真正数据的事件时间在该窗口范围内的。我们需要了解一下水位线和窗口的工作原理
水位线和窗口的工作原理 重点 我们之前把窗口理解为一个桶处理完一个范围内的数据后就清空然后继续下一个窗口。这在处理时间语义下是没有问题的因为我们并不关心数据的是什么时候产生的我们只关心数据是什么时候来的我只保证来一个处理一个在处理时间范围内处理并输出就好了。但是在事件时间语义下这种理解是错误的因为数据属于哪个窗口是由数据本身的时间戳决定的一个窗口只会收集真正属于它的那些数据。比如上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来10~20 秒的窗口中也不会收集这个数据进行计算依然可以得到正确的结果。 所以我们的每个窗口都是一个桶每次收集数据时它只会取走属于自己窗口内的数据当达到窗口的结束时间比如等待 2s 的情况下窗口 [0,10)的结束时间就是 12也就是说当来一条 事件时间为 11s 的数据时我们认为当前的时间达到了 w(11-2)9当来一条事件时间为 12s 的数据时 w(12-2)10 10 已经达到了我们的窗口关闭时间这事就说明事件时间在 10 之前的数据都已经到齐了窗口[0,10) 也就会关闭了时就对桶内的数据进行计算处理。 注意窗口是我们属于窗口范围内的第一条数据到来的时候现 new 的也就是动态创建的而不是静态创建好的。
3水位线的特性
水位线是插入到数据流中的一个标记可以认为是一个特殊的数据水位线主要的内容是一个时间戳用来表示当前事件时间的进展水位线是基于数据的时间戳生成的水位线的时间戳必须单调递增以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟来保证正确处理乱序数据一个水位线 Watermark(t)表示在当前流中事件时间已经达到了时间戳 t 这代表 t 之前的所有数据都到齐了之后流中不会出现时间戳 t’ ≤ t 的数据
水位线是 Flink 流处理中保证结果正确性的核心机制它往往会跟窗口一起配合完成对乱序数据的正确处理。
2.3、生成水位线
1生成水位线的总体原则 完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据我们该怎么做呢由于网络传输的延迟不确定节点挂了网络异常为了获取所有迟到数据保证计算结果完全正确必须等待足够长的时间但这会带来更高的延迟。 如果我们希望计算结果能更加准确那可以将水位线的延迟设置得更高一些等待的时间越长自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强那么可以将水位线延迟设得低一些。这种情况下可能很多迟到数据会在水位线之后才到达就会导致窗口遗漏数据计算结果不准确。 当然如果我们对准确性完全不考虑、一味地追求处理速度可以直接使用处理时间语义毕竟不在乎数据准确性也就无所谓迟到这在理论上可以得到最低的延迟。 所以 Flink 中的水位线其实是流处理中对低延迟和结果正确性的一个权衡机制而且把控制的权力交给了程序员我们可以在代码中定义水位线的生成策略。接下来我们就具体了解一下水位线在代码中的使用。
2水位线生成策略
在 Flink 的 DataStream API 中 有 一 个 单 独 用 于 生 成 水 位 线 的 方法assignTimestampsAndWatermarks()它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间
public SingleOutputStreamOperatorT assignTimestampsAndWatermarks( WatermarkStrategyT watermarkStrategy)这里的 WatermarkStrategy 是一个接口它包含了一个 “时间戳分配器” 和一个“水位线生成器”。
DataStreamEvent stream env.addSource(new ClickSource());
DataStreamEvent withTimestampsAndWatermarks stream.assignTimestampsAndWatermarks(watermark strategy);至于为什么要有时间戳分配器这是因为原始数据中的时间戳只是写入日志数据的一个字段如果不提取出来并明确把它分配给数据Flink 是无法知道数据真正产生的时间的。当然有些时候数据源本身就提供了时间戳信息比如读取 Kafka 时我们就可以从 Kafka 数据中直接获取时间戳而不需要单独提取字段分配了。
public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{// 时间戳分配器OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);// 水位线生成器OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}3Flink 内置水位线策略 1、有序流中内置水位线设置
我们来演示一个水位线驱动的滚动窗口注意这里的水位线是事件时间语义下的这里演示的是有序流。
对于有序流主要特点就是时间戳单调增长Monotonously Increasing Timestamps所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说就是直接拿当前最大的时间戳作为水位线就可以了。
public class WaterMarkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成泛型方法需要指定数据类型升序的watermark 没有等待时间.WaterSensorforMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() { //函数接口 可以用lambda表达式Overridepublic long extractTimestamp(WaterSensor sensor, long recordTimestamp) {System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000; // 返回的时间戳单位是 ms}}));KeyedStreamWaterSensor, String sensorKs sensorDS.keyBy(WaterSensor::getId);// todo 1. 指定窗口分配器基于事件时间的滚动窗口 watermark 才能起作用WindowedStreamWaterSensor, String, TimeWindow tumblingWindow sensorKs.window(TumblingEventTimeWindows.of(Time.seconds(10)));// todo 2. 指定窗口函数增量聚合的规约函数SingleOutputStreamOperatorString process tumblingWindow.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String start sdf.format(new Date(startTs));String end sdf.format(new Date(endTs));long size elements.spliterator().estimateSize();out.collect(key key 的窗口[ start , end ]包含 size 条数据 elements.toString());}});process.print();env.execute();}
}上面的代码中我们把 WaterSensor 的 ts 属性当做数据自带的事件时间因为单位是毫秒所以我们 *1000。withTimestampAssigner中的参数里的 recordTimeStamp 的默认值为 Long.MIN_VALUE一般场景用不到。
测试输入
s1,1,1
s1,2,2
s1,3,3
s1,5,5
s1,9,9
s1,10,10
s1,20,20
输出结果
数据WaterSensor{ids1, ts1, vc1},recordTs-9223372036854775808
数据WaterSensor{ids1, ts2, vc2},recordTs-9223372036854775808
数据WaterSensor{ids1, ts3, vc3},recordTs-9223372036854775808
数据WaterSensor{ids1, ts5, vc5},recordTs-9223372036854775808
数据WaterSensor{ids1, ts9, vc9},recordTs-9223372036854775808
数据WaterSensor{ids1, ts10, vc10},recordTs-9223372036854775808
keys1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含5条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc2}, WaterSensor{ids1, ts3, vc3}, WaterSensor{ids1, ts5, vc5}, WaterSensor{ids1, ts9, vc9}]
数据WaterSensor{ids1, ts20, vc20},recordTs-9223372036854775808
keys1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含1条数据[WaterSensor{ids1, ts10, vc10}] 可以看到我们设置的窗口大小为 10 s所以当WaterSensor{ids1,ts10,vc10}来的时候才触发窗口计算输出并关闭。我们的窗口是左闭右开的。而且窗口并不会把不属于该窗口的数据包含进去。
2、乱序流中内置水位线设置 由于乱序流中需要等待迟到数据到齐所以必须设置一个固定量的延迟时间Fixed Amount of Lateness。这时生成水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果相当于把表调慢当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数表示“最大乱序程度”它表示数据流中乱序数据时间戳的最大差值如果我们能确定乱序程度那么设置对应时间长度的延迟就可以等到所有的乱序数据了。
这里我们继续使用滚动窗口来演示 我们只需在上面代码的基础上修改
SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- {System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));
这里我们设置等待时间为 2s。
测试输入
s1,1,1
s1,2,2
s1,5,5
s1,7,7
s1,9,9
s1,10,10
s1,3,3
s1,11,11
s1,12,12
输出结果
数据WaterSensor{ids1, ts1, vc1},recordTs-9223372036854775808
数据WaterSensor{ids1, ts2, vc2},recordTs-9223372036854775808
数据WaterSensor{ids1, ts5, vc5},recordTs-9223372036854775808
数据WaterSensor{ids1, ts7, vc7},recordTs-9223372036854775808
数据WaterSensor{ids1, ts9, vc9},recordTs-9223372036854775808
数据WaterSensor{ids1, ts10, vc10},recordTs-9223372036854775808
数据WaterSensor{ids1, ts3, vc3},recordTs-9223372036854775808
数据WaterSensor{ids1, ts11, vc11},recordTs-9223372036854775808
数据WaterSensor{ids1, ts12, vc12},recordTs-9223372036854775808
keys1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含6条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc2}, WaterSensor{ids1, ts5, vc5}, WaterSensor{ids1, ts7, vc7}, WaterSensor{ids1, ts9, vc9}, WaterSensor{ids1, ts3, vc3}]
keys1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据[WaterSensor{ids1, ts10, vc10}, WaterSensor{ids1, ts11, vc11}, WaterSensor{ids1, ts12, vc12}]可以看到我们数据的事件时间达到10s时窗口仍然没有关闭此时依然可以接受迟到的数据直到大于等待时间窗口关闭时间 12s的数据来的时候才会触发窗口计算关闭。
3、内置水位线原理
1乱序流中水位线的生成原理
对于我们上面的乱序流中生成水位线原理我们可以查看 WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)) 的底层源码 2有序流中水位线的生成原理
同样我们查看 WaterSensorforMonotonousTimestamps() 方法的源码 它也是返回一个对象我们继续查看 我们发现有序水位线它的底层仍然是乱序水位线只不过它的等待时间为 0ms 。
总结
内置水位线的生成原理
都是周期性生产的默认是 200ms可以通过 env.getConfig().setAutoWatermarkInterval() 查看默认的水位线生成周期有序流watermark 当前最大事件时间 - 0 ms乱序流watermark 当前最大事件时间 - 等待时间也叫乱序程度 -1 ms
4、自定义水位线策略
1周期性水位线生产策略
周期时间我们一般是不去随便修改的默认为 200 ms。
下面我们模仿 Flink 的内置乱序流水位线策略来自定义一个水位线生成器
public class MyPeriodWatermarkGeneratorT implements WatermarkGeneratorT {private long maxTs; // 保存到当前为止最大的事件时间private long delayTs; // 保存等待时间public MyPeriodWatermarkGenerator(long delayTs) {this.maxTs Long.MIN_VALUE this.delayTs 1;this.delayTs delayTs;}/*** 每条数据来都会调用一次用来提取最大的事件时间* param event* param eventTimestamp 提取到的事件时间* param output*/Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs Math.max(maxTs,eventTimestamp);System.out.println(调用 onEvent 方法,获取当前最大的时间戳maxTs);}/*** 周期性调用: 生成 watermark* param output*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTs - delayTs - 1));System.out.println(调用onPeriodicEmit方法生成watermark(maxTs - delayTs - 1));}
}测试
// 这里为了测试 一般不去修改水位线生成的周期时间
env.getConfig().setAutoWatermarkInterval(2000);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定自定义的watermark生成器.WaterSensorforGenerator(ctx - new MyPeriodWatermarkGenerator(3000))// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- {System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));
我们可以发现onPeriodEmit方法是每周期执行一次。
2断点式水位线生成器
断点式和周期式唯一的不同就是发送水位线的方法上面的周期式中我们使用 onPeriodicEmit方法来周期性地发送水位线而断电式则由 onEvent 来发送水位线也就是只要有新的一条数据来它就会更新水位线。具体代码只需要修改以下部分
Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs Math.max(maxTs,eventTimestamp);output.emitWatermark(new Watermark(maxTs - delayTs -1));System.out.println(调用 onEvent 方法,获取当前最大的时间戳maxTs生成watermark(maxTs - delayTs - 1));}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 啥也不用干}
对于我们之前的 Kafka 数据源我们现在可以指定它的水位线生产策略了
env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)),kafkaSource).print();
注意水位线策略的设置只需要设置一次Kafka 数据源不需要设置时间戳读取器也就是如何从数据源读取事件时间因为对于 Kafka 数据源框架可以直接从 Source 中获取事件时间。
2.4、水位线的传递 我们知道水位线是数据流中插入的一个标记用来表示事件时间的进展。它随着数据一起在任务间传递。 在直通式forward传输的情况下数据和水位线都是按照本身的顺序依次传递、依次处理的。一旦水位线到达了算子任务该任务就会将它内部的时钟设为这个水位线的时间戳。 然而实际应用中往往上下游都有多个并行子任务为了统一推进事件时间的进展要求上游任务处理完水位线、时钟改变之后要把当前的水位线再次发出广播给所有的下游子任务。这样后续任务就不需要依赖原始数据中的时间戳避免数据经过转化处理后发生改变也可以知道当前事件时间了。 还有一个问题就是在“重分区”redistributing的传输模式下一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步有的子任务处理的数据的事件时间早有的任务处理的的数据的事件时间晚所以也就使得每个子任务的水位线时间戳有的快有的慢也就使得不同子任务的逻辑时钟不同步所以同一时刻发给下游任务的水位线可能并不相同。这个时候下游就要确定到底按照谁发来的水位线来确定为当前事件的最新进展答案是最小的水位线因为我们水位线的本质就是 “保证当前时间之前的数据都已经到齐了”。 此外多并行度情况下我们的一条数据通常只会去往一个分区分区就是子任务但是我们的水位线是特殊的它会广播到所有下游节点来推进整个事件的进展。还需要注意的是多并行度的情况下往往会对我们的水位线有影响比如我们设置的等待时间为 3s但当事件时间为 13 的数据到来后它并不会立即关闭窗口因为在多并行度下水位线的更新是取最小的取的是两个上游任务中的最小比如
上游并行任务(等待3s) 水位线
map1 - 1- 一条数据无法取最小
map2 - 3- 取最小1 -2
map1 - 5 - 取最小3 0
map2 - 7- 取最小5 2
map1 - 13 - 取最小7 4
map2 - 14- 取最小13 10
2.5、设置空闲等待Idleness 在多个并行度的情况下我们知道水位线的更新需要至少通过两个上游并行任务的数据的事件时间来比较。而加入一个上游中只有一条数据会出现什么情况呢
上游任务(等待3s) 事件时间 水位线
map1 - 1- 一条数据无法取最小
map2 - 2- 取最小1 -2
map1 - 3 - 取最小2 -1
map1 - 5- 一条数据无法取最小(还需要一条map2的数据)
map1 - 7- 一条数据无法取最小(还需要一条map2的数据)
map1 - 13- 一条数据无法取最小(还需要一条map2的数据)
可以看到这样就会造成我们的逻辑时钟水位线迟迟无法推进怎么解决呢就是当我们的一个上游并行任务不再有数据到来时我们下游任务不再等待。
public class WatermarkIdlenessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 自定义分区器 把奇数和偶数分区到两个不同的map子任务// 输入的数字就是事件时间*1000msSingleOutputStreamOperatorInteger socketDS env.socketTextStream(localhost, 9999).partitionCustom(new MyPartitioner(), num - num) //根据自己来进行分区.map(Integer::parseInt)// todo 指定 watermark 策略.assignTimestampsAndWatermarks(WatermarkStrategy// 使用有序流的watermark生成器 升序.IntegerforMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner((num, ts) - num * 1000L)// 空闲等待时间 5s.withIdleness(Duration.ofSeconds(5)));SingleOutputStreamOperatorString process socketDS.keyBy(num - num % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionInteger, String, Integer, TimeWindow() {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String start sdf.format(new Date(startTs));String end sdf.format(new Date(endTs));long size elements.spliterator().estimateSize();out.collect(key key 的窗口[ start , end ]包含 size 条数据 elements.toString());}});process.print();env.execute();}
}上面的代码中我们的并行度为2由于数据源是 Socket 所以 Source算子并行度只能为 1而 输入的数据由于我们指定了 MyPartitioner 所以它会按照把奇数和偶数分到不同的 map算子 在水位线传递的过程中当上游没有偶数传递时处理奇数的process算子需要等待偶数数据到来才能确定窗口的关闭时间。这是因为水位线的生成是基于事件时间的而事件时间是根据数据本身的时间戳来计算的。处理奇数的process算子虽然只处理奇数数据但是它需要等待偶数数据到来以便根据偶数数据的时间戳来确定窗口的关闭时间。如果处理奇数的process算子不等待偶数数据到来就关闭窗口那么可能会出现数据丢失或计算结果不正确的情况。
2.6、迟到数据的处理 之前我们说通过设置等待时间可以解决一定的数据乱序问题但并不是 100% 的解决因为往往不会把等待时间设置的太久会造成计算的延迟所以考虑到一些数据乱序程度无法预知光靠等待时间是不行的会造成结果不准确。解决数据乱序问题我们除了设置等待时间其实还有两招设置窗口延迟关闭 和 使用侧输出流接收延迟数据。
2.6.1、设置窗口延迟关闭 我们可以在 window() 方法之后 .allowedLateness(Time.seconds(2)) 来设置关窗时间为 2s。窗口的触发计算和关闭是两码事我们之前都是触发计算后直接关闭这里我们设置延迟关闭 2s也就是说当有数据的事件时间达到窗口最大值窗口被触发计算一次但不会立即关闭而是允许再多等一会但是如果出现有比窗口最大关闭时间还要大2s的数据来时窗口直接关闭。
public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- {System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));SingleOutputStreamOperatorString process sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) //设置运行窗口延迟关闭2s.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String start sdf.format(new Date(startTs));String end sdf.format(new Date(endTs));long size elements.spliterator().estimateSize();out.collect(key key 的窗口[ start , end ]包含 size 条数据 elements.toString());}});process.print();env.execute();}
}测试输入
s1,1,1
s1,2,2
s1,10,10
s1,12,12
s1,6,6
s1,3,3
s1,14,14
s1,5,5
s1,3,3运行结果
keys1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含2条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc2}]
keys1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含3条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc2}, WaterSensor{ids1, ts6, vc6}]
keys1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含4条数据[WaterSensor{ids1, ts1, vc1}, WaterSensor{ids1, ts2, vc2}, WaterSensor{ids1, ts6, vc6}, WaterSensor{ids1, ts3, vc3}]
keys1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据[WaterSensor{ids1, ts10, vc10}, WaterSensor{ids1, ts12, vc12}, WaterSensor{ids1, ts14, vc14}]
可以看到当数据 s1,12,12 到来时窗口触发计算一次但没有立即关闭所以之后迟到的 s1,6,6, 和 s1,3,3 仍然可以触发计算但是当大于窗口最大关闭时间2s允许迟到的时间的数据 s1,14,14 到来后窗口彻底关闭之后到来的 s1,5,5 和 s1,3,3 无法进行计算。
2.6.2、使用侧输出流接收延迟数据
流式数据没有 100% 的完美数据迟到不可能彻底解决为了尽可能让结果正确让极端迟到的数据仍然能够计算我们还可以使用侧输出流。
public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法需要指定数据类型乱序的watermark 需要设置等待时间.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)) // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)- {System.out.println(数据 sensor ,recordTs recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// 定义侧输出流OutputTagWaterSensor lateData new OutputTag(lateData, Types.POJO(WaterSensor.class));SingleOutputStreamOperatorString process sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) //设置运行窗口延迟关闭2s.sideOutputLateData(lateData) // 关窗后的迟到数据放到侧输出流.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String start sdf.format(new Date(startTs));String end sdf.format(new Date(endTs));long size elements.spliterator().estimateSize();out.collect(key key 的窗口[ start , end ]包含 size 条数据 elements.toString());}});process.print();// 从主流获取侧输出流并打印process.getSideOutput(lateData).printToErr();env.execute();}
}测试输入
s1,1,1
s1,2,2
s1,12,12
s1,5,5
s1,7,7
s1,14,14
s1,1,1
s1,2,2运行结果 2.7、迟到数据总结
2.7.1、乱序和迟到的区别
乱序数据的顺序乱了事件时间小的数据 比 事件时间大的数据 晚来迟到数据的事件时间 水位线时间窗口关闭了才来
2.7.2、迟到数据的处理
设置乱序等待时间如果开窗设置窗口允许迟到延迟关闭窗口关窗后的数据放到侧输出流
对数据的延迟时间要做到心中有数
等待时间设置一个不是特别大的一般都是秒级在 乱序和延迟中做取舍允许迟到时间窗口延迟关闭时间置考虑大部分的迟到数据极端迟到数据放到侧输出流最后单独拿出来合并一下就好了 耗费三四天时间终于把这一块学完了时间语义是非常重要的内容需要好好理解记忆也要知道怎么通过代码实现。