当前位置: 首页 > news >正文

网站建设方案博客宁波58同城网

网站建设方案博客,宁波58同城网,龙岩做网站开发要多久,查询网站哪做的DataStream API 是 Flink 的核心层 API。一个 Flink 程序#xff0c;其实就是对DataStream的各种转换。 代码基本上都由以下几部分构成#xff1a; 执行环境#xff08;Execution Environment#xff09; 1#xff09;创建执行环境StreamExecutionEnvironment StreamExe… DataStream API 是 Flink 的核心层 API。一个 Flink 程序其实就是对DataStream的各种转换。 代码基本上都由以下几部分构成 执行环境Execution Environment 1创建执行环境StreamExecutionEnvironment StreamExecutionEnvironment 类的对象这是所有Flink程序的基础。调用这个类的静态方法具体有以下三种。 1getExecutionEnvironment √ 最简单的方式就是直接调用 getExecutionEnvironment 方法。这个方法会根据当前运行的方式自行决定该返回什么样的运行环境。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment()2createLocalEnvironment 返回一个本地执行环境。可以在调用时传入一个参数指定默认的并行度如果不传入则默认并行度就是本地的 CPU 核心数。 StreamExecutionEnvironment localEnv StreamExecutionEnvironment.createLocalEnvironment();3createRemoteEnvironment 返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号并指定要在集群中运行的 Jar 包。 StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(host, // JobManager 主机名1234, // JobManager 进程端口号path/to/jarFile.jar // 提交给 JobManager 的JAR包 );在获取到程序执行环境后还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链还可以定义程序的时间语义、配置容错机制。 2执行模式Execution Mode 从 Flink 1.12 开始官方推荐的做法是直接使用 DataStream API在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。 // 流处理环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStream API 执行模式包括流执行模式、批执行模式和自动模式。 流执行模式Streaming DataStream API 最经典的模式一般用于需要持续实时处理的无界数据流。默认情况下程序使用的就是 Streaming 执行模式。 批执行模式Batch 专门用于批处理的执行模式。 自动模式AutoMatic 在这种模式下将由程序根据输入数据源是否有界来自动选择执行模式。 1通过命令行配置 bin/flink run -Dexecution.runtime-modeBATCH ...在提交作业时增加 execution.runtime-mode 参数指定值为BATCH。 2通过代码配置 实际应用中一般不会在代码中配置而是使用命令行这样更加灵活。 3触发程序执行 Flink 是由事件驱动的只有等到数据到来才会触发真正的计算这也被称为“延迟执行”或“懒执行”。 同步执行 or 异步执行了解 execute() or executeAsync() env.execute(); // 同步 env.executeAsync(); // 异步exexute总结 默认 env.execute() 触发一个Flink job 一个main方法可以调用多个execute但是没有意义指定一个就会阻塞住同步 env.executeAsync()异步不阻塞 一个main方法里 executeAsync()个数 生成Flink job数 源算子Source Flink 可以从各种来源获取数据然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。所以source 是整个处理程序的输入端。 在 Flink1.12 以前旧的添加 source 的方式是调用执行环境的addSource()方法 DataStreamString stream env.addSource(...);方法传入的参数是一个“源函数”source function需要实现SourceFunction 接口。 从 Flink1.12 开始主要使用流批统一的新 Source 架构fromSource()方法 DataStreamSourceString stream env.fromSource(…);Flink 直接提供了很多预实现的接口此外还有很多外部连接工具也实现了对应的Source通常情况下足以应对实际需求。 1从集合中读取数据 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(1, 2, 3, 4, 5) // 直接填写元素 // .fromCollection(Arrays.asList(1, 2, 3, 4, 5)) // 从集合读取元素.print();env.execute();2从文件读取数据 通常情况会从存储介质中获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 读取文件需要添加文件连接器依赖: dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependencyStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),// 文件路径相对路径不行就用绝对路径Path.fromLocalFile(new File(D:\\workspace\\IdeaProjects\\second-java\\day5-flink\\src\\main\\resources\\input\\words.txt))).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource).print();env.execute();说明 参数可以是目录也可以是文件还可以从 HDFS 目录下读取使用路径hdfs://…路径可以是相对路径也可以是绝对路径相对路径是从系统属性 user.dir 获取路径idea 下是project 的根目录standalone模式下是集群节点根目录。 3从 Socket 读取数据 读取 socket 文本流就是流处理场景。但是这种方式由于吞吐量小、稳定性较差一般也是用于测试。 DataStreamSourceString lineStream env.socketTextStream(localhost, 7777);4从 Kafka 读取数据 Flink 官方提供了连接工具 flink-connector-kafka 直接实现了一个消费者FlinkKafkaConsumer它就是用来读取 Kafka 数据的 SourceFunction。 引入Kafka 连接器的依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency代码如下 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(124.222.253.33:9092) // ip:port.setTopics(topic_1) // topic.setGroupId(group1) // 消费者组// latest 将偏移初始化为最新偏移的OffsetInitializer// earliest 偏移初始化为最早可用偏移的OffsetInitializer.setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 仅Value反序列化env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source).print();env.execute();5从数据生成器读取数据 Flink 从 1.11 开始提供了一个内置的 DataGen 连接器主要是用于生成一些随机数用于在没有数据源的时候进行流任务的测试以及性能测试等。 需要导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency代码如下 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(// 数据转换生成函数new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},// 从0开始递增至这个数Long.MAX_VALUE,// 数据生产效率每秒10个RateLimiterStrategy.perSecond(10), Types.STRING);env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(), dataGenerator).print();env.execute();Flink 支持的数据类型 1Flink 的类型系统 Flink 使用“类型信息”TypeInformation来统一表示数据类型。TypeInformation类是Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器。 2Flink 支持的数据类型 对于常见的 Java 和 Scala 数据类型Flink 都是支持的。Flink 在内部Flink 对支持不同的类型进行了划分这些类型可以在 Types 工具类中找到 1基本类型 所有 Java 基本类型及其包装类再加上 Void、String、Date、BigDecimal 和BigInteger。 2数组类型 包括基本类型数组PRIMITIVE_ARRAY和对象数组OBJECT_ARRAY。 3复合数据类型 Java 元组类型TUPLE这是 Flink 内置的元组类型是Java API 的一部分。最多25 个字段也就是从 Tuple0~Tuple25不支持空字段。行类型ROW可以认为是具有任意个字段的元组并支持空字段。POJOFlink 自定义的类似于 Java bean 模式的类。 Flink 对 POJO 类型的要求如下 类是公有public的有一个无参的构造方法所有属性都是公有public的所有属性的类型都是可以序列化的 4辅助类型 Option、Either、List、Map 等。 5泛型类型GENERIC Flink 会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由 Flink 本身序列化的而是由 Kryo 序列化的。 3类型提示Type Hints 有时需要显式地告诉系统当前的返回类型才能正确地解析出完整数据。 .map(word - Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG));或者使用TypeHint 类它可以捕获泛型的类型信息并且一直记录下来为运行时提供足够的信息。仍通过.returns()方法明确地指定转换之后的DataStream 里元素的类型。 returns(new TypeHintTuple2Integer, SomeType(){})转换算子Transformation 开始之前准备 WaterSensor 作为数据模型。 字段分别代表如下含义 字段名数据类型说明idString水位传感器类型tsLong传感器记录时间戳vcInteger水位记录 public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}public String getId() {return id;}public void setId(String id) {this.id id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc vc;}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return Objects.equals(id, that.id) Objects.equals(ts, that.ts) Objects.equals(vc, that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };} }数据源读入数据之后就可以使用各种转换算子将一个或多个DataStream转换为新的 DataStream。 转换算子一般有三种写法 匿名类lambda表达式实现函数接口 1基本转换算子map/ filter/ flatMap 1映射map “一一映射”消费一个元素就产出一个元素。 需求提取 WaterSensor 中的id 字段 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_2, 2L, 2));stream.map(WaterSensor::getId).print();env.execute();}2过滤filter 需求将数据流中传感器 id 为 sensor_1 的数据过滤出来 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_2, 2L, 2));stream.filter(el - sensor_1.equals(el.getId())).print();env.execute();3扁平映射flatMap flatMap 一进多出 需求如果输入的数据是 sensor_1只打印 vc如果输入的数据是sensor_2既打印 ts 又打印 vc。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_2, 2L, 2));stream.flatMap(new FlatMapFunctionWaterSensor, String() {Overridepublic void flatMap(WaterSensor value, CollectorString out) throws Exception {if (sensor_1.equals(value.getId())) {out.collect(value.getVc().toString());} else if (sensor_2.equals(value.getId())) {out.collect(value.getTs().toString());out.collect(value.getVc().toString());}}}).print();env.execute();输出结果 map如何控制一进一出 ​ 使用return flatmap怎么控制一进多出 ​ 通过Collector输出调用几次就输出几条向下游输送数据 2 聚合算子Aggregation 计算的结果不仅依赖当前数据还跟之前的数据有关相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”Aggregation类似于MapReduce 中的reduce操作。 1 按键分区keyBy 在 Flink 中要做聚合需要先进行分区这个操作就是通过 keyBy 来完成的。 keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键key可以将一条流从逻辑上划分成不同的分区partitions。这里所说的分区其实就是并行处理的子任务。 通过计算 key 的哈希值hash code对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话必须要重写 hashCode()方法。 id 作为 key 做一个分区操作代码实现如下 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_1, 2L, 2),new WaterSensor(sensor_2, 2L, 2),new WaterSensor(sensor_3, 3L, 3));KeyedStreamWaterSensor, String keyedStream stream.keyBy(WaterSensor::getId);keyedStream.print();env.execute();运行结果如下 keyBy 返回的是 KeyedStream 键控流不是转换算子只是对数据进行重分区不能设置并行度 keyBy分组 和 分区 的关系 keyBy对数据分组相同key的数据被分为一组同时保证 相同的key的数据 在同一个分区分区一个子任务可以理解为一个分区一个分区子任务中可以存在多个分组key KeyedStream 是一个非常重要的数据结构只有基于它才可以做后续的聚合操作比如 sumreduce。 2简单聚合sum/min/max/minBy/maxBy 有了按键分区的数据流 KeyedStream就可以基于它进行聚合操作了。 Flink内置实现了一些最基本、最简单的聚合 API主要有以下几种 sum()在输入流上对指定的字段做叠加求和的操作。 min()在输入流上对指定的字段求最小值。 max()在输入流上对指定的字段求最大值。 minBy()与 min()类似在输入流上针对指定字段求最小值。不同的是min()只计算指定字段的最小值其他字段会保留最初第一个数据的值而minBy()则会返回包含字段最小值的整条数据。 maxBy()同上 聚合方法调用时也需要传入参数聚合指定的字段。指定字段的方式有两种指定位置和指定名称。【指定位置索引适用于 Tuple类型POJP不行】 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_1, 2L, 2),new WaterSensor(sensor_2, 2L, 2),new WaterSensor(sensor_3, 3L, 3));KeyedStreamWaterSensor, String keyedStream stream.keyBy(WaterSensor::getId);keyedStream.maxBy(vc) // 指定字段名称.print();env.execute();keyBy和聚合是成对出现的先分区、后聚合得到的依然是一个 DataStream。 一个聚合算子会为每一个 key 保存一个聚合的值在Flink 中把它叫作“状态”state。 每当有一个新的数据输入算子就会更新保存的聚合结果并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说这些状态是永远不会被清除的。 使用聚合算子应该只用在含有有限个 key 的数据流上。 3归约聚合reduce 案例使用 reduce 实现 max 和 maxBy 的功能。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1L, 1),new WaterSensor(sensor_1, 2L, 2),new WaterSensor(sensor_2, 2L, 2),new WaterSensor(sensor_3, 3L, 3));KeyedStreamWaterSensor, String keyedStream stream.keyBy(WaterSensor::getId);keyedStream.reduce(new ReduceFunctionWaterSensor() {// 同组元素规约处理Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(value1: value1);System.out.println(value2: value2);int maxVc Math.max(value1.getVc(), value2.getVc());if (value1.getVc() value2.getVc()) {value1.setVc(maxVc);return value1;} else {value2.setVc(maxVc);return value2;}}}).print();env.execute();reduce 输入类型 输出类型类型不变 每个key的第一条数据来的时候不会执行reduce方法存起来直接输出 reduce方法中的两个参数 ​ value1之前计算结果有状态 value2现在来的数据 reduce 算子也应该作用在一个有限 key 的流上。 用户自定义函数UDF 用户自定义函数user-defined functionUDF即用户可以根据自身需求重新实现算子的逻辑。 用户自定义函数分为函数类、匿名函数、富函数类。 1函数类 函数类可以传参数更加灵活 public class FilterIdFunction implements FilterFunctionWaterSensor {private final String id;public FilterIdFunction(String id) {this.id id;}Overridepublic boolean filter(WaterSensor value) throws Exception {return id.equals(value.getId());} }2富函数类Rich Function Classes “富函数类”也是 DataStream API 提供的一个函数类的接口所有的Flink 函数类都有其 Rich版本 。 富函数类一般是以抽象类的形式出现的。例如RichMapFunction、RichFilterFunction、RichReduceFunction 等。 富函数类可以获取运行环境的上下文并拥有一些生命周期方法所以可以实现更复杂的功能。 open()方法是 Rich Function 的初始化方法每个子任务启动时调用一次。close()方法是生命周期中的最后一个调用的方法类似于结束方法。一般用来做一些清理工作。 如果是Flink程序异常挂掉不会调用close正常调用cancel命令可以close 代码实现 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunctionInteger, Integer() {Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println( 索引是 getRuntimeContext().getIndexOfThisSubtask() 的任务的生命周期开始);}Overridepublic Integer map(Integer integer)throws Exception {return integer 1;}Overridepublic void close() throws Exception {super.close();System.out.println( 索引是 getRuntimeContext().getIndexOfThisSubtask() 的任务的生命周期结束);}}).print();env.execute();物理分区算子Physical Partitioning 常见的物理分区策略有随机分配Random、轮询分配Round-Robin、重缩放Rescale和广播Broadcast。 1随机分区shuffle 将数据随机地分配到下游算子的并行任务中去。可以打乱数据。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString stream env.socketTextStream(124.222.253.33, 7777);stream.shuffle().print();env.execute();2轮询分区Round-Robin 雨露均沾下游算子一个一个来所有算子 stream.rebalance()3重缩放分区rescale 重缩放分区和轮询分区非常相似。当调用 rescale()方法时其实底层也是使用Round-Robin 算法进行轮询但是只会将数据轮询发送到下游并行任务的一部分中。部分算子 stream.rescale()4广播broadcast 将输入数据复制并发送到下游算子的所有并行任务中去。 stream.broadcast()5全局分区global 一种特殊的分区方式。这种做法非常极端通过调用.global()方法会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1。 stream.global()6自定义分区Custom Flink 内置所有分区策略都不能满足用户的需求时可以通过使用partitionCustom()方法来自定义分区策略。 1自定义分区器 public class MyPartitioner implements PartitionerString {// numPartitions: 子任务数量分区数量Overridepublic int partition(String key, int numPartitions) {return Integer.parseInt(key) % numPartitions;} }2使用自定义分区 public class PartitionCustomDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString socketDS env.socketTextStream(124.222.253.33, 7777);DataStreamString myDS socketDS.partitionCustom(new MyPartitioner(),value - value);myDS.print();env.execute();} }八种分区器 Flink提供了8种分区器7种内置1种自定义 分流 将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。 1简单实现 使用filter筛选多次将原始数据流stream复制多份然后对每一份分别做筛选不够高效。 2侧输出流process process算子 调用上下文 ctx 的.output()方法就可以输出任意类型的数据了。而侧输出流的标记和提取都离不开一个**“输出标签”OutputTag**指定了侧输出流的 id 和类型。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperatorWaterSensor ds env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());// 定义输出标签OutputTagWaterSensor s1 new OutputTagWaterSensor(s1, Types.POJO(WaterSensor.class)) {};OutputTagWaterSensor s2 new OutputTagWaterSensor(s2, Types.POJO(WaterSensor.class)) {};// 返回的都是主流SingleOutputStreamOperatorWaterSensor ds1 ds.process(new ProcessFunctionWaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor value, ProcessFunctionWaterSensor, WaterSensor.Context ctx, CollectorWaterSensor out) throws Exception {if (s1.equals(value.getId())) {ctx.output(s1, value);} else if (s2.equals(value.getId())) {ctx.output(s2, value);} else {// 主流out.collect(value);}}});// 打印主流ds1.print(主流数据);// 通过主流获取侧边流SideOutputDataStreamWaterSensor sideOutput1 ds1.getSideOutput(s1);SideOutputDataStreamWaterSensor sideOutput2 ds1.getSideOutput(s2);sideOutput1.printToErr(测流s1);sideOutput2.printToErr(测流s2);env.execute();合流 1联合Union 联合操作要求必须流中的数据类型必须相同合并之后的新流会包括所有流中的元素数据类型不变。可以将多条流联合在一起 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger ds1 env.fromElements(1, 2, 3);DataStreamSourceInteger ds2 env.fromElements(2, 2, 3);DataStreamSourceString ds3 env.fromElements(2, 2, 3);// ds3 类型不一致不能联合ds1.union(ds2).print();env.execute();2连接Connect 1简单使用 连接一次只能连接2条流流的数据类型可以不一样 代码实现 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger source1 env.fromElements(1, 2, 3);DataStreamSourceString source2 env.fromElements(a, b, c);ConnectedStreamsInteger, String connect source1.connect(source2);/*connect:1、一次只能连接 2 条流2、流的数据类型可以不一样3、连接后可以调用 map、flatmap、process 来处理但是各处理各的*/connect.map(new CoMapFunctionInteger, String, String() {Overridepublic String map1(Integer value) throws Exception {return 来源于数字流: value.toString();}Overridepublic String map2(String value) throws Exception {return 来源于字母流: value;}}).print();env.execute();2CoProcessFunction 需求连接两条流输出能根据 id 匹配上的数据类似inner join 效果 注意connectedStreams.keyBy(keySelector1, keySelector2); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(2);DataStreamSourceTuple2Integer, String source1 env.fromElements(Tuple2.of(1, a1),Tuple2.of(1, a2),Tuple2.of(2, b),Tuple2.of(3, c));DataStreamSourceTuple3Integer, String, Integer source2 env.fromElements(Tuple3.of(1, aa1, 1),Tuple3.of(1, aa2, 2),Tuple3.of(2, bb, 1),Tuple3.of(3, cc, 1));// 定义HashMap 缓存来过的数据keyidvaluelist数据MapInteger, ListTuple2Integer, String s1Cache new HashMap();MapInteger, ListTuple3Integer, String, Integer s2Cache new HashMap();ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connect source1.connect(source2);// 将合流元素按key分到同一分区才能得到如下结果 ***ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connectKey connect.keyBy(v - v.f0, v1 - v1.f0);connectKey.process(new CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String() {Overridepublic void processElement1(Tuple2Integer, String value, CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String.Context ctx, CollectorString out) throws Exception {Integer id value.f0;if (!s1Cache.containsKey(id)) {ListTuple2Integer, String s1Values new ArrayList();s1Values.add(value);s1Cache.put(id, s1Values);} else {s1Cache.get(id).add(value);}if (s2Cache.containsKey(id)) {for (Tuple3Integer, String, Integer s2Element : s2Cache.get(id)) {out.collect(s1: value ---------s2: s2Element);}}}Overridepublic void processElement2(Tuple3Integer, String, Integer value, CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String.Context ctx, CollectorString out) throws Exception {Integer id value.f0;if (!s2Cache.containsKey(id)) {ListTuple3Integer, String, Integer s2Values new ArrayList();s2Values.add(value);s2Cache.put(id, s2Values);} else {s2Cache.get(id).add(value);}if (s1Cache.containsKey(id)) {for (Tuple2Integer, String s1Element : s1Cache.get(id)) {out.collect(s1: s1Element ---------s2: value);}}}}).print();env.execute();输出算子Sink 1连接到外部系统 Flink1.12 以前Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。 Flink1.12 开始同样重构了 Sink 架构使用 .sinkTo()方法实现。 2输出到文件 FileSink 支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用 FileSink 的静态方法 行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。 代码实现 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有并行度个数的文件在写入env.setParallelism(2);// 【必须开启】 checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);// 数据生成器DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(d:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(li-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(newDateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1 分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(newMemorySize(1024 * 1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();3输出到 Kafka 添加 Kafka 连接器依赖。 代码实现 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是【精准一次必须开启】 checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(124.222.253.33, 7777);/*Kafka Sink:注意如果要使用 【精准一次】 写入 Kafka需要满足以下条件缺一不可1、开启 checkpoint2、设置事务前缀3、设置事务超时时间 checkpoint 间隔 事务超时时间 max的 15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(124.222.253.33:9092)// 指定序列化器指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(li-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15 分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();自定义序列化器实现带 key 的 record StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(124.222.253.33, 7777);/*如果要指定写入 kafka 的 key可以自定义序列化器1、实现 一个接口重写 序列化 方法2、指定 key转成 字节数组3、指定 value转成 字节数组4、返回一个 ProducerRecord 对象把 key、value 放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(124.222.253.33:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {Overridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(li-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();4输出到 MySQLJDBC 1添加 MySQL 驱动 dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version/dependency2jdbc连接器 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.0-1.17/version/dependency官网案例 public class JdbcSinkExample {static class Book {public Book(Long id, String title, String authors, Integer year) {this.id id;this.title title;this.authors authors;this.year year;}final Long id;final String title;final String authors;final Integer year;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();/*写入 mysql1、只能用老的 sink 写法 addSink2、JDBCSink 的 4 个参数:第一个参数 执行的 sql一般就是 insert into第二个参数 预编译 sql 对占位符填充值第三个参数 执行选项 ---》 攒批、重试第四个参数 连接选项 ---》 url、用户名、密码*/env.fromElements(new Book(101L, Stream Processing with Apache Flink, Fabian Hueske, Vasiliki Kalavri, 2019),new Book(102L, Streaming Systems, Tyler Akidau, Slava Chernyak, Reuven Lax, 2018),new Book(103L, Designing Data-Intensive Applications, Martin Kleppmann, 2017),new Book(104L, Kafka: The Definitive Guide, Gwen Shapira, Neha Narkhede, Todd Palino, 2017)).addSink(JdbcSink.sink(insert into books (id, title, authors, year) values (?, ?, ?, ?),(statement, book) - {statement.setLong(1, book.id);statement.setString(2, book.title);statement.setString(3, book.authors);statement.setInt(4, book.year);},JdbcExecutionOptions.builder().withBatchSize(1000) // 批次的大小条数.withBatchIntervalMs(200) // 批次的时间.withMaxRetries(5) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF8).withUsername(root).withPassword(root).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build()));env.execute();} }5自定义Sink输出 实现RichSinkDunction 抽象类自定义逻辑比较麻烦不建议。
http://www.hkea.cn/news/14294781/

