一站式做网站系统,重庆搜索引擎优化,重庆市住建局官方网站,浙江建设工程信息网查询前言 今天学习剩下的转换算子#xff1a;分区、分流、合流。 每天出来自学是一件孤独又充实的事情#xff0c;希望多年以后回望自己的大学生活#xff0c;不会因为自己的懒惰与懈怠而悔恨。 回答之所以起到了作用#xff0c;原因是他们自己很努力。 …前言 今天学习剩下的转换算子分区、分流、合流。 每天出来自学是一件孤独又充实的事情希望多年以后回望自己的大学生活不会因为自己的懒惰与懈怠而悔恨。 回答之所以起到了作用原因是他们自己很努力。 -《解忧杂货店》 1、物理分区算子
常见的物理分区策略有随机分配Random、轮询分配Round-Robin、重缩放Rescale和广播Broadcast下边我们分别来做了解。
此外还有我们之前用过的 keyBy 聚合算子它也是一个分区算子。
1.1、随机分区shuffle
package com.lyh.partition;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString socketDS env.socketTextStream(localhost, 9999);// 随机分区: random.nextInt(numberOfChannels:下游算子并行度)socketDS.shuffle().print();env.execute();}
}这里的下游算子并行度在这个案例中指的是我们的 Sinkprint算子因为我们的并行度是 2 所以 random.nextInnt(2) 的结果只会是 0 或 1也就是说我们的数据会被随机分到这两个编号的任务槽中。
运行结果
14
25
14
12
11
23
15
可以看到随机分区的结果数据是被随机分到各个区的并没有什么规律。
1.2、轮询分区reblancce
轮询分区就是根据并行度把数据对每个下游的算子进行轮流分配。这种处理方式非常适合于当 数据源倾斜 的情况下我们读取的时候利用轮询分区的方式均匀的把数据分给下游的算子。 分区逻辑
// 轮询分区:socketDS.rebalance().print();
运行结果
2 1
1 2
2 3
1 1
2 5
1 2
2 2
1 11.3、重缩放分区rescale
重缩放分区和轮询分区特别相似对于下游的 n 个子任务我们假设有 2 个 source 算子不一定就是 source 而是带有分区方法的算子那么使用轮询分区每个 source 算子次都要 n 个子任务都轮询发送数据。而重缩放分区的逻辑就是每个 source 算子只负责 n/2 个任务
所以当下游任务数据接收方的数量是上游任务数据发送方数量的整数倍时rescale的效率明显会更高。比如当上游任务数量是 2下游任务数量是 6 时上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。由于 rebalance 是所有分区数据的“重新平衡”当 TaskManager 数据量较多时这种跨节点的网络传输必然影响效率而如果我们配置的 task slot 数量合适用 rescale 的方式进行“局部重缩放”就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配从而避免了网络传输带来的损耗。 从底层实现上看rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务发送数据方和所有下游任务接收数据方之间建立通信通道这是一个笛卡尔积的关系而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道节省了很多资源。 // 缩放分区:socketDS.rescale().print();
这里由于 Socket 这种数据源只支持一个 Source 算子读取所以不做演示。
1.4、广播broadcast
广播类似于一种轮询只不过它每次轮询都会把每个数据发送给所有下游任务。
// 广播分区使用两个并行度来模拟
socketDS.broadcast().print();
运行结果
1 1
2 1
2 2
1 2
1 3
2 3
1 4
2 4
2 5
1 5 1.5、全局分区global
全局分区会把所有数据都发往下游的第一个任务当中。
// 全局分区:socketDS.global().print();
并行度为 2 的情况下运行结果
1 1
1 2
1 3
1 4
1 5
1.6、自定义分区custom
我们可以通过使用 partitionCustom(partitionerkeySelector) 方法来自定义分区策略。在调用时方法需要传入两个参数第一个是自定义分区器Partitioner对象第二个是应用分区器的键字段选择器我们一般都是自己实现一个 KeySelector。
1、自定义分区器
// 自定义分区器
public class MyPartitioner implements PartitionerString {// 返回分区号,我们传进来的是一个数字类型的字符串Overridepublic int partition(String key, int numPartitions) {// 这里我们自己实现一个取模 我们的并行度为2 奇数%21 偶数%20return Integer.parseInt(key) % numPartitions;}
}
public class CustomPartitionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString socketDS env.socketTextStream(localhost, 9999);socketDS.partitionCustom(new MyPartitioner(),key-key).print();env.execute();}
}
运行结果
2 1
1 2
2 3
1 4
2 5
1 6
1 8
可以看到奇数都被分到 2 号线程偶数被分到了 1 号。
2、分流
分流就是把我们传进来的数据流根据一定的规则进行筛选后将符合条件的数据放到对应的流里。
2.1、Filter
读取一个整数数据流将数据划分为奇数数据流和偶数数据流。其实我们上面在自定义分区器已经实现了但那是并行度为 2 的情况刚好达到的这么一种效果。
package com.lyh.split;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SplitByFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString socketDS env.socketTextStream(localhost, 9999);socketDS.filter(new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {if (Integer.parseInt(value)%20){return true; // 为true则留下来}return false;}});// lambda 表达式// 偶数流socketDS.filter(value - Integer.parseInt(value) % 2 0).print(偶数流);// 奇数流socketDS.filter(value - Integer.parseInt(value)%21).print(奇数流);env.execute();}
}运行结果
偶数流:1 2
奇数流:2 1
奇数流:1 3
偶数流:2 4
偶数流:1 6
偶数流:2 8
奇数流:1 7
奇数流:2 9
缺点明显每次 Source 算子传过来的数据需要把所有数据发送给每个转换算子Filter明显性能要差一些。
2.2、侧输出流
侧输出流后面我们再做详细介绍这里只做简单使用。简单来说只需要调用上下文 context 的 .output() 方法就可以输出任意类型的数据了而侧输出流的标记和提取都离不开一个“输出标签” OutputTag指定了侧输出流的 id 和 类型。
案例-我们根据上一节的 POJO 类 WaterSensor 的 id 进行分流将s1和s2分别分到不同的数据流中去把非s1、s2的数据保留在主流当中
package com.lyh.split;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** author 刘xx* version 1.0* date 2023-11-16 19:25* 使用侧输出流实现数据分流*/
public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor sensorDS env.fromElements(new WaterSensor(s1, 1L, 1),new WaterSensor(s2, 2L, 2),new WaterSensor(s3, 3L, 3),new WaterSensor(s2, 2L, 2));//这里的泛型是我们测流中的数据类型, 注意如果不是基本数据类型需要单独设置数据类型OutputTagWaterSensor s1 new OutputTagWaterSensor(s1, Types.POJO(WaterSensor.class));OutputTagWaterSensor s2 new OutputTagWaterSensor(s2, Types.POJO(WaterSensor.class));/*** Flink一共有4层API底层API、DataStream、Table API、Flink SQL* process(processFunction: 处理逻辑,outputType: 主流的输出类型) 是Flink的底层API*/SingleOutputStreamOperatorWaterSensor process sensorDS.process(new ProcessFunctionWaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor sensor, Context context, CollectorWaterSensor out) throws Exception {if (sensor.getId().equals(s1)) { // 放到侧流s1中context.output(s1, sensor);} else if (sensor.getId().equals(s2)) { // 放到测流s2中context.output(s2, sensor);} else { // 放到主流out.collect(sensor);}}});// 这里打印的是主流的数据,测流需要调用getSideOutput()方法process.print(主流);// 打印测流 s1process.getSideOutput(s1).print(测流s1);// 打印测流 s2process.getSideOutput(s2).print(测流s2);env.execute();}
}运行结果
测流s1 WaterSensor{ids1, ts1, vc1}
测流s2 WaterSensor{ids2, ts2, vc2}
主流 WaterSensor{ids3, ts3, vc3}
测流s2 WaterSensor{ids2, ts2, vc2}这种方式相较于 Filter 明显要效率更高因为它对每个数据只处理一次。
3、合流
在实际应用中我们经常会遇到来源不同的多种数据流需要将它们进行联合处理。这就需要先进行合流Flink 为我们提供了相应的 API。
3.1、联合Union
联合是最简单的合流操作就是直接将多条数据流合在一起。但是它要求每个流中的数据类型必须是相同的合并之后的新流会包括所有流中的元素数据类型不变。
public class UnionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger source1 env.fromElements(1, 2, 3, 4, 5);DataStreamSourceInteger source2 env.fromElements(11, 22, 33, 44, 55);DataStreamSourceString source3 env.fromElements(1, 2, 3, 4, 5);DataStreamInteger union source1.union(source2,source3.map(Integer::valueOf));// 使用parseInt也可以,因为它默认是10进制union.print();env.execute();}
}
总结
使用 union 时每条流的数据类型必须一致可以合并多条流
3.2、连接Connect
流的联合虽然简单不过受限于数据类型不能改变灵活性大打折扣所以实际应用较少出现。除了联合unionFlink 还提供了另外一种方便的合流操作——连接connect。顾名思义这种操作就是直接把两条流像接线一样对接起来。
为了处理更加灵活连接操作允许流的数据类型不同。但我们知道一个 DataStream 中的数据只能有唯一的类型所以连接得到的并不是 DataStream而是一个“连接流”ConnectedStreams。连接流可以看成是两条流形式上的“统一”被放在了一个同一个流中事实上内部仍保持各自的数据形式不变彼此之间是相互独立的。要想得到新的 DataStream还需要进一步定义一个“同处理”co-process转换操作用来说明对于不同来源、不同类型的数据怎样分别进行处理转换、得到统一的输出类型。所以整体上来两条流的连接就像是“一国两制”两条流可以保持各自的数据类型、处理方式也可以不同不过最终还是会统一到同一个 DataStream 中。 1CoMapFunction
package com.lyh.combine;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** author 刘xx* version 1.0* date 2023-11-16 20:04*/
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger source1 env.fromElements(1, 2, 3, 4, 5);DataStreamSourceString source2 env.fromElements(a, b, c, d, e);// connect 合并后,两个数据流仍然是独立的ConnectedStreamsInteger, String connectedStreams source1.connect(source2);// map 将两个不同类型的数据转为统一的数据类型SingleOutputStreamOperatorString res connectedStreams.map(new CoMapFunctionInteger, String, String() {Overridepublic String map1(Integer value) throws Exception {return String.valueOf(value);}Overridepublic String map2(String value) throws Exception {return value;}});res.print();env.execute();}
}运行结果
1
a
2
b
3
c
4
d
5
e
总结
一次只能连接 2 条流流的数据类型可以不一样连接后可以调用 map实现 CoMapFunction 接口、flatMap实现 CoFlatMapFunction接口、process实现 CoProcessFunction 接口 来处理但是各处理各的
2CoFlatMapFunction flatMap 和 map 一样同样对两种数据流实现两种不同的处理方法flatMap1 和 flatMap2。
3CoProcessFunction 调用 .process()时传入的则是一个 CoProcessFunction 实现类。抽象类CoProcessFunction 在源码中定义如下
// IN1: 第一条流的类型 IN2: 第二条流的类型 OUT: 输出类型
public abstract class CoProcessFunctionIN1, IN2, OUT extends AbstractRichFunction {...public abstract void processElement1(IN1 value, Context ctx, CollectorOUT out) throws Exception;public abstract void processElement2(IN2 value, Context ctx, CollectorOUT out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) throws Exception {}public abstract class Context {...}...
}它需要实现的也是两个方法processElement1、processElement2当数据到来的时候它会根据其来源调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来访问 timestamp、水位线并通过 TimerService 注册定时器另外也提供了.onTimer()方法用于定义定时触发的处理操作。
案例-我们创建两个数据流一个二元组一个三元组要求根据两个不同类型元组的第一个字段匹配以字符串的形式输出该元组。
package com.lyh.combine;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** author 刘xx* version 1.0* date 2023-11-17 10:02*/
public class ConnectKeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceTuple2Integer, String source1 env.fromElements(Tuple2.of(1, a1),Tuple2.of(1, a2),Tuple2.of(3, b),Tuple2.of(4, c));DataStreamSourceTuple3Integer, String,Integer source2 env.fromElements(Tuple3.of(1, a1,1),Tuple3.of(1, a2,2),Tuple3.of(3, b,1),Tuple3.of(4, c,1));// 连接两条流 输出能根据 id 匹配上的数据类似 inner joinConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connect source1.connect(source2);/*** 每条流实现相互匹配:* 1、每条流的数据来了之后因为是各处理各的所以要关联在一起必须存到一个变量中去* HashMapkey:String,value:ListTuple* 2、除了存变量外还需要去另一条流存的变量中去查找是否有匹配的*/SingleOutputStreamOperatorString process connect.process(new CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String() {MapInteger, ListTuple2Integer, String s1Cache new HashMap();MapInteger, ListTuple3Integer, String, Integer s2Cache new HashMap();Overridepublic void processElement1(Tuple2Integer, String value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// source1 的数据来了就存到变量中去if (!s1Cache.containsKey(id)) {ListTuple2Integer, String list new ArrayList();list.add(value);s1Cache.put(id, list);} else {s1Cache.get(id).add(value);}// 去 s2Cache 中去查找是否有匹配的if (s2Cache.containsKey(id)) {for (Tuple3Integer, String, Integer s2Element : s2Cache.get(id)) {out.collect(s1: value -------- s2: s2Element);}}}Overridepublic void processElement2(Tuple3Integer, String, Integer value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// source2 的数据来了就存到变量中去if (!s2Cache.containsKey(id)) {ListTuple3Integer, String, Integer list new ArrayList();list.add(value);s2Cache.put(id, list);} else {s2Cache.get(id).add(value);}// 去 s1Cache 中去查找是否有匹配的if (s1Cache.containsKey(id)) {for (Tuple2Integer, String s1Element : s1Cache.get(id)) {out.collect(s2: value -------- s1: s1Element);}}}});process.print();env.execute();}
}运行结果
s2:(1,a1,1)--------s1:(1,a1)
s1:(1,a2)--------s2:(1,a1,1)
s2:(1,a2,2)--------s1:(1,a1)
s2:(1,a2,2)--------s1:(1,a2)
s2:(3,b,1)--------s1:(3,b)
s2:(4,c,1)--------s1:(4,c)
我们设置并行度为 2 再运行
env.setParallelism(2);
运行结果
第一次
2 s1:(1,a2)--------s2:(1,a1,1)
1 s1:(1,a1)--------s2:(1,a2,2)
第二次
2 s2:(1,a2,2)--------s1:(1,a2)
1 s2:(1,a1,1)--------s1:(1,a1)
2 s2:(4,c,1)--------s1:(4,c)
1 s2:(3,b,1)--------s1:(3,b) 我们发现当并行度为多个的时候如果不指定分区器的话每次的运行结果都不一样。 在CoProcessFunction中可以通过RuntimeContext对象来获取自己的任务编号。所以我们通过在 processElement1 和 processElement2 方法中 调用getRuntimeContext().getIndexOfThisSubtask() 方法获得当前数据所在的 任务编号可以发现几乎每次数据的分区结果都不一样但元组对象的 hash值却是一样的。具体分区细节还得去看源码。 指定按照 元组的第一个字段进行 keyBy 分区
// 多并行度条件下需要根据关联条件进行 keyBy 才能保证相同的 key 分到同一任务中去ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connect source1.connect(source2).keyBy(s1 - s1.f0,s2-s2.f0);
运行结果
1 s1:(4,c)--------s2:(4,c,1)
2 s1:(1,a1)--------s2:(1,a1,1)
2 s2:(1,a2,2)--------s1:(1,a1)
2 s1:(1,a2)--------s2:(1,a1,1)
2 s1:(1,a2)--------s2:(1,a2,2)
2 s1:(3,b)--------s2:(3,b,1)