一键抓取的网站怎么做,房产资讯什么网站做的好,网站开发swf素材,云服务器可以做网站吗在Flink中#xff0c;处理时间序列数据时#xff0c;通常需要考虑事件时间和水印#xff08;watermarks#xff09;的处理。以下是修改前后的代码对比分析#xff1a;
修改前的代码#xff1a;
val systemDS unitDS.map(dp {dp.setDeviceCode(DeviceCodeEnum.fro…在Flink中处理时间序列数据时通常需要考虑事件时间和水印watermarks的处理。以下是修改前后的代码对比分析
修改前的代码
val systemDS unitDS.map(dp {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
unitDS 经过一个 map 操作将每个元素的 deviceCode 转换为系统设备码。使用 keyBy(_.getDeviceCode) 对转换后的设备码进行分组。定义了一个基于事件时间的滚动窗口窗口大小为60秒。使用 process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。
注意修改前的代码没有显示地处理水印watermarks这可能导致在处理乱序数据或延迟数据时出现问题。
修改后的代码
val systemDS unitDS.map(dp {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.assignTimestampsAndWatermarks(WatermarkStrategy.boundedOutOfOrdernessDaysPowerforBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower.withIdleness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long {Math.max(element.getEventTime, recordTimestamp)}})
).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
与修改前相同的部分map, keyBy, 和 window 操作。添加了 assignTimestampsAndWatermarks 方法来处理事件时间和水印 使用 WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据这里是5秒。.withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒用于处理不活跃的键。使用 withTimestampAssigner 自定义了时间戳分配器确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。
不同点和适用场景
事件时间和水印处理修改后的代码显式地处理了事件时间和水印这对于处理乱序数据、延迟数据以及确保正确的时间窗口计算是非常重要的。如果您的数据流中存在乱序或延迟数据或者您希望更严格地保证处理时间窗口的正确性那么应该使用修改后的代码。空闲超时通过设置空闲超时可以处理那些长时间不活跃的键避免因为某些键长时间没有新数据而导致整个程序挂起。延迟数据处理如果数据有可能晚到但仍然需要被纳入正确的窗口进行计算水印可以帮助界定数据的“迟到”界限。精确的时间窗口分析对于需要基于事件实际发生时间而非数据处理时间进行分析的场景如实时监控、金融交易分析等事件时间模型是必须的。