广州 建 网站,wordpress 全站pjax,做局域网站数据库,一级a做爰片免费网站短视频播放Source源算子#xff08;基础篇二#xff09; 目录
Source源算子#xff08;基础篇二#xff09;
二、源算子#xff08;source#xff09;
1. 准备工作
2.从集合中读取数据
可以使用代码中的fromCollection()方法直接读取列表
也可以使用代码中的fromElements()方…Source源算子基础篇二 目录
Source源算子基础篇二
二、源算子source
1. 准备工作
2.从集合中读取数据
可以使用代码中的fromCollection()方法直接读取列表
也可以使用代码中的fromElements()方法直接列出数据获取
3. 从文件中读取数据
说明
4. 从Socket读取数据
1编写StreamWordCount
2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试
3启动 StreamWordCount 程序
4从 bigdata1 发送数据
5看控制台的输出结果
5.从Kafka读取数据
6.自定义源算子source
7.Flink支持的数据类型 二、源算子source Flink 可以从各种来源获取数据然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源而读取数据的算子就是源算子Source。所以Source 就是我们整个处理 程序的输入端。
Flink 代码中通用的添加 Source 的方式是调用执行环境的 addSource()方法
//通过调用 addSource()方法可以获取 DataStream 对象
val stream env.addSource(...)
方法传入一个对象参数需要实现 SourceFunction 接口返回一个 DataStream。
1. 准备工作 case class Event(user: String, url: String, timestamp: Long)
2.从集合中读取数据 最简单的读取数据的方式就是在代码中直接创建一个集合然后调用执行环境的 fromCollection 方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后 作为数据源使用一般用于测试。 import org.apache.flink.streaming.api.scala._case class Event(user: String, url: String, timestamp: Long)object SourceCollection {def main(args: Array[String]): Unit {//获取流执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//设置并行度并行任务的数量为1env.setParallelism(1)// 创建包含点击事件的列表// 点击操作中包含两个事件val clicks List(Event(Mary, /.home, 1000L), Event(Bob, /.cart, 2000L))//将列表作为流输出//把clicks作为数据流val stream env.fromCollection(clicks)//fromElements从给定的元素集合中创建一个DataStreamval stream1 env.fromElements(Event(zhangsan,/.opt,1000L),Event(lisi,/.opt,2000L))stream.print(stream)stream1.print(stream1)env.execute()}
} 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方法直接列出数据获取 3. 从文件中读取数据 真正的实际应用中自然不会直接将数据写在代码中。通常情况下我们会从存储介质中 获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 val stream env.readTextFile(input/words.txt)说明 参数可以是文件可以是目录 可以是绝对路径也可以是相对路径 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录 系统属性 user.dir这是一个Java系统属性它表示用户当前的工作目录。在很多应用中它通常被用作参考路径。 IDEA下是project的根目录当你在IDEA中打开一个项目时项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。 standalone模式下是集群节点根目录如Hadoop分布式计算系统中的独立模式standalone mode。在这种模式下路径可能是相对于集群节点的根目录。 也可以从 HDFS 目录下读取, 使用路径 hdfs://... 前提要在pom文件中添加hadoop相关依赖 4. 从Socket读取数据 不论从集合还是文件我们读取的其实都是有界数据。在流处理的场景中数据往往是无 界的。一个简单的例子就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差一般也是用于测试。 //通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777)
具体实现案例
1编写StreamWordCount
import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit {val env StreamExecutionEnvironment.getExecutionEnvironment//通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777)//进行转换计算val result linDs.flatMap(data data.split( )) //用空格切分字符串.map((_,1)) //切分后的字符串转换为一个元组.keyBy(_._1) //使用元组的第一个字段进行分组.sum(1) //分组后的数据的第二个字段进行累加//打印计算结果result.print()env.execute()}
}
2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试
$ nc -lk 7777
3启动 StreamWordCount 程序 我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的当前程序会一直处于监听状态只有接收到数据才会执行任务、输出统计结果。 4从 bigdata1 发送数据
hello flink
hello world
hello scala
5看控制台的输出结果 5.从Kafka读取数据 Kafka 作为分布式消息传输队列是一个高吞吐、易于扩展的消息系统。 而消息队列的传输方式恰恰和流处理是完全一致的。 所以可以说 Kafka 和 Flink 天生一对是当前处理流式数据的双子星。 在如今的实时流处理应用中由 Kafka 进行数据的收集和传输Flink 进行分析计算这样的架构已经成为众多企业的首选 调用 env.addSource()传入 FlinkKafkaConsumer 的对象实例就可以了。
创建 FlinkKafkaConsumer 时需要传入三个参数
第一个参数 topic定义了从哪些主题中读取数据。可以是一个 topic也可以是 topic 列表还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时Kafka 连接器将会处理所有 topic 的分区将这些分区的数据放到一条数据流中 去。第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema是一个内置的 DeserializationSchema它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口所以我们也可以自定义反序列化逻辑。第三个参数是一个 Properties 对象设置了 Kafka 客户端的一些属性。
更新中...
6.自定义源算子source
接下来我们创建一个自定义的数据源实现 SourceFunction 接口。主要重写两个关键方法 run()和 cancel()。
run()方法使用运行时上下文对象SourceContext向下游发送数据cancel()方法通过标识位控制退出循环来达到中断数据源的效果。
7.Flink支持的数据类型