如何将网站和域名绑定,什么外贸网站开发客户,门户网站建设内,wordpress覆盖安装目录
一、引言
二、Time 的分类及 EventTime 的重要性
Time 分类详述
EventTime 重要性凸显
三、Watermark 机制详解
核心原理
Watermark能解决什么问题,如何解决的?
Watermark图解原理
举例
总结
多并行度的水印触发
Watermark代码演示
需求
代码演示#xff…目录
一、引言
二、Time 的分类及 EventTime 的重要性
Time 分类详述
EventTime 重要性凸显
三、Watermark 机制详解
核心原理
Watermark能解决什么问题,如何解决的?
Watermark图解原理
举例
总结
多并行度的水印触发
Watermark代码演示
需求
代码演示
需求升级
代码演示
四、Flink对于迟到数据的处理
对于延迟数据的理解
延迟数据的处理
1allowedLateness
2) 侧输出-SideOutput
处理主要使用两个方式
sideOutputLateData方法
DataStream.getSideOutput方法
代码演示
五、总结展望 本文聚焦于 Flink 中 Time 的分类EventTime、IngestionTime、ProcessingTime着重阐述 EventTime 的重要性以及为应对基于 EventTime 处理流数据时因网络等因素导致的数据乱序、延迟问题所引入的 Watermark 机制。详细剖析 Watermark 的原理、功能、相关代码实现及与延迟数据处理机制allowedLateness、侧输出 SideOutput的协同运作助力读者深入掌握 Flink 实时数据处理的核心要点。 一、引言 在 Flink 实时数据处理场景下准确把握时间语义以及高效处理数据乱序、延迟问题至关重要。不同的时间语义各有侧重而 EventTime 凭借能反映事件本质的特性成为关键但也因其对数据顺序的 “严苛要求” 催生了 Watermark 等配套机制来保障处理的准确性与完整性。 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime作为事件数据真正发生、产生之时的时间戳记录它锚定了事件的源头时间不受数据后续流转环节的干扰比如用户在电商平台下单操作瞬间对应的时间即便后续网络波动致数据传输受阻该时间依旧锁定下单那一刻。 IngestionTime侧重标记数据抵达流处理系统的时间节点当数据跨越系统边界进入 Flink 体系时此刻的系统时钟时间即为 IngestionTime反映数据流入的 “入门时刻”。 ProcessingTime聚焦数据在流处理系统内被具体处理、计算的当下系统时间同一批数据在不同负载时段处理ProcessingTime 会因处理时刻差异而不同。 EventTime 重要性凸显 假设你正在去往地下停车场的路上并且打算用手机点一份外卖。选好了外卖后你就用在线支付功能付款了这个时候是11点59分(EventTime)。恰好这时你走进了地下停车库而这里并没有手机信号。因此外卖的在线支付并没有立刻成功而支付系统一直在Retry重试“支付”这个操作。 当你找到自己的车并且开出地下停车场的时候已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号手机上的支付数据成功发到了外卖在线支付系统支付完成。 在上面这个场景中你可以看到 支付数据的事件时间是11点59分而支付数据的处理时间是12点01分 问题: 如果要统计12之前的订单金额,那么这笔交易是否应被统计? 答案: 应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分, 事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。 还可以通过钉钉打卡、饭卡机 等 举例子。 一条错误日志的内容为 2020-11-11 23:59:58 error NullPointExcep --事件时间 进入Flink的时间为2020-11-11 23:59:59 --摄入时间 到达Window的时间为2020-11-12 00:00:01 --处理时间 问题: 对于业务来说要统计每天的的故障日志个数哪个时间是最有意义的 答案: EventTime事件时间因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质! 某 App 会记录用户的所有点击行为并回传日志在网络不好的情况下先保存在本地延后回传。 A用户在 11:01:00 对 App 进行操作B用户在 11:02:00 操作了 App 但是A用户的网络不太稳定回传日志延迟了导致我们在服务端先接受到B用户的消息然后再接受到A用户的消息消息乱序了。 问题: 如果这个是一个根据用户操作先后顺序,进行抢购的业务,那么是A用户成功还是B用户成功? 答案: 应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,可能直接按B成功算 也就是说实际开发中希望基于事件时间来处理数据但因为数据可能因为网络延迟等原因出现了乱序按照事件时间处理起来有难度 在实际环境中经常会出现因为网络原因数据有可能会延迟一会才到达Flink实时处理系统。我们先来设想一下下面这个场景: 原本应该被该窗口计算的数据因为网络延迟等原因晚到了,就有可能丢失了 总结: 1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间 2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失 3.需要有技术来解决上面的问题,使用Watermark技术来解决! 三、Watermark 机制详解 当 Flink 依 EventTime 处理流数据网络波动、分布式架构下节点通信延迟等因素常使数据到达顺序背离事件发生顺序陷入乱序困境。为化解此难题Watermark 机制应运而生犹如给无序数据流转引入 “秩序规则”。 核心原理 Watermark 本质是延迟触发机制通过公式 “Watermark 当前最大的事件时间 - 最大允许的延迟时间或最大允许的乱序度时间” 动态生成。例如设定出行集合场景09:00 集合且允许迟到 10 分钟人员陆续到达时间对应不同 Watermark 值能否上车依 Watermark 与集合时间对比判定将时间规则量化应用到数据处理上为数据窗口触发时机把控筑牢基础。 Watermark能解决什么问题,如何解决的? 有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题! 不添加watermark 窗口如何触发 1窗口有数据 2窗口的结束时间到了。 有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发: 1.窗口有数据 2.Watermark 窗口的结束时间 满足以上条件则触发窗口计算! 以前窗口触发:系统时间到了窗口结束时间就触发 现在窗口触发:Watermark 窗口的结束时间 而Watermark 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间) 就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的 需要记住: Watermark 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间) 窗口触发时机 : Watermark 窗口的结束时间 水印watermark就是一个时间戳Flink可以给数据流添加水印可以理解为收到一条消息后额外给这个消息添加了一个时间字段这就是添加水印一般人为添加的消息的水印都会比当前消息的事件时间小一些。
窗口是否关闭按照水印时间来判断但原有事件时间不会被修改窗口的边界依旧是事件时间来决定。
水印并不会影响原有Eventtime当数据流添加水印后会按照水印时间来触发窗口计算一般会设置水印时间比Eventtime小一些一般几秒钟当接收到的水印时间 窗口的endTime且窗口内有数据则触发计算
水印水印时间的计算事件时间– 设置的水印长度 水印时间
比如事件时间是10分30秒, 水印长度是2秒那么水印时间就是10分28秒 Watermark图解原理 举例 窗口5秒延迟3秒按照事件时间计算 数据事件时间3, 落入窗口0-5.水印时间0 来一条数据事件时间7, 落入窗口5-10水印时间4 来一条数据事件时间4落入窗口0-5水印时间 4 因为此时的数据中最大的事件时间是 7延迟 3 秒按照公式 7-34 来一条数据事件时间8落入窗口5-10水印时间5 这一条数据水印时间大于等于窗口0-5的窗口结束时间。 满足了对窗口0-5的提交这个窗口关闭并触发数据计算 可以看出第三条数据其是延迟数据它的事件时间是4却来的比事件时间为7的数据还要晚。 但是因为水印的机制这个数据未错过它的窗口依旧成功进入属于它的窗口并且被计算 这就是水印的功能在不影响按照事件时间判断数据属于哪个窗口的前提下延迟某个窗口的关闭时间让其等待一会儿延迟数据。 总结 Watermark 是一个单独计算出来的时间戳 Watermark 当前最大的事件时间 - 最大允许的延迟时间(乱序度) Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题 Watermark 窗口结束时间 时 就会触发窗口计算(窗口中得有数据) 延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失! 多并行度的水印触发 在多并行度下每个并行有一个水印 比如并行度是6那么程序中就有6个watermark 分别属于这6个并行度(线程) 那么触发条件以6个水印中最小的那个为准 比如, 有个窗口是0-5 其中5个并行度的水印都超过了5 但有一个并行度的水印是3 那么不管另外5个并行度中的水印达到了多大都不会触发 因为6个并行度中的6个水印最小的是3不满足大于等于窗口结束5的条件 在测试水印的时候记得把并行度设置为1 好看结果否则结果不太容易看出来。 Watermark代码演示 需求 实时模拟生成订单数据,格式为: (订单ID用户ID时间戳/事件时间订单金额) 要求每隔5s,计算5秒内每个用户的订单总金额 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。 不使用水印的时候【不能使用eventtime时间语义】进行开发
package com.bigdata.day05;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/**** 实时模拟生成订单数据,格式为: (订单ID用户ID时间戳/事件时间订单金额)* 要求每隔5s,计算5秒内每个用户的订单总金额* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。**/public class _01WatermarkDemo {Data // set get toStringAllArgsConstructorNoArgsConstructorpublic static class OrderInfo2{private String orderId;private int uid;private int money;private long timeStamp;}public static class MySource implements SourceFunctionOrderInfo2 {boolean flag true;Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random new Random();while(flag){OrderInfo2 orderInfo new OrderInfo2();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥Overridepublic void cancel() {flag false;}}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceOrderInfo2 orderSourceStream env.addSource(new MySource());//3. transformation-数据处理转换// 每个用户的订单总额KeyedStreamOrderInfo2, Integer keyedStream orderDSWithWatermark.keyBy(orderInfo2 - orderInfo2.getUid());SingleOutputStreamOperatorOrderInfo2 result1 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(money);result1.print();//4. sink-数据输出//5. execute-执行env.execute();}
}
假如出现如下错误
Exception in thread main org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericTypecom.bigdata.day05.OrderInfo2Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.init(SumAggregator.java:53)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)
说明这个pojo 必须是public 的否则不解析。 代码演示
package com.bigdata.time;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Date;
import java.util.Random;
import java.util.UUID;Data // set get toString
AllArgsConstructor
NoArgsConstructor
class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}class MySource extends RichSourceFunctionOrderInfo {boolean flag true;Overridepublic void run(SourceContextOrderInfo ctx) throws Exception {while(flag){OrderInfo orderInfo new OrderInfo();Random random new Random();int userId random.nextInt(10);int money random.nextInt(100);long timeStamp System.currentTimeMillis() - random.nextInt(3000);orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(userId);orderInfo.setMoney(money);orderInfo.setTimeStamp(timeStamp);ctx.collect(orderInfo);Thread.sleep(20);}}Overridepublic void cancel() {flag false;}
}
public class _01_OrderDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSourceOrderInfo streamSource env.addSource(new MySource());//3. transformation-数据处理转换streamSource.keyBy(new KeySelectorOrderInfo, Integer() {Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.sum(money).print();.apply(new WindowFunctionOrderInfo, String, Integer, TimeWindow() {Overridepublic void apply(Integer userId, TimeWindow window, IterableOrderInfo input, CollectorString out) throws Exception {String beginTime DateFormatUtils.format(window.getStart(), yyyy-MM-dd HH:mm:ss);String endTime DateFormatUtils.format(window.getEnd(), yyyy-MM-dd HH:mm:ss);int sum 0;for (OrderInfo orderInfo : input) {sum orderInfo.getMoney();}out.collect(beginTime,endTime,userId ,sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
} 需求升级 实时模拟生成订单数据,格式为: (订单ID用户ID时间戳/事件时间订单金额) * 要求每隔5s,计算5秒内每个用户的订单总金额 * 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。 假如你添加了 eventTime 缺没有添加水印的代码会报如下错误
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp ( no timestamp marker). Is the time characteristic set to ProcessingTime, or did you forget to call DataStream.assignTimestampsAndWatermarks(...)?at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)
代码演示
生成 Watermark | Apache Flink
package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class _02_OrderDemoWithWaterMark {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSourceOrderInfo streamSource env.addSource(new MySource());//3. transformation-数据处理转换streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.OrderInfoforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerOrderInfo() {// long 是时间戳吗是秒值还是毫秒呢年月日时分秒的的字段怎么办呢Overridepublic long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {// 这个方法的返回值是毫秒所有的数据只要不是这个毫秒值都需要转换为毫秒return orderInfo.getTimeStamp();}})).keyBy(new KeySelectorOrderInfo, Integer() {Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingEventTimeWindows.of(Time.seconds(5)))//.sum(money).print();.apply(new WindowFunctionOrderInfo, String, Integer, TimeWindow() {Overridepublic void apply(Integer userId, TimeWindow window, IterableOrderInfo input, CollectorString out) throws Exception {String beginTime DateFormatUtils.format(window.getStart(), yyyy-MM-dd HH:mm:ss);String endTime DateFormatUtils.format(window.getEnd(), yyyy-MM-dd HH:mm:ss);int sum 0;for (OrderInfo orderInfo : input) {sum orderInfo.getMoney();}out.collect(beginTime,endTime,userId ,sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
} 通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness) 我们实现一个延迟3秒的固定延迟水印可以这样做
DataStream dataStream ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))); 四、Flink对于迟到数据的处理 水印对于迟到数据不长 allowedLateness: 迟到时间很长 侧道输出对于迟到时间特别长 对于延迟数据的理解
水印机制(水位线、watermark)机制可以帮助我们在短期延迟下允许乱序数据的到来。
这个机制很好的处理了那些因为网络等情况短期延迟的数据让窗口等它们一会儿。
但是水印机制无法长期的等待下去因为水印机制简单说就是让窗口一直等在那里等达到水印时间才会触发计算和关闭窗口
这个等待不能一直等因为会一直缓着数据不计算。
一般水印也就是几秒钟最多几分钟而已看业务 那么在现实世界中延迟数据除了有短期延迟外长期延迟也是很常见的。 比如 l 客户端断网等了好几个小时才恢复 l 车联网系统进入隧道后没有信号无法上报数据 l 手机欠费没有网 等等这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了 而是小时、天级别的延迟 对于水印来说这样的长期延迟数据是无法很好处理的。 那么有没有什么办法去处理这些长期延迟的数据呢让其可以找到其所属的窗口正常完成计算哪怕晚了几个小时。 这个场景的解决方式就是延迟数据处理机制(allowedLateness方法)。 水印乱序数据处理时间很短的延迟
延迟处理长期延迟数据的处理机制 allowedLateness 机制剖析 Watermark 应对短期延迟后对于长期延迟数据如客户端断网数小时后恢复传输数据allowedLateness 机制接力。对窗口对象设置 “allowedLateness (Time.seconds (4))”以允许 4 秒长期延迟为例区别于 Watermark 触发计算且关闭窗口它仅触发计算、暂不关闭在允许时长内后续同窗口数据入流可再次计算整合灵活适配长延迟业务场景延展窗口 “有效期”。SideOutput 侧输出机制运用 即便有 Watermark 和 allowedLateness 双重保障仍有极度延迟数据 “漏网”。SideOutput 则像 “兜底方案”借助 “sideOutputLateData (outputTag)” 将错过双重机制的数据收集至特定 DataStream后续利用 “getSideOutput (tag)” 取出单独处理赋予开发者自定义逻辑应对极端情况权限如将严重迟到订单数据单独分析、告警完善数据处理全流程可靠性。 延迟数据的处理 waterMark和Window机制解决了流式数据的乱序问题对于因为延迟而顺序有误的数据可以根据eventTime进行业务处理对于延迟的数据Flink也有自己的解决办法 主要的办法是给定一个允许延迟的时间在该时间范围内仍可以接受处理延迟数据 设置允许延迟的时间是通过allowedLateness(lateness: Time)设置 保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存 获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取 1allowedLateness
当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法
该方法传入一个Time值设置允许的长期延迟(迟到)的时间。
和watermark不同。
未设置allowedLateness(为0)当watermark满足条件会触发窗口的 执行 关闭
当设置了allowedLateness当watermark满足条件后只会触发窗口的执行不会触发窗口关闭
也就是watermark满足条件后会正常触发窗口计算将已有的数据完成计算。
但是不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来那么每进来一条会和已经计算过的(被watermark触发的)数据一起在计算一次。 水印短期延迟达到条件后触发计算并且关闭窗口触发关闭同时进行 水印allowedLateness : 短期延迟 等待长期延迟效果 达到水印条件后会触发窗口计算但是不关闭窗口。事件时间延迟达到水印allowedLateness之和后会关闭窗口。 2) 侧输出-SideOutput Flink 通过watermark在短时间内允许了乱序到来的数据
通过延迟数据处理机制可以处理长期迟到的数据。
但总有那么些数据来的晚的太久了。允许迟到1天的设置它迟到了2天才来。
对于这样的迟到数据水印无能为力设置allowedLateness也无能为力那对于这样的数据Flink就只能任其丢掉了吗
不会Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口但是真的迟到太久了Flink也有一个机制将这些数据收集起来
保存成为一个DataStream然后交由开发人员自行处理。 那么这个机制就叫做侧输出机制(Side Output) 侧输出机制可以将错过水印又错过allowedLateness允许的时间的数据单独的存放到一个DataStream中然后开发人员可以自定逻辑对这些超级迟到数据进行处理。 处理主要使用两个方式 对窗口对象调用sideOutputLateData(OutputTag outputTag)方法将数据存储到一个地方 对DataStream对象调用getSideOutput(OutputTag outputTag)方法取出这些被单独处理的数据的DataStream 注意取到的是一个DataStream这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。
sideOutputLateData方法 使用方式: 先定义OutputTag对象(注意必须new一个匿名内部类形式的OutputTag对象的实例) 然后调用sideOutputLateData方法 // side output OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类 OutputTagTuple2String, Long outputTag new OutputTagTuple2String, Long(side output){}; WindowedStreamTuple2String, Long, Tuple, TimeWindow sideOutputLateData allowedLateness.sideOutputLateData(outputTag); DataStream.getSideOutput方法 用法: DataStreamTuple2String, Long sideOutput result.getSideOutput(outputTag); // 对得到的保存超级迟到数据的DataStream进行处理 sideOutput.print(late); 代码演示 前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题 但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来 使用Watermark AllowedLateness SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失! package com.bigdata.day05;import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;class MyOrderSource2 implements SourceFunctionOrderInfo {Overridepublic void run(SourceContextOrderInfo ctx) throws Exception {Random random new Random();while(true){OrderInfo orderInfo new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString().replace(-,));// 在这个地方可以模拟迟到数据long orderTime System.currentTimeMillis() - 1000*random.nextInt(100);orderInfo.setOrdertime(orderTime);int money random.nextInt(10);System.out.println(订单产生的时间 DateFormatUtils.format(orderTime,yyyy-MM-dd HH:mm:ss)金额money);orderInfo.setMoney(money);orderInfo.setUserId(random.nextInt(2));ctx.collect(orderInfo);Thread.sleep(500);}}Overridepublic void cancel() {}
}
public class Demo01 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 每隔五秒统计每个用户的前面5秒的订单的总金额//2. source-加载数据DataStreamSourceOrderInfo streamSource env.addSource(new MyOrderSource2());//-2.告诉Flink最大允许的延迟时间/乱序时间为多少SingleOutputStreamOperatorOrderInfo orderDSWithWatermark streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.OrderInfoforBoundedOutOfOrderness(Duration.ofSeconds(3))//-3.告诉Flink哪一列是事件时间.withTimestampAssigner((order, time) - order.getOrdertime()));OutputTagOrderInfo outputTag new OutputTagOrderInfo(side output){};//3. transformation-数据处理转换SingleOutputStreamOperatorString result orderDSWithWatermark.keyBy(orderInfo - orderInfo.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(4)).sideOutputLateData(outputTag).apply(new WindowFunctionOrderInfo, String, Integer, TimeWindow() {Overridepublic void apply(Integer key, // 代表分组key值 五旬老太守国门TimeWindow window, // 代表窗口对象IterableOrderInfo input, // 分组过之后的数据 [1,1,1,1,1]CollectorString out // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);long start window.getStart();long end window.getEnd();int sum 0;// 专门存放迟到的订单时间for (OrderInfo orderInfo : input) {sum orderInfo.getMoney();}out.collect(key ,窗口开始 dateFormat.format(new Date(start)) ,结束时间: dateFormat.format(new Date(end)) , sum);//out.collect(key,窗口开始start,结束时间:end,sum);}});result.print(流中的数据包含迟到的数据);result.getSideOutput(outputTag).print(严重迟到的数据);//4. sink-数据输出//5. execute-执行env.execute();}
}
运行结果
订单产生的时间2024-05-16 11:19:00金额1
订单产生的时间2024-05-16 11:18:13金额3
严重迟到的数据 f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间2024-05-16 11:19:10金额3
2024-05-16 11:19:00
流中的数据包含迟到的数据 1,窗口开始2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间
订单产生的时间2024-05-16 11:18:19金额8
严重迟到的数据 cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间2024-05-16 11:18:19金额9
严重迟到的数据 f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间2024-05-16 11:17:48金额3
严重迟到的数据 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间2024-05-16 11:19:23金额1
2024-05-16 11:19:10
流中的数据包含迟到的数据 1,窗口开始2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间
订单产生的时间2024-05-16 11:19:19金额7
2024-05-16 11:19:19
流中的数据包含迟到的数据 1,窗口开始2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间
订单产生的时间2024-05-16 11:18:25金额2
严重迟到的数据 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间2024-05-16 11:18:48金额3
严重迟到的数据 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间2024-05-16 11:18:38金额1
严重迟到的数据 e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间2024-05-16 11:18:39金额5
严重迟到的数据 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间2024-05-16 11:18:19金额0
严重迟到的数据 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间2024-05-16 11:19:07金额8
严重迟到的数据 a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间2024-05-16 11:19:22金额8
订单产生的时间2024-05-16 11:19:01金额3
严重迟到的数据 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间2024-05-16 11:18:42金额2
严重迟到的数据 f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间2024-05-16 11:18:15金额7
严重迟到的数据 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间2024-05-16 11:17:56金额6
严重迟到的数据 ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间2024-05-16 11:18:24金额3
严重迟到的数据 f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间2024-05-16 11:17:52金额2
严重迟到的数据 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间2024-05-16 11:19:20金额9
订单产生的时间2024-05-16 11:18:07金额1
严重迟到的数据 d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间2024-05-16 11:18:34金额2
严重迟到的数据 a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间2024-05-16 11:19:11金额3
严重迟到的数据 a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间2024-05-16 11:18:11金额9
严重迟到的数据 cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间2024-05-16 11:18:36金额7Process finished with exit code 130虽然我们添加了延迟的效果就是说侧道输出侧道输出不能触发窗口的执行窗口的执行只能通过水印时间触发 允许迟到的数据不放入到当前窗口中而是作为一个触发条件看到它需要放入到它对应的窗口中。 只考虑 1 个并行度的问题 订单发生的真实事件窗口5秒间隔5秒允许迟到 3秒 最晚允许迟到4秒 10:44:00 区间就应该是10:44:00 10:44:05 10:44:01 10:44:02 10:44:03 10:44:04 10:44:05 10:44:07 区间就应该是10:44:05 10:44:10 10:44:22 区间就应该是10:44:20 10:44:25 10:44:30 10:44:28 10:44:20 通过上面这个图可以知道44:07没有办法触发00~05的执行但是07不放入00~05区间而是放入10:44:05 10:44:10 44:22 一个数据触发了两个区间的执行 00~05 05~10 假如有一个订单时44:10产生的应该放入10~15这个区间 五、总结展望 Flink 中 Time 与 Watermark 体系紧密交织EventTime 定 “基准”Watermark 破 “乱序”allowedLateness 延 “时效”SideOutput 兜 “底线”协同赋能实时流数据精准、稳健处理。未来随着业务场景愈发复杂、数据规模持续膨胀对这套机制性能优化、智能自适应调整等方面探索将不断深入持续筑牢大数据实时处理 “基石”解锁更多高效数据利用价值。