当前位置: 首页 > news >正文

网站改版要多少钱网站源代码怎么下载

网站改版要多少钱,网站源代码怎么下载,wordpress主题评论,安装两个wordpressSource源算子#xff08;基础篇二#xff09; 目录 Source源算子#xff08;基础篇二#xff09; 二、源算子#xff08;source#xff09; 1. 准备工作 2.从集合中读取数据 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方…Source源算子基础篇二 目录 Source源算子基础篇二 二、源算子source 1. 准备工作 2.从集合中读取数据 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方法直接列出数据获取 3. 从文件中读取数据 说明 4. 从Socket读取数据 1编写StreamWordCount 2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试 3启动 StreamWordCount 程序 4从 bigdata1 发送数据 5看控制台的输出结果 5.从Kafka读取数据 6.自定义源算子source 7.Flink支持的数据类型 二、源算子source Flink 可以从各种来源获取数据然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源而读取数据的算子就是源算子Source。所以Source 就是我们整个处理 程序的输入端。 Flink 代码中通用的添加 Source 的方式是调用执行环境的 addSource()方法 //通过调用 addSource()方法可以获取 DataStream 对象 val stream env.addSource(...) 方法传入一个对象参数需要实现 SourceFunction 接口返回一个 DataStream。 1. 准备工作 case class Event(user: String, url: String, timestamp: Long) 2.从集合中读取数据 最简单的读取数据的方式就是在代码中直接创建一个集合然后调用执行环境的 fromCollection 方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后 作为数据源使用一般用于测试。 import org.apache.flink.streaming.api.scala._case class Event(user: String, url: String, timestamp: Long)object SourceCollection {def main(args: Array[String]): Unit {//获取流执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//设置并行度并行任务的数量为1env.setParallelism(1)// 创建包含点击事件的列表// 点击操作中包含两个事件val clicks List(Event(Mary, /.home, 1000L), Event(Bob, /.cart, 2000L))//将列表作为流输出//把clicks作为数据流val stream env.fromCollection(clicks)//fromElements从给定的元素集合中创建一个DataStreamval stream1 env.fromElements(Event(zhangsan,/.opt,1000L),Event(lisi,/.opt,2000L))stream.print(stream)stream1.print(stream1)env.execute()} } 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方法直接列出数据获取 3. 从文件中读取数据 真正的实际应用中自然不会直接将数据写在代码中。通常情况下我们会从存储介质中 获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 val stream env.readTextFile(input/words.txt)说明 参数可以是文件可以是目录 可以是绝对路径也可以是相对路径 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录 系统属性 user.dir这是一个Java系统属性它表示用户当前的工作目录。在很多应用中它通常被用作参考路径。 IDEA下是project的根目录当你在IDEA中打开一个项目时项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。 standalone模式下是集群节点根目录如Hadoop分布式计算系统中的独立模式standalone mode。在这种模式下路径可能是相对于集群节点的根目录。 也可以从 HDFS 目录下读取, 使用路径 hdfs://... 前提要在pom文件中添加hadoop相关依赖 4. 从Socket读取数据 不论从集合还是文件我们读取的其实都是有界数据。在流处理的场景中数据往往是无 界的。一个简单的例子就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差一般也是用于测试。 //通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777) 具体实现案例 1编写StreamWordCount import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit {val env StreamExecutionEnvironment.getExecutionEnvironment//通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777)//进行转换计算val result linDs.flatMap(data data.split( )) //用空格切分字符串.map((_,1)) //切分后的字符串转换为一个元组.keyBy(_._1) //使用元组的第一个字段进行分组.sum(1) //分组后的数据的第二个字段进行累加//打印计算结果result.print()env.execute()} } 2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试 $ nc -lk 7777 3启动 StreamWordCount 程序 我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的当前程序会一直处于监听状态只有接收到数据才会执行任务、输出统计结果。 4从 bigdata1 发送数据 hello flink hello world hello scala 5看控制台的输出结果 5.从Kafka读取数据 Kafka 作为分布式消息传输队列是一个高吞吐、易于扩展的消息系统。 而消息队列的传输方式恰恰和流处理是完全一致的。 所以可以说 Kafka 和 Flink 天生一对是当前处理流式数据的双子星。 在如今的实时流处理应用中由 Kafka 进行数据的收集和传输Flink 进行分析计算这样的架构已经成为众多企业的首选 调用 env.addSource()传入 FlinkKafkaConsumer 的对象实例就可以了。 创建 FlinkKafkaConsumer 时需要传入三个参数 第一个参数 topic定义了从哪些主题中读取数据。可以是一个 topic也可以是 topic 列表还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时Kafka 连接器将会处理所有 topic 的分区将这些分区的数据放到一条数据流中 去。第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema是一个内置的 DeserializationSchema它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口所以我们也可以自定义反序列化逻辑。第三个参数是一个 Properties 对象设置了 Kafka 客户端的一些属性。 更新中... 6.自定义源算子source 接下来我们创建一个自定义的数据源实现 SourceFunction 接口。主要重写两个关键方法 run()和 cancel()。 run()方法使用运行时上下文对象SourceContext向下游发送数据cancel()方法通过标识位控制退出循环来达到中断数据源的效果。 7.Flink支持的数据类型
http://www.hkea.cn/news/14577231/

相关文章:

  • 沈阳网站建设联系方式德州宁津网站建设
  • 大学网站建设的目标做网站经常加班还是app
  • 西安户县建设厅网站智能建站网站模板
  • 手机网站导航设计唐山网站制作企业
  • 建设电子网站试卷太阳宫网站建设
  • joomla网站模板wordpress 用户主页
  • 做兼职的网站都有哪些工作钢结构
  • 做网站数据库要建什么表c 网站开发入门视频
  • 怎么做离线网站义乌进货网
  • 郑州哪家公司做网站好大连中国建筑装饰网
  • 10个网站 云主机需求.net 网站开发视频教程
  • 苏州网站优化公司怎么添加网站后台
  • 珠海中企网站建设网站关键词分析
  • 泰州网站建设哪家好wordpress引用php文件
  • 闲鱼怎么做钓鱼网站建设摩托车官网中国官网报价大全
  • 网站建站公司有必要做吗网站结构布局
  • 如何做英文网站黑龙江省住房和城乡建设厅
  • 简述电子政务系统网站建设的基本过程电商网站建设题库
  • 淘宝上网站开发做什么网站比较简单
  • 海口网站开发公司新媒体网站建设方案
  • 信誉好的广州做网站广西远伟网络科技有限公司
  • 绍兴cms建站系统教育响应式网站建设
  • 网站首页布局有哪些做网站学什么什么专业
  • 做那个网站比较好wordpress 文章添加图片
  • 公司网站ICP怎么备案呢四川建设安全协会网站
  • 文化传播集团网站建设如何制作游戏?
  • 网站开发需多少钱全网营销系统
  • 德州建设局网站wordpress怎么运行
  • 淘宝代码网站有哪些网站建设所需资料
  • 网站开发强制使用急速内核博客做网站