当前位置: 首页 > news >正文

建站推广哪里有建站新闻资讯域名备案的网站建设方案书模板

建站推广哪里有建站新闻资讯,域名备案的网站建设方案书模板,网页设计教材,最近高清免费资源窗口Window机制 窗口概述窗口的分类是否按键分区按键分区窗口非按键分区 按照驱动类型按具体分配规则滚动窗口Tumbling Windows滑动窗口 Sliding Windows会话窗口 Session Windows全局窗口 Global Windows 时间语义窗口分配器 Window Assigners时间窗口计数窗口例子 窗口函数 W… 窗口Window机制 窗口概述窗口的分类是否按键分区按键分区窗口非按键分区 按照驱动类型按具体分配规则滚动窗口Tumbling Windows滑动窗口 Sliding Windows会话窗口 Session Windows全局窗口 Global Windows 时间语义窗口分配器 Window Assigners时间窗口计数窗口例子 窗口函数 Window Functions增量聚合函数ReduceFunctionAggregateFunction 全窗/全量口函数WindowFunctionProcessWindowFunction增量聚合和全窗口函数的结合 其他触发器 Trigger移除器 Evictor 窗口概述 在大多数场景下需要统计的数据流都是无界的因此无法等待整个数据流终止后才进行统计。通常情况下只需要对某个时间范围或者数量范围内的数据进行统计分析 例如 每隔10分钟统计一次过去30分钟内某个对象的点击量每发生100次点击后就去统计一下每个对象点击率的占比因此在Apache Flink中窗口是对无界数据流进行有界处理的机制。窗口可以将无限的数据流划分为有限的、可处理的块使得可以基于这些有限的数据块执行聚合、计算和分析操作。 窗口的分类 是否按键分区 在定义窗口操作之前首先需要确定到底是基于按键分区的数据流KeyedStream来开窗还是直接在没有按键分区的DataStream上开窗。 两者区别 1.keyed streams要调用keyBy(...)后再调用window(...) 而non-keyed streams只用直接调用windowAll(...)2.对于keyed stream其中数据的任何属性都可以作为key。 允许窗口计算由多个task并行因为每个逻辑上的 keyed stream都可以被单独处理。 属于同一个key的元素会被发送到同一个 task。3.对于non-keyed stream原始的stream不会被分割为多个逻辑上的stream 所有的窗口计算会被同一个 task完成也就是parallelism为1按键分区窗口 经过按键分区keyBy操作后数据流会按照key被分为多条逻辑流这就是KeyedStream。基于KeyedStream进行窗口操作时窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务而窗口操作会基于每个key进行单独的处理。所以可以认为每个key上都定义了一组窗口各自独立地进行统计计算。按键分区窗口写法 stream.keyBy(...) - 仅 keyed 窗口需要.window(...) - 必填项assigner[.trigger(...)] - 可选项trigger (省略则使用默认 trigger)[.evictor(...)] - 可选项evictor (省略则不使用 evictor)[.allowedLateness(...)] - 可选项lateness (省略则为 0)[.sideOutputLateData(...)] - 可选项output tag (省略则不对迟到数据使用 side output).reduce/aggregate/apply() - 必填项function[.getSideOutput(...)] - 可选项output tag代码示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为(key, value)元组DataStreamTuple2String, Integer dataStream source.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2 map(String s) throws Exception {int number Integer.parseInt(s);String key number % 2 0 ? key1 : key2;Tuple2 tuple2 new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStreamTuple2String, Integer, String keyBy dataStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});// 每10秒统计一次数量和SingleOutputStreamOperatorTuple2String, Integer streamOperator keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1);streamOperator.print();env.execute();}发送测试数据 [rootadministrator ~]# nc -lk 8086 1 2 3 4等待10秒后,控制台打印如下 (key2,4) (key1,6)非按键分区 如果没有进行keyBy那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了1。 非按键分区窗口写法 stream.windowAll(...) - 必填项assigner[.trigger(...)] - 可选项trigger (else default trigger)[.evictor(...)] - 可选项evictor (else no evictor)[.allowedLateness(...)] - 可选项lateness (else zero)[.sideOutputLateData(...)] - 可选项output tag (else no side output for late data).reduce/aggregate/apply() - 必填项function[.getSideOutput(...)] - 可选项output tag代码示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 每10秒统计一次数量和SingleOutputStreamOperatorInteger streamOperator dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}按照驱动类型 窗口按照驱动类型可以分成时间窗口和计数窗口这两种窗口类型根据其触发机制和边界规则的不同适用于不同的应用场景。 时间窗口 Time Windows 时间窗口根据事件时间Event Time或处理时间Processing Time来划分时间窗口根据时间的进展划分数据流当一个窗口的时间到达或窗口中的元素数量达到阈值时触发窗口计算计数窗口 Count Windows 计数窗口根据元素的数量或元素的增量来划分计数窗口在数据流中累积固定数量的元素后触发窗口计算窗口的大小可以是固定的也可以是动态变化的取决于所设置的阈值和策略按具体分配规则 窗口按照具体的分配规则又有滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window。 滚动窗口Tumbling Windows 滚动窗口将数据流划分为固定大小的、不重叠的窗口。 例如将数据流按照5秒的滚动窗口大小进行划分每个窗口包含5秒的数据。那么每5秒就会有一个窗口被计算且一个新的窗口被创建 代码示例 DataStreamT input ...;// 滚动 event-time 窗口 input.keyBy(key selector)// 间间隔可以用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等来指定.window(TumblingEventTimeWindows.of(Time.seconds(5))).windowed transformation(window function);// 滚动 processing-time 窗口 input.keyBy(key selector).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).windowed transformation(window function);// 长度为一天的滚动 event-time 窗口 偏移量为 -8 小时。 input.keyBy(key selector).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).windowed transformation(window function);滑动窗口 Sliding Windows 滑动窗口将数据流划分为固定大小的窗口窗口大小通过window size参数设置需要一个额外的滑动距离window slide参数来控制生成新窗口的频率。 如果slide小于窗口大小滑动窗口可以允许窗口重叠。这种情况下一个元素可能会被分发到多个窗口。 例如将数据流按照5秒的滑动窗口大小和3秒的滑动步长进行划分窗口之间有2秒的重叠。 DataStreamT input ...;// 滑动 event-time 窗口 input.keyBy(key selector).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).windowed transformation(window function);// 滑动 processing-time 窗口 input.keyBy(key selector).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).windowed transformation(window function);// 滑动 processing-time 窗口偏移量为 -8 小时 input.keyBy(key selector).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).windowed transformation(window function);会话窗口 Session Windows 与滚动窗口和滑动窗口不同会话窗口不会相互重叠且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭即在一段不活跃的间隔之后。 如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔) 可以配置静态的gap, 也可以通过一个gap extractor函数来定义gap的长度 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口 DataStreamT input ...;// 设置了固定间隔的 event-time 会话窗口 input.keyBy(key selector).window(EventTimeSessionWindows.withGap(Time.minutes(10))).windowed transformation(window function);// 设置了动态间隔的 event-time 会话窗口 input.keyBy(key selector).window(EventTimeSessionWindows.withDynamicGap((element) - {// 决定并返回会话间隔})).windowed transformation(window function);// 设置了固定间隔的 processing-time session 窗口 input.keyBy(key selector).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).windowed transformation(window function);// 设置了动态间隔的 processing-time 会话窗口 input.keyBy(key selector).window(ProcessingTimeSessionWindows.withDynamicGap((element) - {// 决定并返回会话间隔})).windowed transformation(window function);全局窗口 Global Windows 全局窗口将整个数据流作为一个窗口进行处理不进行分割。全局窗口适用于需要在整个数据流上执行聚合操作的场景。 DataStreamT input ...;input.keyBy(key selector).window(GlobalWindows.create()).windowed transformation(window function);时间语义 在Flink的流式操作中, 会涉及不同的时间概念即时间语义它是指在数据处理中确定事件的时间基准的机制。 在实时数据流处理中常见的时间语义有以下三种 1.处理时间Processing Time 处理时间是指数据处理引擎的本地时钟时间也称为机器时间或系统时间使用处理时间时事件的时间顺序是根据数据到达处理引擎的顺序来确定的处理时间是一种简单和实时性较高的时间语义但不考虑数据可能存在的延迟或乱序2.事件时间Event Time 事件时间是数据流中记录的实际时间通常是数据本身携带的时间戳使用事件时间时数据记录的时间戳决定事件在时间轴上的顺序而不受数据到达引擎的顺序影响事件时间是一种准确和可重现的时间语义能够处理延迟和乱序数据但可能需要关注水印的处理3.摄取时间Ingestion Time 注意较新版本的Flink已经弃用推荐使用事件时间 摄取时间是数据进入数据处理引擎的时间使用摄取时间时数据到达引擎的顺序决定事件的时间顺序摄取时间是介于处理时间和事件时间之间的折中方案。它可以处理一定程度的延迟和乱序数据但不会像事件时间那样需要处理水印。区别 处理时间适用于实时性要求较高、不关心事件的顺序和时间戳的场景事件时间适用于需要准确处理事件顺序和考虑延迟、乱序数据的场景摄取时间提供了某种程度上的准确性和实时性折中窗口分配器 Window Assigners 在Apache Flink中窗口分配器Window Assigner用于定义如何将数据流中的元素分配到窗口。窗口分配器确定了窗口的边界以及如何对元素进行分组和分配 窗口分配器最通用的定义方式: 如果是按键分区窗口, 直接调用.keyBy().window()方法,传入一个WindowAssigner作为参数返回WindowedStream。如果是非按键分区窗口直接调用.windowAll()方法传入一个WindowAssigner返回的是AllWindowedStream。时间窗口 时间窗口是最常用的窗口类型可以大致细分为滚动、滑动和会话三种。 1.滚动处理时间窗口 窗口分配器由类TumblingProcessingTimeWindows提供需要调用它的静态方法.of()需要传入一个Time类型的参数size表示滚动窗口的大小 // 非按键分区 滚动事件时间窗口窗口长度10s。每10秒操作一次 dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 按键分区 dataStream.keyBy().window(TumblingProcessingTimeWindows.of(Time.seconds(10)));2.滚动事件时间窗口 窗口分配器由类TumblingEventTimeWindows提供用法与滚动处理事件窗口完全一致。 dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));3.滑动处理时间窗口 窗口分配器由类SlidingProcessingTimeWindows提供同样需要调用它的静态方法.of()需要传入两个Time类型的参数size和slide前者表示滑动窗口的大小后者表示滑动窗口的滑动步长 // 窗口长度10s滑动步长2s。 每2秒滑动一次窗口大小为10秒的滑动时间窗口并对窗口中的元素进行操作。 dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));4.滑动事件时间窗口 窗口分配器由类SlidingEventTimeWindows提供用法与滑动处理事件窗口完全一致 dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5)));5.处理时间会话窗口 窗口分配器由类ProcessingTimeSessionWindows提供需要调用它的静态方法withGap()或者withDynamicGap()。需要传入一个Time类型的参数size表示会话的超时时间 // 会话窗口超时间隔5s dataStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));6.事件时间会话窗口 窗口分配器由类EventTimeSessionWindows提供用法与处理事件会话窗口完全一致。 dataStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(10)));计数窗口 1.滚动计数窗口 滚动计数窗口只需要传入一个长整型的参数size表示窗口的大小。 当窗口中元素数量达到size时就会触发计算执行并关闭窗口。 // 滚动窗口窗口长度2个元素 dataStream.countWindowAll(2);2.滑动计数窗口 在countWindow()调用时传入两个参数size和slide前者表示窗口大小后者表示滑动步长。 每个窗口统计size个数据每隔slide个数据就统计输出一次结果。 // 滑动窗口窗口长度2个元素滑动步长2个元素 dataStream.countWindowAll(5,2);3.全局窗口 全局窗口是计数窗口的底层实现一般在需要自定义窗口时使用。它的定义同样是直接调用.window()分配器由GlobalWindows类提供。 // 全局窗口需要自定义的时候才会用 dataStream.windowAll(GlobalWindows.create());dataStream.keyBy().window(GlobalWindows.create());注意使用全局窗口必须自行定义触发器才能实现窗口计算否则不起作用。 例子 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 时间窗口示例滚动处理时间窗口窗口长度10s。 每10秒统计一次数量和SingleOutputStreamOperatorInteger streamOperator dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}发送测试数据 [rootadministrator ~]# nc -lk 8086 1 2 3 4等待10秒后,控制台打印如下 10窗口函数 Window Functions 定义了window assigner之后需要指定当窗口触发之后如何计算每个窗口中的数据 这就是window function的职责 窗口函数是在窗口操作中应用于窗口中元素的函数。Flink提供了丰富的窗口函数用于对窗口中的元素进行各种操作和计算。 根据处理的方式可以分为两类增量聚合函数和全窗/全量口函数它们是Flink中用于窗口计算的两种不同的函数。 增量聚合函数 增量聚合函数是指对窗口中的数据进行累积计算的函数。它会在每个元素到达窗口时进行计算并且仅保留窗口计算所需的中间状态。这种方式可以显著提高计算性能尤其适用于大规模数据和长窗口的情况。 对于增量聚合函数Flink 提供了一系列内置的聚合函数例如 sum、min、max、avg等它们的底层其实都是通过AggregateFunction来实现的。还可以通过实现 AggregateFunction接口来定义自定义的增量聚合函数。 典型的增量聚合函数有两个ReduceFunction和AggregateFunction。 ReduceFunction ReduceFunction指定两条输入数据如何合并起来产生一条输出数据输入和输出数据的类型必须相同。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 指定窗口分配器AllWindowedStreamInteger, TimeWindow allWindowedStream dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 指定窗口函数使用 增量聚合ReduceSingleOutputStreamOperatorInteger reduce allWindowedStream.reduce(new ReduceFunctionInteger() {Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println(前一个值 value1 ,后一个值 value2);return value1 value2;}});// 在窗口触发的时候才会输出窗口的最终计算结果reduce.print();env.execute();}发送测试数据 [rootadministrator ~]# nc -lk 8086 1 2 3 4 5控制台输出 前一个值 1 ,后一个值2 前一个值 3 ,后一个值3 前一个值 6 ,后一个值4 前一个值 10 ,后一个值5 15AggregateFunction ReduceFunction接口存在一个限制聚合状态的类型、输出结果的类型都必须和输入数据类型一样。聚合函数则突破了这个限制可以定义更加灵活的窗口聚合操作。 AggregateFunction函数接口方法参数有三种类型输入类型IN、累加器类型ACC和输出类型OUT。 输入类型IN就是输入流中元素的数据类型累加器类型ACC则是我们进行聚合的中间状态类型而输出类型当然就是最终计算结果的类型接口中有四个方法 createAccumulator()创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次add()将输入的元素添加到累加器中getResult()从累加器中提取聚合的输出结果merge()合并两个累加器并将合并后的状态作为一个累加器返回与ReduceFunction相同AggregateFunction也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 指定窗口分配器AllWindowedStreamInteger, TimeWindow allWindowedStream dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 窗口函数 增量聚合 AggregateSingleOutputStreamOperatorString aggregate allWindowedStream.aggregate(new MyAggregateFunction());aggregate.print();env.execute();}/*** 第一个类型 输入数据的类型* 第二个类型 累加器的类型存储的中间计算结果的类型* 第三个类型 输出的类型*/public static class MyAggregateFunction implements AggregateFunctionInteger, Integer, String {/*** 创建累加器初始化累加器** return*/Overridepublic Integer createAccumulator() {System.out.println(createAccumulator方法执行);return 0;}/*** 聚合逻辑* 来一条计算一条调用一次add方法** param value 当前值* param accumulator 累加器的值* return*/Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println(add方法执行当前值 value 累加器值 accumulator);return value accumulator;}/*** 获取最终结果窗口触发时输出** param accumulator* return*/Overridepublic String getResult(Integer accumulator) {System.out.println(getResult方法执行);return 最终计算值 accumulator;}Overridepublic Integer merge(Integer a, Integer b) {// 只有会话窗口才会用到System.out.println(merge方法执行);return null;}}发送测试数据 [rootadministrator ~]# nc -lk 8086 1 2 3 4 5控制台输出 createAccumulator方法执行 add方法执行当前值 1累加器值 0 add方法执行当前值 2累加器值 1 add方法执行当前值 3累加器值 3 add方法执行当前值 4累加器值 6 add方法执行当前值 5累加器值 10 getResult方法执行 最终计算值15全窗/全量口函数 全窗口函数是对窗口中的所有元素进行计算的函数。它会在窗口触发时对窗口中的所有元素进行处理并输出一个或多个结果。全窗口函数可以访问窗口的所有元素并且可以使用窗口中的状态信息。 对于全窗口函数Flink提供了 ProcessWindowFunction 和 WindowFunction 两个接口供用户使用。 ProcessWindowFunction: 可以处理每个元素并输出零个、一个或多个结果WindowFunction: 是一个转换函数对窗口的所有元素进行转换并输出一个或多个结果。与增量聚合函数不同全窗口函数需要先收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算。 WindowFunction public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为(key, value)元组DataStreamTuple2String, Integer dataStream source.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2 map(String s) throws Exception {int number Integer.parseInt(s);String key number % 2 0 ? key1 : key2;Tuple2 tuple2 new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStreamTuple2String, Integer, String keyedStream dataStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});// 指定窗口分配器 非键分区窗口 // AllWindowedStreamInteger, TimeWindow allWindowedStream dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 键分区窗口WindowedStreamTuple2String, Integer, String, TimeWindow windowedStream keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用WindowFunction窗口函数SingleOutputStreamOperatorString apply windowedStream.apply(new MyWindowFunction());apply.print();env.execute();}/*** 窗口函数* p* 窗口触发时才会调用一次统一计算窗口的所有数据*/public static class MyWindowFunction implements WindowFunctionTuple2String, Integer, String, String, TimeWindow {/*** param s 分组的key,非键分区窗口则无该参数* param window 窗口对象* param input 存的数据* param out 采集器*/Overridepublic void apply(String s, TimeWindow window, IterableTuple2String, Integer input, CollectorString out) throws Exception {// 上下文拿到window对象获取相关信息long start window.getStart();long end window.getEnd();String windowStart DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss);String windowEnd DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss);long count input.spliterator().estimateSize();out.collect(分组 s 的窗口,在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}]# nc -lk 8086 1 2 3 4 5分组 key2 的窗口,在时间区间 2023-06-27 16:50:10-2023-06-27 16:50:20 产生3条数据,具体数据[(key2,1), (key2,3), (key2,5)] 分组 key1 的窗口,在时间区间 2023-06-27 16:50:10-2023-06-27 16:50:20 产生2条数据,具体数据[(key1,2), (key1,4)]ProcessWindowFunction // 使用ProcessWindowFunction处理窗口函数SingleOutputStreamOperatorString process windowedStream.process(new MyProcessWindowFunction()); /*** 处理窗口函数* p* 窗口触发时才会调用一次统一计算窗口的所有数据*/public static class MyProcessWindowFunction extends ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow {/*** param s 分组的key,非键分区窗口则无该参数* param context 上下文* param input 存的数据* param out 采集器* throws Exception*/Overridepublic void process(String s, Context context, IterableTuple2String, Integer input, CollectorString out) throws Exception {// 上下文拿到window对象获取相关信息long start context.window().getStart();long end context.window().getEnd();String windowStart DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss);String windowEnd DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss);long count input.spliterator().estimateSize();out.collect(分组 s 的窗口,在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}增量聚合和全窗口函数的结合 在调用窗口的增量聚合函数方法时第一个参数直接传入一个ReduceFunction或AggregateFunction进行增量聚合第二个参数传入一个全窗口函数WindowFunction或者ProcessWindowFunction。 基于第一个参数增量聚合函数来处理窗口数据每来一个数据就做一次聚合等到窗口需要触发计算时则调用第二个参数全窗口函数的处理逻辑输出结果注意这里的全窗口函数就不再缓存所有数据了而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(node01, 8086);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 指定窗口分配器 非键分区窗口AllWindowedStreamInteger, TimeWindow allWindowedStream dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用ProcessWindowFunction处理窗口函数SingleOutputStreamOperatorString process allWindowedStream.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());process.print();env.execute();}/*** 第一个类型 输入数据的类型* 第二个类型 累加器的类型存储的中间计算结果的类型* 第三个类型 输出的类型*/public static class MyAggregateFunction implements AggregateFunctionInteger, Integer, String {/*** 创建累加器初始化累加器** return*/Overridepublic Integer createAccumulator() {System.out.println(createAccumulator方法执行);return 0;}/*** 聚合逻辑* 来一条计算一条调用一次add方法** param value 当前值* param accumulator 累加器的值* return*/Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println(add方法执行当前值 value 累加器值 accumulator);return value accumulator;}/*** 获取最终结果窗口触发时输出** param accumulator* return*/Overridepublic String getResult(Integer accumulator) {System.out.println(getResult方法执行);return 最终计算值 accumulator;}Overridepublic Integer merge(Integer a, Integer b) {// 只有会话窗口才会用到System.out.println(merge方法执行);return null;}}/*** 处理窗口函数* p* 窗口触发时才会调用一次统一计算窗口的所有数据* p* 注意增量聚合函数的输出类型 是 全窗口函数的输入类型*/public static class MyProcessWindowFunction extends ProcessAllWindowFunctionString, String, TimeWindow {/*** param context 上下文* param input 存的数据* param out 采集器* throws Exception*/Overridepublic void process(Context context, IterableString input, CollectorString out) throws Exception {// 上下文拿到window对象获取相关信息long start context.window().getStart();long end context.window().getEnd();String windowStart DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss);String windowEnd DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss);long count input.spliterator().estimateSize();out.collect(窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}createAccumulator方法执行 add方法执行当前值 1 累加器值 0 add方法执行当前值 2 累加器值 1 add方法执行当前值 3 累加器值 3 add方法执行当前值 4 累加器值 6 add方法执行当前值 5 累加器值 10 getResult方法执行 窗口在时间区间 2023-06-27 17:07:50-2023-06-27 17:08:00 产生1条数据,具体数据[最终计算值15]其他 触发器 Trigger Trigger决定了一个窗口由windowassigner定义何时可以被windowfunction处理。每个WindowAssigner都有一个默认的Trigger。如果默认trigger无法满足需要可以在trigger(…)调用中指定自定义的trigger。 Trigger接口提供了五个方法来响应不同的事件 onElement()方法在每个元素被加入窗口时调用onEventTime()方法在注册的event-timetimer触发时调用onProcessingTime()方法在注册的processing-timetimer触发时调用onMerge()方法与有状态的trigger相关。该方法会在两个窗口合并时将窗口对应trigger的状态进行合并比如使用会话窗口时clear()方法处理在对应窗口被移除时所需的逻辑注意 前三个方法通过返回TriggerResult来决定trigger如何应对到达窗口的事件。 应对方案 CONTINUE: 什么也不做FIRE: 触发计算PURGE: 清空窗口内的元素FIRE_AND_PURGE: 触发计算计算结束后清空窗口内的元素内置触发器 EventTimeTrigger基于事件时间和watermark机制来对窗口进行触发计算ProcessingTimeTrigger 基于处理时间触发CountTrigger窗口元素数超过预先给定的限制值的话会触发计算PurgingTrigger作为其它trigger的参数将其转化为一个purging触发器基于WindowedStream调用.trigger()方法就可以传入一个自定义的窗口触发器 stream.keyBy(...).window(...).trigger(new MyTrigger())移除器 Evictor Evictor可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。Evictor是一个接口不同的窗口类型都有各自预实现的移除器。 内置evictor 默认情况下所有内置的 evictor 逻辑都在调用窗口函数前执行。 CountEvictor: 仅记录用户指定数量的元素一旦窗口中的元素超过这个数量多余的元素会从窗口缓存的开头移除DeltaEvictor: 接收 DeltaFunction 和 threshold 参数计算最后一个元素与窗口缓存中所有元素的差值 并移除差值大于或等于 threshold 的元素。TimeEvictor: 接收 interval 参数以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器 stream.keyBy(...).window(...).evictor(new MyEvictor())
http://www.hkea.cn/news/14266869/

