网站的pv是什么,找人做效果图那个网站,网站对图片优化吗,台州网站关键字优化详情时间机制
Flink中的时间机制主要用在判断是否触发时间窗口window的计算。
在Flink中有三种时间概念#xff1a;ProcessTime、IngestionTime、EventTime。
ProcessTime#xff1a;是在数据抵达算子产生的时间#xff08;Flink默认使用ProcessTime#xff09;
IngestionT…时间机制
Flink中的时间机制主要用在判断是否触发时间窗口window的计算。
在Flink中有三种时间概念ProcessTime、IngestionTime、EventTime。
ProcessTime是在数据抵达算子产生的时间Flink默认使用ProcessTime
IngestionTime是在DataSource生成数据产生的时间
EventTime是数据本身携带的时间具有实际业务含义不是Flink框架产生的时间 水位机制
由于网络原因、故障等原因数据的EventTIme并不是单调递增的是乱序的有时与当前实际时间相差很大。
水位watermark用在EventTime语义的窗口计算可以当作当前计算节点的时间。当水位超过窗口的endtime表示事件时间t T的数据都**已经到达**这个窗口就会触发WindowFunction计算。当水位超过窗口的endtime允许迟到的时间窗口就会消亡。本质是DataStream中的一种特殊元素每个水印都携带有一个时间戳。 队列中是乱序的数据流入长度3s的窗口。2s的数据进入[0,4)的窗口中2s、3s、1s的数据进入[0,4)的窗口7s的数据分配到[4,8)的窗口中水印4s到达代表4s以前的数据都已经到达。触发[0,4)的窗口计算[4,8)的窗口等待数据水印9s到达[4,8)的窗口触发
多并行情况下不同的watermark流到算子取最小的wartermark当作当前算子的watermark。
如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间那么所有流的数据肯定已经全部收齐就可以安全地触发窗口计算了。 生成水位 首先设置env为事件时间
使用 DataStream API 实现 Flink 任务时Watermark Assigner 能靠近 Source 节点就靠近 Source 节点能前置尽量前置。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//测试数据间隔1s发送
DataStreamSourceTuple2String, Long source env.addSource(new SourceFunctionTuple2String, Long() {Overridepublic void run(SourceContextTuple2String, Long ctx) throws Exception {ctx.collect(Tuple2.of(aa, 1681909200000L));//2023-04-19 21:00:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681909500000L));//2023-04-19 21:05:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681909800000L));//2023-04-19 21:10:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910100000L));//2023-04-19 21:15:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910400000L));//2023-04-19 21:20:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910700000L));//2023-04-19 21:25:00Thread.sleep(Long.MAX_VALUE);}Overridepublic void cancel() {}});抽取EventTime、生成Watermark
周期性水位–AssignerWithPeriodicWatermarks常用
周期性生成水位。周期默认的时间是 200ms.
源码如下
PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic Preconditions.checkNotNull(characteristic);if (characteristic TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}自定义实现AssignerWithPeriodicWatermarks代码如下
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Long() {private long currentTimestamp;NullableOverride// 生成watermarkpublic Watermark getCurrentWatermark() {return new Watermark(currentTimestamp);}Override//获取事件时间public long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {if (element.f1currentTimestamp){currentTimestamp element.f1;}return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();运行结果如下
水位时间到达2023-04-19 21:10:00触发窗口2023-04-19 21:00:00到2023-04-19 21:10:00窗口中的数据为2023-04-19 21:00:00和2023-04-19 21:05:00
水位时间到达2023-04-19 21:20:00触发窗口2023-04-19 21:10:00到2023-04-19 21:20:00窗口中的数据为2023-04-19 21:10:00和2023-04-19 21:15:00
长时间等待后2023-04-19 21:20:00到2023-04-19 21:30:00是存在一个2023-04-19 21:25:00的数据一直没有触发。这是因为没有新的数据进入周期性生成的watermark一直是2023-04-19 21:20:00。所以后面窗口即使有数据也没有触发计算。 BoundedOutOfOrdernessTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor实现了AssignerWithPeriodicWatermarks接口是flink内置的实现类。
主要源码如下
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() 0) {throw new RuntimeException(Tried to set the maximum allowed lateness to maxOutOfOrderness . This parameter cannot be negative.);}this.maxOutOfOrderness maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp Long.MIN_VALUE this.maxOutOfOrderness;}public abstract long extractTimestamp(T element);Overridepublic final Watermark getCurrentWatermark() {long potentialWM currentMaxTimestamp - maxOutOfOrderness;if (potentialWM lastEmittedWatermark) {lastEmittedWatermark potentialWM;}return new Watermark(lastEmittedWatermark);}Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp extractTimestamp(element);if (timestamp currentMaxTimestamp) {currentMaxTimestamp timestamp;}return timestamp;}BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的构造它时传入的参数maxOutOfOrderness就是乱序区间的长度而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。
当然乱序区间的长度要根据实际环境谨慎设定设定得太短会丢较多的数据设定得太长会导致窗口触发延迟实时性减弱。
设置maxOutOfOrderness为5min代码如下
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();运行结果如下 看起来和我们自定义实现结果一样。但是10min的水位时间是来自数据15min减去延迟时间5min得来的。
同理20min的水位时间是来自数据25min减去延迟时间5min得来的。
我们可以设置延迟时间为10min看一下结果。最后一条数据是25min那么最后的水位线就是25min-10min15min。只会触发00-10的窗口。 同样的由于没有后续数据导致后面的窗口没有触发。
AscendingTimestampExtractor
AscendingTimestampExtractor要求生成的时间戳和水印都是单调递增的。用户实现从数据中获取自增的时间戳extractAscendingTimestamp与上一次时间戳比较。如果出现减少则打印warn日志。
源码如下 public abstract long extractAscendingTimestamp(T element);Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp extractAscendingTimestamp(element);if (newTimestamp this.currentTimestamp) {this.currentTimestamp newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}间断性水位线
适用于根据接收到的消息判断是否需要产生水位线的情况用这种水印生成的方式并不多见。
举例如下数据为15min的时候生成水位。
source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarksTuple2String, Long() {NullableOverridepublic Watermark checkAndGetNextWatermark(Tuple2String, Long lastElement, long extractedTimestamp) {DateTime date DateUtil.date(lastElement.f1);return date.minute()15?new Watermark(lastElement.f1):null;}Overridepublic long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();结果如下
15min的数据生成了15min的水位只触发了00-10的窗口。 窗口处理迟到的数据
allowedLateness
Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说正常情况下窗口触发计算完成之后就会被销毁但是设定了允许延迟之后窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中并触发新的计算。当然窗口也是吃资源大户所以allowedLateness的值要适当。给个完整的代码示例如下。
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();side output
侧输出side output是Flink的分流机制。迟到数据本身可以当做特殊的流我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去再进行下一步处理比如存到外部存储或消息队列。代码如下。
// 侧输出的OutputTagOutputTagTuple2String, Long lateOutputTag new OutputTag(late_data_output_tag);SingleOutputStreamOperatorObject process source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).sideOutputLateData(lateOutputTag).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间 DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间: DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间: DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间: DateUtil.date(element.f1)));}});//处理侧输出数据
// process.getSideOutput(lateOutputTag).addSink()最后的window不触发解决方法
自定义自增水位
周期性获取watermark时自定义增加水位
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Long() {private long currentTimestamp;NullableOverride// 生成watermarkpublic Watermark getCurrentWatermark() {currentTimestamp60000;return new Watermark(currentTimestamp);}Override//获取事件时间public long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {if (element.f1currentTimestamp){currentTimestamp element.f1;}return element.f1;}})结果如下 自定义trigger
当watermark不能满足关窗条件时我们给注册一个晚于事件时间的处理时间定时器使它一定能达到关窗条件。
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class MyTrigger extends TriggerObject, TimeWindow {Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());ctx.registerProcessingTimeTimer(window.maxTimestamp() 30000L);return TriggerResult.CONTINUE;}}Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());return TriggerResult.FIRE;}Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (time window.maxTimestamp()) {ctx.deleteProcessingTimeTimer(window.maxTimestamp() 30000L);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());ctx.deleteProcessingTimeTimer(window.maxTimestamp() 30000L);}
}Test.java
参考链接
https://www.jianshu.com/p/c612e95a5028
https://blog.csdn.net/lixinkuan328/article/details/104129671
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/
https://cloud.tencent.com/developer/article/1573079
https://blog.csdn.net/m0_73707775/article/details/129560540?spm1001.2101.3001.6650.5utm_mediumdistribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3depth_1-utm_sourcedistribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3utm_relevant_index10