企业网站一定要花钱吗,长沙seo推广营销,微网站是什么,好的产品设计网站目录
一、引言
二、为什么需要 Window
三、Window 的控制属性
窗口的长度#xff08;大小#xff09;
窗口的间隔
四、Flink 窗口应用代码结构
是否分组
Keyed Window --键控窗
Non-Keyed Window
核心操作流程
五、Window 的生命周期
分配阶段
触发计算
六、Wi…目录
一、引言
二、为什么需要 Window
三、Window 的控制属性
窗口的长度大小
窗口的间隔
四、Flink 窗口应用代码结构
是否分组
Keyed Window --键控窗
Non-Keyed Window
核心操作流程
五、Window 的生命周期
分配阶段
触发计算
六、Window 的分类
滚动窗口- TumblingWindow概念
滑动窗口– SlidingWindow概念 会话窗口 [了解]
七、Windows Function 窗口函数
分类剖析
增量聚合函数以 AggregateFunction 为例
全量聚合函数
八、案例实战
案例一
滚动窗口演示
滑动窗口演示
热词统计案例
kafka发送消息的模板代码
九、总结 本文深入探讨 Flink 中高级 API 里窗口Window的相关知识涵盖为什么需要窗口、其控制属性、应用代码结构、生命周期、分类以及窗口函数的各类细节并辅以实例进行讲解旨在助力开发者透彻理解并熟练运用 Flink 的窗口机制处理流数据。 一、引言 在大数据实时处理领域Apache Flink 凭借其卓越性能与丰富功能占据重要地位。而窗口Window作为 Flink 从流处理Streaming到批处理Batch的关键桥梁理解与掌握其使用对高效数据处理意义非凡接下来将全方位剖析其奥秘。 二、为什么需要 Window 在流处理场景中数据如潺潺溪流般持续涌入、无休无止。但诸多业务场景要求我们对特定时段数据做聚合操作像统计 “过去的 1 分钟内有多少用户点击了我们的网页”。若不划定范围面对无尽数据洪流根本无法开展有针对性计算。窗口恰似神奇 “箩筐”按规则收集一定时长或一定数据量数据将无限流拆分成有限 “桶”便于精准计算满足如 “每隔 10min计算最近 24h 的热搜词” 这类实时需求。 三、Window 的控制属性 窗口的长度大小 明确要计算最近多久的数据以时间维度举例若关注 24 小时内热搜词数据量那 24 小时即窗口长度计数维度下设定统计前 N 条数据N 就是计数窗口的长度规格。 窗口的间隔 决定隔多久进行一次计算操作。像 “每隔 10min计算最近 24h 的热搜词” 里每隔 10 分钟便是间隔设定它把控着计算频次节奏。 四、Flink 窗口应用代码结构 是否分组 首先要判定是否依 Key 对 DataStream 分组经 keyBy 操作后数据流成多组下游算子多实例可并行跑提效显著若用 windowAll 则不分组所有数据送下游单个实例并行度为 1后续窗口操作逻辑与分组情形Keyed Window类似仅执行主体有别。 Keyed Window --键控窗
// Keyed Window
stream.keyBy(...) - 按照一个Key进行分组.window(...) - 将数据流中的元素分配到相应的窗口中[.trigger(...)] - 指定触发器Trigger可选[.evictor(...)] - 指定清除器Evictor(可选).reduce/aggregate/process/apply() - 窗口处理函数Window Function Non-Keyed Window
// Non-Keyed Window
stream.windowAll(...) - 不分组将数据流中的所有元素分配到相应的窗口中[.trigger(...)] - 指定触发器Trigger可选[.evictor(...)] - 指定清除器Evictor(可选).reduce/aggregate/process() - 窗口处理函数Window Function 核心操作流程 借助窗口分配器WindowAssigner依时间Event Time 或 Processing Time把数据流元素 “分拣” 进对应窗口待满足触发条件常是窗口结束时间到等情况用窗口处理函数如 reduce、aggregate、process 等常用函数处理窗口内数据此外trigger、evictor 是面向高级自定义需求的触发、销毁附加项默认配置也能应对常见场景。 五、Window 的生命周期 分配阶段 窗口分配器依据设定规则像按时间间隔、计数规则等为流入数据 “找家”安置到合适窗口 “桶” 内确定数据归属构建基础计算单元。 触发计算 当预设触发条件达成如时间窗口到结束点对应窗口函数 “登场”对窗口内数据按既定逻辑聚合处理不同窗口函数reduce、aggregate、process处理细节、能力有差异像 process 更底层、功能更强大自带 open/close 生命周期方法且能获取 RuntimeContext。 上图是窗口的生命周期示意图假如我们设置的是一个10分钟的滚动窗口第一个窗口的起始时间是0:00结束时间是0:10后面以此类推。当数据流中的元素流入后窗口分配器会根据时间Event Time或Processing Time分配给相应的窗口。相应窗口满足了触发条件比如已经到了窗口的结束时间会触发相应的Window Function进行计算。注意本图只是一个大致示意图不同的Window Function的处理方式略有不同。 从数据类型上来看一个DataStream经过keyBy转换成KeyedStream再经过window转换成WindowedStream我们要在之上进行reduce、aggregate或process等Window Function对数据进行必要的聚合操作。 六、Window 的分类 Window可以分成两类 CountWindow按指定数据条数生成窗口与时间脱钩。 滚动计数窗口每隔 N 条数据聚焦统计前 N 条如每来 10 条统计前 10 条信息。 滑动计数窗口每隔 N 条数据统计前 M 条N≠M像每过 20 条统计前 15 条情况。 TimeWindow重点基于时间划定窗口。 滚动时间窗口每隔 N 时间统计前 N 时间范围数据如每隔 5 分钟统计前 5 分钟车辆通过量窗口长度与滑动距离均为 5 分钟。 滑动时间窗口每隔 N 时间统计前 M 时间范围数据M≠N像每隔 30 秒统计前 1 分钟车辆数据窗口长度 1 分钟、滑动距离 30 秒。 会话窗口设会话超时时间如 10 分钟期间无数据来则结算上一窗口数据按毫秒精细界定范围与 Key 值关联紧密Key 值无新输入达设定时长就统计不受全局新数据流入干扰。 滚动窗口- TumblingWindow概念 流是连续的无界的有明确的开始无明确的结束
假设有个红绿灯提出个问题计算一下通过这个路口的汽车数量 对于这个问题肯定是无法回答的为何
因为统计是一种对固定数据进行计算的动作。
因为流的数据是源源不断的无法满足固定数据的要求因为不知道何时结束
那么我们换个问题统计1分钟内通过的汽车数量
那么对于这个问题我们就可以解答了。因为这个问题确定了数据的边界从无界的流数据中取出了一部分有边界的数据子集合进行计算。
描述完整就是每隔1分钟统计这1分钟内通过汽车的数量。窗口长度是1分钟时间间隔是1分钟所以这样的窗口就是滚动窗口。 那么这个行为或者说这个统计的数据边界就称之为窗口。
同时我们的问题是以时间来划分被处理的数据边界的那么按照时间划分边界的就称之为时间窗口
反之如果换个问题统计100辆通过的车里面有多少宝马品牌那么这个边界的划分就是按照数量的这样的称之为计数窗口 同时这样的窗口被称之为滚动窗口按照窗口划分依据分为滚动时间窗口、滚动计数窗口。 滑动窗口– SlidingWindow概念 同样是需求改为
每隔1分钟统计前面2分钟内通过的车辆数
对于这个需求我们可以看出窗口长度是2分钟每隔1分钟统计一次窗口长度和时间间隔不相等并且是大于关系就是滑动窗口 或者每通过100辆车统计前面通过的50辆车的品牌占比
对于这个需求可以看出窗口长度是50辆车但是每隔100辆车统计一次
对于这样的窗口我们称之为滑动窗口。 那么在这里面统计多少数据是窗口长度如统计2分钟内的数据统计50辆车中的数据
隔多久统计一次称之为滑动距离如每隔1分钟每隔100辆车
那么可以看出滑动窗口就是滑动距离不等于窗口长度的一种窗口
比如每隔1分钟 统计先前5分钟的数据窗口长度5分钟滑动距离1分钟不相等
比如每隔100条数据统计先前50条数据窗口长度50条滑动距离100条不相等 那如果相等呢相等就是比如每隔1分钟统计前面1分钟的数据窗口长度1分钟滑动距离1分钟相等。
对于这样的需求可以简化成每隔1分钟统计一次数据这就是前面说的滚动窗口
那么我们可以看出
滚动窗口窗口长度 滑动距离
滑动窗口窗口长度 滑动距离 总结其中可以发现对于滑动窗口
滑动距离 窗口长度会漏掉数据比如每隔5分钟统计前面1分钟的数据滑动距离5分钟窗口长度1分钟漏掉4分钟的数据这样的东西没人用。
滑动距离 窗口长度会重复处理数据比如每隔1分钟统计前面5分钟的数据滑动距离1分钟窗口长度5分钟重复处理4分钟的数据
滑动距离 窗口长度不漏也不会重复也就是滚动窗口
窗口的长度(大小) 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】 窗口的长度(大小) 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】 窗口的长度(大小) 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】 会话窗口 [了解] Session 会话一次会话。就是谈话。
设置一个会话超时时间间隔即可, 如10分钟,那么表示:
如果10分钟没有数据到来, 就计算上一个窗口的数据 代码中并行度设置为1测试比较 方便。 窗口的范围 窗口的判断是按照毫秒为单位 如果窗口长度是5秒 窗口的开始: start 窗口的结束: start 窗口长度 -1 毫秒 比如窗口长度是5秒, 从0开始 那么窗口结束是: 0 5000 -1 4999 七、Windows Function 窗口函数 分类剖析 全量函数耐心缓存窗口所有元素直至触发条件成熟才对全量数据 “开刀” 计算此特性可满足数据排序等复杂需求。
增量函数保存中间数据 “蓝本”新元素流入就与之融合更新持续迭代中间成果高效且灵活。 增量聚合函数以 AggregateFunction 为例 每有新数据 “入局”立马按规则计算其接口含输入类型IN、累加器类型ACC、输出类型OUT参数有对应 add、createAccumulator、merge、extractOutput 等方法构建严谨聚合流程。 实现方法常见的增量聚合函数如下 reduce(reduceFunction) aggregate(aggregateFunction) sum() min() max() reduce接受两个相同类型的输入生成一个同类型输出所以泛型就一个 T maxBy、minBy、sum这3个底层都是由reduce实现的 aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同泛型有T, ACC, R AggregateFunction 【了解】 AggregateFunction 比 ReduceFunction 更加的通用它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。 输入类型是输入流中的元素类型AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。 package com.bigdata.windows;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _04_AggDemo {public static final Tuple3[] ENGLISH new Tuple3[] {Tuple3.of(class1, 张三, 100L),Tuple3.of(class1, 李四, 40L),Tuple3.of(class1, 王五, 60L),Tuple3.of(class2, 赵六, 20L),Tuple3.of(class2, 小七, 30L),Tuple3.of(class2, 小八, 50L)};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSourceTuple3String,String,Long dataStreamSource env.fromElements(ENGLISH);KeyedStreamTuple3String,String,Long, String keyedStream dataStreamSource.keyBy(new KeySelectorTuple3String,String,Long, String() {Overridepublic String getKey(Tuple3String,String,Long tuple3) throws Exception {return tuple3.f0;}});//3. transformation-数据处理转换// 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)keyedStream.countWindow(3).aggregate(new AggregateFunctionTuple3String,String,Long, Tuple3String,Long,Integer, Tuple2String,Double() {// 初始化一个中间变量Tuple3String,Long,Integer tuple3 Tuple3.of(null,0L,0);Overridepublic Tuple3String,Long,Integer createAccumulator() {return tuple3;}Overridepublic Tuple3String,Long,Integer add(Tuple3String, String, Long value, Tuple3String,Long,Integer accumulator) {long tempScore value.f2 accumulator.f1;int length accumulator.f2 1;return Tuple3.of(value.f0, tempScore,length);}Overridepublic Tuple2String, Double getResult( Tuple3String,Long,Integer accumulator) {return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);}Overridepublic Tuple3String, Long, Integer merge(Tuple3String, Long, Integer a, Tuple3String, Long, Integer b) {return Tuple3.of(a.f0,a.f1b.f1,a.f2b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
} 全量聚合函数 坚守等窗口数据集齐 “发令枪响” 才运算原则确保计算基于完整数据集保障结果准确性、完整性契合多场景聚合诉求。 实现方法 apply(windowFunction process(processWindowFunction) 全量聚合: 窗口需要维护全部原始数据窗口触发进行全量聚合。 ProcessWindowFunction一次性迭代整个窗口里的所有元素比较重要的一个对象是Context可以获取到事件和状态信息这样我们就可以实现更加灵活的控制该算子会浪费很多性能主要原因是不增量计算要缓存整个窗口然后再去处理所以要设计好内存。 package com.bigdata.day04;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;public class Demo03 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Tuple3[] ENGLISH new Tuple3[] {Tuple3.of(class1, 张三, 100L),Tuple3.of(class1, 李四, 40L),Tuple3.of(class1, 王五, 60L),Tuple3.of(class2, 赵六, 20L),Tuple3.of(class2, 小七, 30L),Tuple3.of(class2, 小八, 50L)};// 先求每个班级的总分数再求每个班级的总人数DataStreamSourceTuple3String,String,Long streamSource env.fromElements(ENGLISH);KeyedStreamTuple3String, String, Long, String keyedStream streamSource.keyBy(v - v.f0);// 每个分区中的数据都达到了3条才能触发哪个分区达到了三条哪个就触发不够的不计算// //Tuple3String, String, Long 输入类型// //Tuple2Long, Long 累加器ACC类型保存中间状态 第一个值代表总成绩第二个值代表总人数// //Double 输出类型// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象keyedStream.countWindow(3).apply(new WindowFunctionTuple3String, String, Long, Double, String, GlobalWindow() {Overridepublic void apply(String s, GlobalWindow window, IterableTuple3String, String, Long input, CollectorDouble out) throws Exception {// 计算总成绩计算总人数int sumScore 0,sumPerson0;for (Tuple3String, String, Long tuple3 : input) {sumScore tuple3.f2;sumPerson 1;}out.collect((double)sumScore/sumPerson);}}).print();//5. execute-执行env.execute();}
} 八、案例实战 案例一 需求为 “每 5 秒钟统计一次最近 5 秒钟内各个路口通过红绿灯汽车的数量”借 Flink 代码实现底层算法作用下数据按节奏聚合统计时间设 1 分钟更易观察效果能清晰看到各时段车辆数统计产出。 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 没有添加窗口的写法
package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo07 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString streamSource env.socketTextStream(localhost, 9999);// 9,2 -- (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunctionString, Tuple2Integer,Integer() {Overridepublic Tuple2Integer, Integer map(String line) throws Exception {String[] arr line.split(,);int monitor_id Integer.valueOf(arr[0]);int num Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple-tuple.f0).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
此处的sum求和中count ,其实是CartInfo中的一个字段而已。
演示 滚动窗口演示
package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo08 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString streamSource env.socketTextStream(localhost, 9999);// 9,2 -- (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunctionString, Tuple2Integer,Integer() {Overridepublic Tuple2Integer, Integer map(String line) throws Exception {String[] arr line.split(,);int monitor_id Integer.valueOf(arr[0]);int num Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple-tuple.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
} 以上代码的时间最好修改为1分钟假如时间间隔是1分钟那么48分03秒时输入的信号灯数据49分整点会统计出来结果原因是底层有一个算法。 滑动窗口的话不太容易看到效果因为有些数据被算到了多个窗口中需要我们拿笔自己计算一下对比一下 滑动窗口演示 同样统计各路口汽车数量但需求改为 “每 5 秒钟统计一次最近 10 秒钟内”因数据会在多窗口重复计算需手动比对梳理深入体会滑动窗口数据处理逻辑与特点。
package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo09 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString streamSource env.socketTextStream(localhost, 9999);// 9,2 -- (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunctionString, Tuple2Integer,Integer() {Overridepublic Tuple2Integer, Integer map(String line) throws Exception {String[] arr line.split(,);int monitor_id Integer.valueOf(arr[0]);int num Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple-tuple.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
} 热词统计案例 借助 Kafka 随机发送 50000 个热词200 毫秒间隔分别基于滚动、滑动窗口统计编写 Flink 代码时着重体会 apply 方法兼顾二者效果差异同时知晓工作中 process 函数因更强大底层能力常成首选。 apply和process都是处理全量计算但工作中正常用process。
process更加底层更加强大有open/close生命周期方法又可获取RuntimeContext。 package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;public class Demo10 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id,g2);//2. source-加载数据FlinkKafkaConsumerString kafkaConsumer new FlinkKafkaConsumerString(flink-01,new SimpleStringSchema(),properties);DataStreamSourceString kafkaSource env.addSource(kafkaConsumer);//3. transformation-数据处理转换kafkaSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return Tuple2.of(value,1);}}).keyBy(tuple-tuple.f0)//.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 代表分组key值 五旬老太守国门TimeWindow window, // 代表窗口对象IterableTuple2String, Integer input, // 分组过之后的数据 [1,1,1,1,1]CollectorString out // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);long start window.getStart();long end window.getEnd();int sum 0;for (Tuple2String, Integer tuple2 : input) {sum tuple2.f1;}out.collect(key,窗口开始dateFormat.format(new Date(start)),结束时间:dateFormat.format(new Date(end)),sum);//out.collect(key,窗口开始start,结束时间:end,sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
} kafka发送消息的模板代码 package com.bigdata.day03.time;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class CustomProducer {public static void main(String[] args) {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 创建了一个消息生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 调用这个里面的send方法String[] hotWords new String[]{郭有才,歌手2024,五旬老太守国门,师夷长技以制夷};Random random new Random();for (int i 0; i 50000; i) {String word hotWords[random.nextInt(4)];ProducerRecordString, String producerRecord new ProducerRecordString, String(flink-01,word);kafkaProducer.send(producerRecord);}kafkaProducer.close();}
}九、总结 Flink 窗口机制犹如精密仪器从控制属性、分类设计到函数运用各环节紧密相扣。深入理解其原理、熟练实操代码能为实时流数据处理注入强大动力解锁更多高效、智能数据聚合分析场景助力开发者在大数据浪潮中稳立潮头、驾驭数据。后续可深入探索自定义窗口逻辑、优化性能调优等进阶方向深挖 Flink 窗口潜力。