当前位置: 首页 > news >正文

咸宁网站建设解决方案1t网站空间主机多少钱

咸宁网站建设解决方案,1t网站空间主机多少钱,行业网站建设公司,中小企业网址大道三千:最近我修Flink 目前个人理解#xff1a; 处理有界#xff0c;无界流的工具 FLINK#xff1a; FLINK定义#xff1a; Flink特点 Flink分层API 流的定义 有界数据流#xff08;批处理#xff09;#xff1a; 有界流#xff1a;数据结束了#xff0c;程序也…大道三千:最近我修Flink 目前个人理解 处理有界无界流的工具 FLINK FLINK定义 Flink特点 Flink分层API 流的定义 有界数据流批处理 有界流数据结束了程序也就结束了 知道数据开始以及结束的地方 无界数据流 特征读一条计算一条输出一次结果 知道数据开始的地方,却不知道结束的地方 好似长江大河会一直一直一直产生数据 流的状态 个人理解有状态流会基于内存保存之前的数据 如果后续流的操作需要用到之前的数据这个流时有状态的 如果后续流的操作不需要用到之前的数据这个流是无状态的 DataSet API有界流批处理 已淘汰 1创建执行环境 2读取流数据 3将读取到的数据转换为方便处理的格式 4将收集到的数据进行分组求和最大最小等....操作 //批处理方式(有界流,因为很明确的知道这个文件在哪里结束) public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)DataSourceString lineDS env.readTextFile(input/words.txt);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOne lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Override //一行数据 // 数据收集器 out:相当于是一个按照 下面格式收集数据的收集器 格式out.collect(Tuple2.of(word,1L));public void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( ); //一行数据按照 拆分for (String word : words) { //word 一行中的每一个字段 如果1改成2则统计时数目会成2Tuple2String, Long of Tuple2.of(word, 1L);//每个的那次都转为这种格式out.collect(of); // 收集器添加数据 (转换格式为 (循环到的字段,1L))}}});// 4. 按照 word 进行分组 按照第一个字段分组.(字段,1L),就是按照第一个字段分组(A,1),(b,1),(c,1),(d,1),(d,1) 就是按照abcd分组UnsortedGroupingTuple2String, Long wordAndOneUG wordAndOne.groupBy(0);// 5. 分组内聚合统计 根据第二个字段求和,即将每个分组的第二个字段相加得到该分组的总和AggregateOperatorTuple2String, Long sum wordAndOneUG.sum(1);// 6. 打印结果sum.print();} }DataStream API流、批一体处理 转换flatMap、 分组keyBy、 求和sum、 执行execute、 读取文本readTextFile有界流 1创建流式执行环境基于StreamExecutionEnvironment 2读取文件 3转换、分组、求和得到统计结果 4打印输出 5执行 //流处理方式 (有界流,因为很明确的知道这个文件在哪里结束)如果不是本地而是网络则是无界流 public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSourceString lineStream env.readTextFile(input/words.txt);// 3. 转换、分组、求和得到统计结果 SingleOutputStreamOperatorTuple2String, Integer resultList lineStream.flatM输入类型,输出类型ap(new FlatMapFunctionString, Tuple2String, Integer() {Override //当前行数据 //要返回的类型public void flatMap(String line, CollectorTuple2String, Integer list) throws Exception {String[] fields line.split( );for (String field : fields) {Tuple2String, Integer result Tuple2.of(field, 1);list.collect(result);}}});//分组 // 传入的数据类型() 要分组的数据类型KeyedStreamTuple2String, Integer, String gropbyDate resultList.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0; //这里是类型的第一位。如(hello,1)则是根据hello进行分组}});//求和。 以上一个为例子(hello,1)分组之后,根据1索引即第二位(hello,1)的1进行求和SingleOutputStreamOperatorTuple2String, Integer sum gropbyDate.sum(1);//打印输出sum.print();//执行env.execute();}} // 3. 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data - data.f0).sum(1); 结果 读取socket无界流 事件监听(环境对象.socketTextStream(IP,端口号)) 备注先启动linux 输入命令nc -lk 7777 然后启动代码监听 7777 此时linux输入的数据会被代码抓取到 备注2跟前两个的区别就是这个是调用的socketTextStream。其他无任何区别 //监听7777端口的数据流 // 这里代码监听了 IP地址192.168.200.130 端口号7777 的操作 。ip地址那里写主机名也行 public class SocketStreamWordCount {public static void main(String[] args) throws Exception {//构建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//拿到数据DataStreamSourceString lineStream env.socketTextStream(192.168.200.130, 7777);// 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String, Long convert lineStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] fields line.split( );for (String field : fields) {Tuple2String, Long of Tuple2.of(field, 1L);out.collect(of);}}});//分组KeyedStreamTuple2String, Long, Object gropBy convert.keyBy(new KeySelectorTuple2String, Long, Object() {Overridepublic Object getKey(Tuple2String, Long value) throws Exception {return value.f0;}});//求和SingleOutputStreamOperatorTuple2String, Long sum gropBy.sum(1);//输出sum.print();//执行env.execute();} }SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data - data.f0).sum(1); LMD存在泛型擦除解决方案看这里
http://www.hkea.cn/news/14513894/

相关文章:

  • ps做图哪个网站好网站建设 排名下拉
  • 便宜网站建设怎么样网站策划制作公司 北京
  • 在小型网站建设小组中答案知乎 做网站的公司 中企动力
  • 汽车网站名称dedecms做地方网站
  • 建站公司推荐首推万维科技百度搜索网站的图片
  • 网站动画用什么做国外的设计网站推荐
  • 如何能把网站做的更大网站建设的大纲
  • 石家庄正定新区建设局网站做期货财经网站需要哪些资质
  • 网站建设对于企业的意义大气公司网站源码 企业网页模板建站 制作asp程序后台 中英文代码
  • 安徽省网站备案快吗北京网站搭建公司排行
  • 在线A视频做爰网站免费微信网站模板下载工具
  • 南宁网站推广经理做暖暖的视频网站
  • 地产金融网站开发wordpress手机导航
  • 网站打不开原因检测设计工作室网站首页
  • 深圳教育网站设计公司有哪些好玩的网页游戏
  • 设计素材网站能挣钱吗滨海做网站
  • 手机网站和pc网站的区别电商设计包括什么
  • 进口手表网站湖北省建设工程信息网官网
  • 学校网站建设评分标准重庆优化网站推广
  • 虚拟主机管理怎么做网站网泰网站建设网络
  • 石家庄网站建立技术支持骏域建设网站
  • wordpress codeusseo黑帽是什么
  • 网站开发用哪个软件ppt简约大气模板
  • 做新房用哪个网站好什么网站需要数据库
  • 晋中住房与城乡建设厅网站最近中文字幕在线mv免费
  • 可以自己做网站卖东西百度电话查询
  • 深圳做电子工厂的网站114物流网站怎么做
  • 自己创建网站403wordpress快速建站
  • 网站建设谈单情景对话互联网 网站建设
  • 提供温州手机网站制作哪家好wordpress分类目录分页显示