相关文章:

  • 郑州网站制作哪家招聘如何查看网站是什么语言做的
  • 做图软件下载官方网站美观网站建设价格
  • wordpress主题网杭州抖音seo
  • 描述网站的含义过界女主个人做网站的
  • 如何自建网站入口淘宝推广联盟
  • 做网站ps图片都是多大北京网站手机站建设公司吗
  • 苏州网站建设搭建wordpress vps 伪静态
  • 网站建设kuhugz网站建设作业素材
  • 企业网站开发技术2023年做网站怎么样
  • 广丰网站建设北京未来广告公司
  • 建设网站需要it外包收费
  • 公司网站建设指南网页页面设计叫什么
  • 网站建设 后台空间容量网络营销案例分析范文
  • 做影视网站算侵权吗做课件最好的素材网站
  • 百度推广登录入口官网网址江苏网站建设seo优化
  • 做网站周记wordpress不同内容
  • 资源类网站怎么做企业网站设计建设长春
  • 快速免费做网站上海网站制作电话
  • 网站建设品牌有哪些网店装修
  • 搭建自己的博客网站网站代运营多少钱
  • 付网站建设费甘肃政务服务网
  • 有什么网站可以做微信洛阳建设部官方网站
  • 各大门户网站怎么做推广单页面网站源码
  • 西安十强互联网站建设公司湖南关键词网络科技有限公司
  • 手机音乐网站源码做农业网站
  • 六安品牌网站建设电话中国外贸企业100强
  • 微网站中定位功能怎么做的定制网络流量监控软件
  • logo网站有哪些阿里云win服务器怎么做网站
  • 旅游网站建设的目的与意义是什么意思大连华南网站建设
  • 广州比较好的网站建设公司西安做网站哪里价格低