网站制作深圳,cnetos 7 wordpress,沙井网站推广,个人网站建设发布信息源算子Data Source 概述内置Data Source基于集合构建基于文件构建基于Socket构建 自定义Data SourceSourceFunctionRichSourceFunction 常见连接器第三方系统连接器File Source连接器DataGen Source连接器Kafka Source连接器RabbitMQ Source连接器MongoDB Source连接器 概述 Fl… 源算子Data Source 概述内置Data Source基于集合构建基于文件构建基于Socket构建 自定义Data SourceSourceFunctionRichSourceFunction 常见连接器第三方系统连接器File Source连接器DataGen Source连接器Kafka Source连接器RabbitMQ Source连接器MongoDB Source连接器 概述 Flink中的Data Source数据源、源算子用于定义数据输入的来源。数据源是Flink作业的起点它可以从各种数据来源获取数据例如文件系统、消息队列、数据库等。 将数据源添加到Flink执行环境中从而创建一个数据流。然后可以对该数据流应用一系列转换和操作例如过滤、转换、聚合、计算等。最后将结果写入其他系统例如文件系统、数据库、消息队列等。 数据源是Flink作业中非常重要的组件它确定了数据的来源和初始输入是构建流处理和批处理作业的基础。 内置Data Source Flink Data Source用于定义Flink程序的数据来源Flink官方提供了多种数据获取方法用于帮助开发者简单快速地构建输入流 基于集合构建 可以将数据临时存储到内存中形成特殊的数据结构后作为数据源使用比如采用集合类型。一般用来进行本地调试或者验证。 fromCollection(Collection)基于集合构建集合中的所有元素必须是同一类型fromElements(T ...) 基于元素构建所有元素必须是同一类型generateSequence(from, to)基于给定的序列区间进行构建fromCollection(Iterator, Class)基于迭代器进行构建。第一个参数用于定义迭代器第二个参数用于定义输出元素的类型public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 基于元素构建DataStreamSourceInteger source1 env.fromElements(1, 2, 3, 4);// 基于集合构建DataStreamSourceInteger source2 env.fromCollection(Arrays.asList(1, 2, 3, 4));// 基于给定的序列区间进行构建env.generateSequence(0,100);// 基于迭代器进行构建env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();source1.print();env.execute();}自定义的迭代器CustomIterator产生 1 到 100 区间内的数据 注意: 自定义迭代器要实现Iterator接口外还必须要实现序列化接口Serializable 否则会抛出序列化失败的异常 import java.io.Serializable;
import java.util.Iterator;public class CustomIterator implements IteratorInteger, Serializable {private Integer i 0;Overridepublic boolean hasNext() {return i 100;}Overridepublic Integer next() {i;return i;}
}基于文件构建 在本地环境进行测试时可以方便地从本地文件读取数据 readTextFile(path)按照TextInputFormat 格式读取文本文件并将其内容以字符串的形式返回。示例如下readFile(fileInputFormat, path) 按照指定格式读取文件。readFile(inputFormat, filePath, watchType, interval, typeInformation)按照指定格式周期性的读取文件。各个参数含义
inputFormat数据流的输入格式filePath文件路径可以是本地文件系统上的路径也可以是HDFS上的文件路径watchType读取方式两个可选值: 1.FileProcessingMode.PROCESS_ONCE: 表示对指定路径上的数据只读取一次然后退出2.FileProcessingMode.PROCESS_CONTINUOUSLY: 表示对路径进行定期地扫描和读取。注意:当文件被修改时其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理因此这会打破Flink的exactly-once语义interval定期扫描的时间间隔typeInformation输入流中元素的类型public static void main(String[] args) throws Exception {String filePath data/test.text;// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env// 读取文本文件并将其内容以字符串的形式返回.readTextFile(filePath).print();env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_ONCE, 1, BasicTypeInfo.STRING_TYPE_INFO).print();env.execute();}基于Socket构建 通过监听Socket端口可以在本地很方便地模拟一个实时计算环境。 Flink提供了socketTextStream方法可以通过host和port从一个Socket中以文本的方式读取数据以此构建基于Socket的数据流 socketTextStream方法有以下四个主要参数
hostname主机名port端口号设置为 0 时表示端口号自动分配delimiter用于分隔每条记录的分隔符maxRetry当Socket临时关闭时程序的最大重试间隔单位为秒。设置为0时表示不进行重试设置为负值则表示一直重试示例如下
env.socketTextStream(IP, 8888, \n, 3).print();读取socket文本流是流处理场景这种方式由于吞吐量小、稳定性较差一般也是用于测试 // 创建执行环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);// 读取socket文本流
DataStreamSourceString socketDS env.socketTextStream(IP, 8888);注意基于Socket构建数据源一般需要搭配Netcat使用。 Netcat又称为NC是一个计算机网络工具它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用所以在计算机安全领域也有着广泛的应用。 安装nc命令
yum install -y nc启动socket端口
[rootmaster ~]# nc -l 8080
abc bcd cde
bcd cde fgh
cde fgh hij注意 测试时先启动端口后启动程序会报超时连接异常最后发送测试数据即可。 自定义Data Source 可以通过实现Flink的SourceFunction、ParallelSourceFunction、RichSourceFunction、RichParallelSourceFunction等类并重写其方法以此实现自定义Data Source ParallelSourceFunction、RichParallelSourceFunction分别与SourceFunction、RichSourceFunctio功能类似只不过它们通过SourceContext发送的数据会自动分发到并行任务中去也就是说具有并行度的功能。 SourceFunction 它是Flink 提供的基础接口之一用于定义数据源的行为。它包含一个 run 方法该方法用于启动数据源并使用SourceContext来发送数据元素。它中的方法是生命周期很简单的基础方法。 操作步骤
实现SourceFunction接口创建一个实现SourceFunction接口的类该接口定义读取数据并发出数据流的方法。这个接口中的核心方法是run()和cancel()其中run()方法用于读取数据并发出一系列事件cancel()方法用于取消数据源的运行实现run()方法可以定义从数据源读取数据的逻辑。这可以是从文件、数据库、消息队列等读取数据的逻辑。在适当的时候使用collect()方法将读取的数据发出到数据流中实现cancel()方法可以编写停止或清理数据源的逻辑。例如如果数据源使用了外部资源在这里释放这些资源注册数据源将数据源注册到Flink的执行环境中以便可以在作业中使用。通过执行环境的addSource()方法向执行环境添加数据源public class MySource implements SourceFunctionString {private boolean isRunning true;/*** run() 方法是核心方法它会不断地读取、产生数据并将数据发送到下游* */Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {// 产生一些数据String data UUID.randomUUID().toString();// 将数据发送到下游ctx.collect(data);// 每秒产生一条数据Thread.sleep(1000);}}/*** cancel() 方法用于在取消任务时清理资源*/Overridepublic void cancel() {isRunning false;}
}将自定义的数据源传递给 env.addSource() 方法并通过 .print() 将数据打印到控制台中。最后调用 env.execute() 方法来启动Flink程序。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 将自定义的数据源添加到 Flink 程序中DataStreamSourceString streamSource env.addSource(new MySource());streamSource.print();env.execute(MyApp);}RichSourceFunction 如果需要更高级的功能和更丰富的生命周期控制可以使用RichSourceFunction 类。RichSourceFunction是 SourceFunction接口的子类它提供了额外的方法和功能例如初始化、配置和资源管理。 操作步骤
扩展RichSourceFunction 类创建一个类扩展 RichSourceFunctionT 类并将 T 替换为要发出的数据类型实现open() 方法进行初始化操作例如建立与外部系统的连接或加载资源等。这个方法是在数据源的生命周期开始时被调用的实现run() 方法实现读取数据并发出数据流的逻辑。这个方法在启动数据源时会被调用实现cancel() 方法添加取消数据源的逻辑。这个方法将在停止数据源时调用实现close() 方法进行一些资源清理操作。这个方法是在数据源生命周期结束时调用的import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class CustomRichDataSource extends RichSourceFunctionString {private volatile boolean isRunning true;Overridepublic void open(Configuration parameters) throws Exception {// 初始化操作例如建立与外部系统的连接或加载资源等}Overridepublic void run(SourceContextString ctx) throws Exception {while (isRunning) {// 读取数据的逻辑// 发出数据到数据流ctx.collect(Hello, World!);// 控制发送数据的速度Thread.sleep(1000);}}Overridepublic void cancel() {isRunning false;}Overridepublic void close() throws Exception {// 资源清理操作}
}常见连接器
第三方系统连接器 Flink内置了多种连接器用于满足大多数的数据收集场景。连接器可以和多种多样的第三方系统进行交互。 Flink官方目前支持以下第三方系统连接器
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)除Flink官方之外还有一些其他第三方系统与Flink的连接器通过Apache Bahir发布:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)File Source连接器 从文件读取数据是一种常见方式比如读取日志文件这是批处理中最常见的读取方式 flink-connector-files是Apache Flink的一个连接器用于将本地文件系统或远程文件系统中的文件作为数据源或数据接收器使用。 它提供了一种简单的方法来处理文本文件或其他格式的文件例如CSV、JSON、Avro等并将其转换为Flink数据流。在使用时可以指定文件的路径、编码方式和分隔符等参数并使用适当的转换函数将文件内容解析为Flink的数据类型然后进行数据处理和分析。 它支持对输出流的写入操作将Flink数据流中的结果写入到指定的文件中。可以通过配置文件路径、编码方式和文件格式等参数来控制输出文件的格式和内容 添加文件连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion1.17.0/versionscopeprovided/scope/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 从文件流中逐条记录读取** 文件路径参数可以是目录具体文件、以及从HDFS目录下读取* 路径可以是相对路径也可以是绝对路径* 相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录*/FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(data/word.txt)).build();/*** source ——用户定义的来源* sourceName – 数据源的名称*/env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), file-source).print();env.execute();}DataGen Source连接器
Flink提供了一个内置的DataGen连接器主要用于生成一些随机数进行流任务的测试以及性能测试
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion1.17.0/version
/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 如果有n个并行度 最大值设为a// 将数值 均分成 n份 a/n ,比如最大100并行度2每个并行度生成50个// 其中一个是 0-49另一个50-99/*** DataGeneratorSource中,单个并行度生成数据个数与 与 生成的数据个数 相关* 公式: 生成的数据个数 / 并行度 每个并行度生成个数* 例子: 并行度设置为2,生成数据个数100,则每个并行度生成个数100/2. 一个并行度:0-49 另一个并行度:50-99*/env.setParallelism(2);/*** 数据生成器Source* GeneratorFunctionLong, OUT generatorFunction : GeneratorFunction接口函数需要实现 重写map方法 输入类型固定是Long* long count : 生成的数据个数。自动生成的数字序列,从0自增。当数字数序列最大值达到或小于这个值就停止* RateLimiterStrategy rateLimiterStrategy :限速策略如每秒生成几条数据* TypeInformationOUT typeInfo : 返回的数据类型**/DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator).print();env.execute();}1 Number:0
2 Number:50
2 Number:51
1 Number:1
2 Number:52
1 Number:2
2 Number:53Kafka Source连接器 Flink-connector-kafka就是Flink的一个连接器它提供了一个简单的方法来将Kafka作为Flink应用程序的数据源或数据接收器使用。 flink-connector-kafka可以帮助Flink应用程序从Kafka主题中读取数据也可以将Flink的数据流写入到Kafka主题中。在使用时可以指定Kafka集群的地址、主题名称、消费者组名称等参数并使用适当的序列化和反序列化工具将数据转换为Flink的数据类型。 Topic、Partition订阅 Kafka Source提供了3 种Topic、Partition的订阅方式 1.Topic 列表订阅 Topic 列表中所有Partition的消息
KafkaSource.builder().setTopics(topic-a, topic-b);2.正则表达式匹配订阅与正则表达式所匹配的Topic下的所有Partition
KafkaSource.builder().setTopicPattern(topic.*);3.Partition列表订阅指定的 Partition
final HashSetTopicPartition partitionSet new HashSet(Arrays.asList(new TopicPartition(topic-a, 0), // 主题为 topic-a的0号分区new TopicPartition(topic-b, 5))); // 主题为 topic-b的5号分区
KafkaSource.builder().setPartitions(partitionSet);起始消费位点 Kafka source 能够通过位点初始化器OffsetsInitializer来指定从不同的偏移量开始消费 。 如果内置的初始化器不能满足需求也可以实现自定义的位点初始化器OffsetsInitializer 如果未指定位点初始化器将默认使用 OffsetsInitializer.earliest() 内置的位点初始化器包括
KafkaSource.builder()// 从消费组提交的位点开始消费不指定位点重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 从消费组提交的位点开始消费如果提交位点不存在使用最早位点.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 从时间戳大于等于指定时间戳毫秒的数据开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 从最早位点开始消费.setStartingOffsets(OffsetsInitializer.earliest())// 从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest());动态分区检查 为了在不重启Flink作业的情况下处理Topic扩容或新建Topic等场景可以将KafkaSource配置为在提供的Topic/Partition订阅模式下定期检查新分区。分区检查功能默认不开启。 KafkaSource.builder().setProperty(partition.discovery.interval.ms, 10000); // 每 10 秒检查一次新分区事件时间和水印 默认情况下Kafka Source使用Kafka消息中的时间戳作为事件时间。可以定义自己的水印策略Watermark Strategy 以从消息中提取事件时间并向下游发送水印 env.fromSource(kafkaSource, new CustomWatermarkStrategy(), Kafka Source With Custom Watermark Strategy);示例
引入Kafka连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.0/version/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSourceString kafkaSource KafkaSource.Stringbuilder()// 指定kafka节点的地址和端口.setBootstrapServers(node01:9092,node02:9092,node03:9092)// 指定消费者组的id.setGroupId(flink_group)// 指定消费的 Topic.setTopics(flink_topic)// 指定反序列化器反序列化value.setValueOnlyDeserializer(new SimpleStringSchema())// flink消费kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();// 不使用 watermark 的策略意味着数据流不会根据事件时间进行处理DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka_source);stream.print(Kafka);// 定义事件时间watermark策略处理数据流中的无序事件并设置最大延迟时间为3秒。DataStreamSinkString kafka_source env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafka_source).print(Kafka);env.execute();}RabbitMQ Source连接器
添加对RabbitMQ连接器的依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-rabbitmq/artifactIdversion3.0.1-1.17/version
/dependency1.服务质量 (QoS) 服务质量是一种用于控制数据源连接器如何消费消息的策略。在Flink中服务质量定义了消费者和消息代理之间的消息传输保证级别。通过合适的服务质量设置可以实现以下不同的语义保证 Exactly-once确保消息仅被正确处理一次At-least-once确保消息至少被正确处理一次None最多一次不提供消息处理保证可能会出现重复处理或丢失消息的情况1.精确一次:
保证精确一次需要以下条件
开启checkpointing: 开启之后消息在checkpoints完成之后才会被确认然后从RabbitMQ队列中删除使用关联标识Correlationids: 关联标识是RabbitMQ的一个特性消息写入RabbitMQ时在消息属性中设置。从checkpoint恢复时有些消息可能会被重复处理source可以利用关联标识对消息进行去重。非并发source: 为了保证精确一次的数据投递source必须是非并发的并行度设置为1。这主要是由于RabbitMQ分发数据时是从单队列向多个消费者投递消息的。2.至少一次: 在checkpointing开启的条件下如果没有使用关联标识或者source是并发的那么source就只能提供至少一次的保证。 3.无任何保证: 如果没有开启checkpointingsource就不能提供任何的数据投递保证。使用这种设置时source一旦接收到并处理消息消息就会被自动确认。 2.消费者预取Consumer Prefetch
注意 默认情况下是不设置prefetch count的这意味着RabbitMQ服务器将会无限制地向source发送消息。因此在生产环境中最好要设置它。 prefetch count是对单个channel设置的并且由于每个并发的source都持有一个connection/channel因此这个值实际上会乘以 source 的并行度来表示同一时间可以向这个job总共发送多少条未确认的消息。 使用setPrefetchCount()方法用于设置消费者预取值这里将其设置为 10。这意味着每个消费者在处理完 10 条消息之前不会从 RabbitMQ 队列中获取更多的消息。 RMQConnectionConfig connectionConfig new RMQConnectionConfig.Builder().setPrefetchCount(10) //设置消费者预取值为 10....build();以下是保证exactly-once的RabbitMQ source示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点checkpointing以实现精确一次或至少一次的一致性保证env.enableCheckpointing(5000); // 每 5000 毫秒执行一次检查点final RMQConnectionConfig connectionConfig new RMQConnectionConfig.Builder().setHost(localhost) // RabbitMQ 主机名.setPort(5672) // RabbitMQ 端口号.setUserName(guest) // RabbitMQ 用户名.setPassword(guest) // RabbitMQ 密码.setVirtualHost(/) // RabbitMQ 虚拟主机.setPrefetchCount(10) // 设置消费者预取值为 10.build();final DataStreamString stream env.addSource(new RMQSourceString(connectionConfig, // RabbitMQ 连接配置queueName, // 需要消费的 RabbitMQ 队列名true, // 是否使用关联 ID如果仅需要至少一次的保证可以设置为 falsenew SimpleStringSchema())) // 反序列化方案将消息转换为 Java 对象.setParallelism(1); // 非并行的源仅在需要精确一次性保证时才需要设置stream.print();env.execute(RabbitMQ Source Example);}MongoDB Source连接器 Flink 提供了MongoDB 连接器使用至少一次At-least-once的语义在 MongoDB collection中读取和写入数据。 要使用此连接器先添加依赖到项目中
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-mongodb/artifactIdversion1.0.1-1.17/version
/dependencypublic static void main(String[] args) throws Exception {MongoSourceString source MongoSource.Stringbuilder()// MongoDB 连接 URI.setUri(mongodb://user:password127.0.0.1:27017)// 数据库名.setDatabase(my_db)// 集合名.setCollection(my_coll)// 投影的字段.setProjectedFields(_id, f0, f1)// 默认值 2048 设置每次循环读取时应该从游标中获取的行数.setFetchSize(2048)// 默认值-1 限制每个reader最多读取文档的数量。如果设置了读取并行度大于1那么最多读取的文档数量等于 并行度 * 限制数量。.setLimit(10000)// 默认值 true 不使用游标超时 防止cursor因为读取时间过长或者背压导致的空闲而关闭.setNoCursorTimeout(true)/*** 使用分区可以利用并行读取来加速整体的读取效率。** 设置分区策略可选的分区策略有 SINGLESAMPLESPLIT_VECTORSHARDED 和 DEFAULT** SINGLE将整个集合作为一个分区。* SAMPLE通过随机采样的方式来生成分区快速但可能不均匀。* SPLIT_VECTOR通过 MongoDB 计算分片的 splitVector 命令来生成分区快速且均匀。 仅适用于未分片集合需要 splitVector 权限。* SHARDED从 config.chunks 集合中直接读取分片集合的分片边界作为分区不需要额外计算快速且均匀。 仅适用于已经分片的集合需要 config 数据库的读取权限。* DEFAULT对分片集合使用 SHARDED 策略对未分片集合使用 SPLIT_VECTOR 策略。*/.setPartitionStrategy(PartitionStrategy.SAMPLE) // 设置每个分区的内存大小默认值64mb 通过指定的分区大小将 MongoDB 的一个集合切分成多个分区。 可以设置并行度并行地读取这些分区以提升整体的读取速度。.setPartitionSize(MemorySize.ofMebiBytes(64))// 默认值10 仅用于 SAMPLE 抽样分区策略设置每个分区的样本数量。抽样分区器根据分区键对集合进行随机采样的方式计算分区边界。 总的样本数量 每个分区的样本数量 * (文档总数 / 每个分区的文档数量).setSamplesPerPartition(10)// 设置 MongoDeserializationSchema 用于解析 MongoDB BSON 类型的文档.setDeserializationSchema(new MongoDeserializationSchemaString() {Overridepublic String deserialize(BsonDocument document) {return document.toJson();}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}) // 自定义的反序列化方案.build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.fromSource(source, WatermarkStrategy.noWatermarks(), MongoDB-Source).setParallelism(2) // 设置并行度为 2.print().setParallelism(1); // 设置并行度为 1env.execute(MongoDB Source Example);}