相关文章:

  • 高端网站哪个比较好建设购物网站多少钱
  • wordpress如何建站呢wordpress邮件分析插件
  • 做网站开发教程社保门户网站建设方案
  • 房屋出租网站模板深圳市升华建设有限公司网站
  • 网站开发课程知识点总结泉州建站模板
  • 10g空间网站做视频网站网站的发展前景
  • 怎样做网站吸引人软件上传到那个网站做宣传
  • 自己做挖矿网站平面广告设计公司
  • 连云港做网站公司哪家好顺德网站建设原创
  • 2008 访问网站提示建设中更改wordpress主题语言包
  • 网站开发的pc或移动端山东省建设监理协会官方网站
  • 网站开发制作费用数码网站名
  • 龙海网站建设价格wordpress页面上分页
  • 海外网站哪个最好西安高校定制网站建设
  • 网站网站设计公司网站开发的比较
  • 商丘做网站sqlongliqi中国商铺网
  • 有人做网站吗wordpress文章上的图片显示不
  • 威海自适应网站建设建设通下载
  • 公司网站建设征稿令电商扶贫网站建设
  • 怎么弄一个网站python做网站项目
  • 网站做数据监测威海外贸网站建设多少钱
  • 全球网站访问量排名网站建设及解析流程
  • 建设商城网站的难点做视频网站广告收费
  • 网站建设创业书品牌建设对企业发展的重要性
  • 网页做得好的网站wordpress 二次元插件
  • 广州网站(建设信科网络)深圳工程建设交易中心网
  • 芜湖酒店网站建设wordpress科技网站模板
  • 做ppt的模板网站有哪些深圳网站建设找智恒网络
  • 做包装找灵感看什么网站淘宝图片做链接的网站
  • 网站模板修改教程网站建设 外文文献