公司做网站效果怎么样,平面设计公司经营范围,设计一套网页要多少钱,荆门网站seo窗口原理与机制
图片链接#xff1a;https://blog.csdn.net/qq_35590459/article/details/132177154 数据流进入算子前#xff0c;被提交给WindowAssigner#xff0c;决定元素被放到哪个或哪些窗口#xff0c;同时可能会创建新窗口或者合并旧的窗口。每一个窗口都拥有一个…窗口原理与机制
图片链接https://blog.csdn.net/qq_35590459/article/details/132177154 数据流进入算子前被提交给WindowAssigner决定元素被放到哪个或哪些窗口同时可能会创建新窗口或者合并旧的窗口。每一个窗口都拥有一个属于自己的触发器Trigger每当有元素被分配到该窗口或者之前注册的定时器超时时Trigger都会被调用。Trigger被触发后窗口中的元素集合就会交给Evictor如果指定了遍历窗口中的元素列表并决定最先进入窗口的多少个元素需要被移除。窗口函数计算结果值发送给下游
Trigger 触发器
触发器作用控制窗口什么时候除法计算。即执行窗口函数基于WindowStream调用trigger方法传入自定义触发器trigger
每一个窗口分配器windowAssigner 都会对应一个默认的触发器 源码样例 PublicEvolvingpublic W extends Window WindowedStreamT, KEY, W window(WindowAssigner? super T, W assigner) {return new WindowedStream(this, assigner);}PublicEvolvingpublic WindowedStream(KeyedStreamT, K input, WindowAssigner? super T, W windowAssigner) {this.input input;this.builder new WindowOperatorBuilder(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}默认触发器
public TriggerObject, TimeWindow getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create();}
Triger类有4个方法 onElement:窗口中每来一个元素调用该方法。
onProcessingTime当注册的处理时间定时器触发时将调用这个方法。onEventTime当注时的事件时间定时器触发时将调用这个方法。clear窗口关闭冰销毁时调用这个方法一般用来清除自定义状态。onElement() onProcessingTime()onEventTime()方法的返回类型都是 TriggerResultTriggerResult中包含四个枚举值
CONTINUE表示对窗口不执行任何操作。
FIRE触发计算并输出结果。注意计算完成后窗口中的数据并不会被清除将会被保留。
PURGE表示将窗口中的数据和窗口清除。
FIRE_AND_PURGE表示先将数据进行计算输出结果然后将窗口中的数据和窗口进行清除。源码
/** No action is taken on the window. */
CONTINUE(false, false),
/** {code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/*** On {code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/
FIRE(true, false),
/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/
PURGE(false, true);
flink提供的触发器
flink提供触发器
EventTimeTrigger通过对比EventTime和窗口的Endtime确定是否触发窗口计算如果EventTime大于Window EndTime则触发否则不触发窗口将继续等待。ProcessingTimeoutTrigger当内置触发器满足设置的超时时间时触发窗口的计算。ProcessTimeTrigger通过对比ProcessTime和窗口EndTme确定是否触发窗口如果ProcessTime大于EndTime则触发计算否则窗口继续等待。ContinuousEventTimeTrigger根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。ContinuousProcessingTimeTrigger根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。CountTrigger根据接入数据量是否超过设定的阙值判断是否触发窗口计算。DeltaTrigger根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。PurgingTrigger可以将任意触发器作为参数转换为Purge类型的触发器计算完成后数据将被清理。NeverTrigger任何时候都不触发窗口计算全局窗口触发器 原文链接https://blog.csdn.net/qq_37555071/article/details/122514061 水印触发一般是窗口关闭时间
flink提供的触发器是与窗口对应当有水印时如果水印时间大于等于窗口结束时间会触发计算window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。
如果在窗口关闭前触发计算设置多个触发计算时间这样实现一些特定的需求。例如每10s输出一次当天的累计数据
public class EventTimeTrigger extends TriggerObject, TimeWindow {private static final long serialVersionUID 1L;private EventTimeTrigger() {}Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 限定触发条件为窗口关闭时间否则就继续窗口 return time window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}
..... 自定义触发器
继承Triger重写抽象方法案例
.window(TumblingEventTimeWindows.of(Time.hours(24))).trigger(new MyTrigger()).process(new WindowResult()).print();窗口长24小时每十秒触发一次计算
public static class MyTrigger extends TriggerEvent, TimeWindow {Overridepublic TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//定义状态记录该状态 触发器第一个元素进来时注册全部的触发器ValueStateBoolean isFirstEvent triggerContext.getPartitionedState(new ValueStateDescriptorBoolean(first-event, Types.BOOLEAN));//第一次注册右面全部跳过if (isFirstEvent.value() null) {for (long i timeWindow.getStart(); i timeWindow.getEnd(); i i 10000L) {//注册触发器 间隔10striggerContext.registerEventTimeTimer(i);}isFirstEvent.update(true);}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//使用的事件时间因此触发窗口的计算return TriggerResult.FIRE;}Overridepublic TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}Overridepublic void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {ValueStateBoolean isFirstEvent triggerContext.getPartitionedState(new ValueStateDescriptorBoolean(first-event, Types.BOOLEAN));isFirstEvent.clear();}} 移除器Evictor
作用主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。
stream.keyby().window().evictor(new MyEvictor)
evictBefore():定义窗口执行函数之前移除的数据操作移除后的数据不参与窗口计算
evictAfter():定义执行窗口函数后移除数据的操作
默认情况下预实现的移出弃都是在执行窗口函数之前移除数据
flink 提供的移除器
CountEvictor: 仅记录用户指定数量的元素一旦窗口中的元素超过这个数量多余的元素会从窗口缓存的开头移除 CountEvictor在countwindow中有明确定义引用。 DeltaEvictor: 接收 DeltaFunction 和 threshold 参数计算最后一个元素与窗口缓存中所有元素的差值 并移除差值大于或等于 threshold 的元素。暂时不清楚作用 TimeEvictor: 接受窗口inteval时间它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - inteval小的所有元素。TimeEvictor.of() 方法来构建 inteval 不是窗口时间如果为0窗口没有数据输出
//TimeEvictor 部分源码 Overridepublic void evictBefore(IterableTimestampedValueObject elements, int size, W window, EvictorContext ctx) {if (!doEvictAfter) {evict(elements, size, ctx);}}Overridepublic void evictAfter(IterableTimestampedValueObject elements, int size, W window, EvictorContext ctx) {if (doEvictAfter) {evict(elements, size, ctx);}}private void evict(IterableTimestampedValueObject elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}long currentTime getMaxTimestamp(elements);long evictCutoff currentTime - windowSize;//移除时间窗口时间之前的数据注意获取的并不是窗口end时间for (IteratorTimestampedValueObject iterator elements.iterator();iterator.hasNext(); ) {TimestampedValueObject record iterator.next();if (record.getTimestamp() evictCutoff) {iterator.remove();}}}
// 获取当前元素中最大的时间private long getMaxTimestamp(IterableTimestampedValueObject elements) {long currentTime Long.MIN_VALUE;for (IteratorTimestampedValueObject iterator elements.iterator();iterator.hasNext(); ) {TimestampedValueObject record iterator.next();currentTime Math.max(currentTime, record.getTimestamp());}return currentTime;}// 保留多长时间的数据public static W extends Window TimeEvictorW of(Time windowSize) {return new TimeEvictor(windowSize.toMilliseconds());}/*** Creates a {code TimeEvictor} that keeps the given number of elements. Eviction is done* before/after the window function based on the value of doEvictAfter.** param windowSize The amount of time for which to keep elements.* param doEvictAfter Whether eviction is done after window function.*/public static W extends Window TimeEvictorW of(Time windowSize, boolean doEvictAfter) {return new TimeEvictor(windowSize.toMilliseconds(), doEvictAfter);}
例如
stream.keyBy(r - r.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.evictor(TimeEvictor.of(Time.seconds(3))) // 只输出窗口关闭前3s的数据
.process( new WindowResult())
.print();注意如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用 .keyBy(r - r.user).window(TumblingEventTimeWindows.of(Time.seconds(10)));window.evictor(new EvictorEvent, TimeWindow() {Overridepublic void evictBefore(IterableTimestampedValueEvent elements, int size, TimeWindow window, EvictorContext evictorContext) {IteratorTimestampedValueEvent iterator elements.iterator();while (iterator.hasNext()){TimestampedValueEvent next iterator.next();if(next.getValue().url.equals(./prod?id1)){iterator.remove();}}}Overridepublic void evictAfter(IterableTimestampedValueEvent elements, int size, TimeWindow window, EvictorContext evictorContext) {return;}}).process(new ProcessWindowFunctionEvent, UrlViewCount, String, TimeWindow() {Overridepublic void process(String s, ProcessWindowFunctionEvent, UrlViewCount, String, TimeWindow.Context context, IterableEvent elements, CollectorUrlViewCount out) throws Exception {AtomicInteger i new AtomicInteger();elements.forEach(v- i.getAndIncrement());out.collect(new UrlViewCount(s,// 获取迭代器中的元素个数 不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1i.longValue(),context.window().getStart(),context.window().getEnd()));} }).print();