当前位置: 首页 > news >正文

网站首页布局设计模板十堰微网站建设费用

网站首页布局设计模板,十堰微网站建设费用,的物app,wordpress用代码写页面模板使用Flink编写代码#xff0c;步骤非常固定#xff0c;大概分为以下几步#xff0c;只要牢牢抓住步骤#xff0c;基本轻松拿下#xff1a; 1. env-准备环境 2. source-加载数据 3. transformation-数据处理转换 4. sink-数据输出 5. execute-执行 DataStream API开发 //n…使用Flink编写代码步骤非常固定大概分为以下几步只要牢牢抓住步骤基本轻松拿下 1. env-准备环境 2. source-加载数据 3. transformation-数据处理转换 4. sink-数据输出 5. execute-执行 DataStream API开发 //nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/ 0. 添加依赖 propertiesflink.version1.13.6/flink.version /propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.11/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-shaded-hadoop-2-uber/artifactIdversion2.7.5-10.0/version/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.17/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.24/version/dependency/dependenciesbuildextensionsextensiongroupIdorg.apache.maven.wagon/groupIdartifactIdwagon-ssh/artifactIdversion2.8/version/extension/extensionspluginsplugingroupIdorg.codehaus.mojo/groupIdartifactIdwagon-maven-plugin/artifactIdversion1.0/versionconfiguration!--上传的本地jar的位置--fromFiletarget/${project.build.finalName}.jar/fromFile!--远程拷贝的地址--urlscp://root:rootbigdata01:/opt/app/url/configuration/plugin/plugins/build 编写代码 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类DataStreamString dataStream01 env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);DataStreamString flatMapStream dataStream01.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}});//flatMapStream.print();// Tuple2 指的是2元组DataStreamTuple2String, Integer mapStream flatMapStream.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}});DataStreamTuple2String, Integer sumResult mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元素进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();} } 查看本机的CPU的逻辑处理器的数量逻辑处理器的数量就是你的分区数量。 12 spark 13 kakfa 11 spark 11 flink 11 kafka 13 hadoop 12 sqoop 13 flink 12 flink前面的数字是分区数默认跟逻辑处理器的数量有关系。 对结果进行解释 什么是批什么是流 批处理结果前面的序号代表分区 流处理结果 也可以通过如下方式修改分区数量 env.setParallelism(2); 关于并行度的代码演示 系统以及算子都可以设置并行度或者获取并行度 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount01 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类DataStreamString dataStream01 env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);DataStreamString flatMapStream dataStream01.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}});// 每一个算子也有自己的并行度一般跟系统保持一致System.out.println(flatMap的并行度flatMapStream.getParallelism());//flatMapStream.print();// Tuple2 指的是2元组DataStreamTuple2String, Integer mapStream flatMapStream.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}});DataStreamTuple2String, Integer sumResult mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1);sumResult.print();// 执行env.execute();} }打包、上传 文件夹不需要提前准备好它可以帮我创建 提交我们自己开发打包的任务 flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar 去界面中查看运行结果 因为你这个是集群运行的所以标准输出流中查看假如第一台没有去第二台查看一直点。 获取主函数参数工具类 可以通过外部传参的方式给定一个路径 以下代码可以做到假如给定路径就获取路径的数据假如没给就读取默认数据 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount02 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path args[0];dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1).print();// 执行env.execute();} }flink run -c com.bigdata.day01.Demo02 FlinkDemo-1.0-SNAPSHOT.jar /home/wc.txt 这样做跟我们以前的做法还是不一样。以前的运行方式是这样的 flink run /opt/installs/flink/examples/batch/WordCount.jar --input /home/wc.txt 这个写法传递参数的时候带有--字样而我们的没有。 以上代码进行升级我想将参数前面追加一个 --input 这样怎么写 ParameterTool parameterTool ParameterTool.fromArgs(args); if(parameterTool.has(output)){path parameterTool.get(output); }在代码中的使用 ParameterTool parameterTool ParameterTool.fromArgs(args);String output ;if (parameterTool.has(output)) {output parameterTool.get(output);System.out.println(指定了输出路径使用: output);} else {output hdfs://node01:9820/wordcount/output47_;System.out.println(可以指定输出路径使用 --output ,没有指定使用默认的: output);} 升级过的代码 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount02 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path ;ParameterTool parameterTool ParameterTool.fromArgs(args);if(parameterTool.has(input)){path parameterTool.get(input);}else{path args[0];}dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString collector) throws Exception {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}}).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1); // (hello,1)}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}// 此处的1 指的是元组的第二个元组进行相加的意思}).sum(1).print();// 执行env.execute();} }DataStream (Lambda表达式-扩展 了解) import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Arrays;/*** Desc 演示Flink-DataStream-流批一体API完成批处理WordCount* 使用Java8的lambda表示完成函数式风格的WordCount*/ public class WordCount02 {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动//不设置的话默认是流模式defaultValue(RuntimeExecutionMode.STREAMING)//TODO 2.source-加载数据DataStreamString dataStream env.fromElements(flink hadoop spark, flink hadoop spark, flink hadoop, flink);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁/*public interface FlatMapFunctionT, O extends Function, Serializable {void flatMap(T value, CollectorO out) throws Exception;}*//*DataStreamString wordsDS dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split( );for (String word : words) {out.collect(word);}}});*///注意:Java8的函数的语法/lambda表达式的语法: (参数)-{函数体}DataStreamString wordsDS dataStream.flatMap((String value, CollectorString out) - {String[] words value.split( );for (String word : words) {out.collect(word);}}).returns(Types.STRING);//3.2 每个单词记为单词,1/*public interface MapFunctionT, O extends Function, Serializable {O map(T value) throws Exception;}*//*DataStreamTuple2String, Integer wordAndOneDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return Tuple2.of(value, 1);}});*/DataStreamTuple2String, Integer wordAndOneDS wordsDS.map((String value) - Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));//3.3分组//注意:DataSet中分组用groupBy,DataStream中分组用keyBy//KeyedStreamTuple2String, Integer, Tuple keyedDS wordAndOneDS.keyBy(0);/*public interface KeySelectorIN, KEY extends Function, Serializable {KEY getKey(IN value) throws Exception;}*//*KeyedStreamTuple2String, Integer, String keyedDS wordAndOneDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}});*/KeyedStreamTuple2String, Integer, String keyedDS wordAndOneDS.keyBy((Tuple2String, Integer value) - value.f0);//3.4聚合SingleOutputStreamOperatorTuple2String, Integer result keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();} } 此处有一个大坑就是使用完lambda表达式以后需要添加一个returns(Types.STRING); 否则报错这样的话使用lambda也不是特别快了。 连着写的版本如下 package com.bigdata.day01;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class WordCount03 {/*** 1. env-准备环境* 2. source-加载数据* 3. transformation-数据处理转换* 4. sink-数据输出* 5. execute-执行*/public static void main(String[] args) throws Exception {// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行都这么写非常方便StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 这个是 自动 根据流的性质决定是批处理还是流处理//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 批处理流 一口气把数据算出来// env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 流处理默认是这个 可以通过打印批和流的处理结果体会流和批的含义//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 将任务的并行度设置为2// env.setParallelism(2);// 通过这个获取系统的并行度int parallelism env.getParallelism();System.out.println(parallelism);// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类所以可以这么写// 以下代码中路径是写死的能不能通过外部传参进来当然可以 agrsDataStreamString dataStream null;System.out.println(args.length);if(args.length !0){String path ;ParameterTool parameterTool ParameterTool.fromArgs(args);if(parameterTool.has(input)){path parameterTool.get(input);}else{path args[0];}dataStream env.readTextFile(path);}else{dataStream env.fromElements(spark flink kafka, spark sqoop flink, kakfa hadoop flink);}dataStream.flatMap((String line, CollectorString collector) - {String[] arr line.split( );for (String word : arr) {// 循环遍历每一个切割完的数据放入到收集器中就可以形成一个新的DataStreamcollector.collect(word);}}).returns(Types.STRING).map((String word)- {return Tuple2.of(word, 1); // (hello,1)}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2String, Integer tuple2)- {return tuple2.f0;}).sum(1).print();// 执行env.execute();} }
http://www.hkea.cn/news/14555054/

相关文章:

  • 开源php网站开发wordpress页面链接地址
  • 东莞南城网站制作公司洛阳网站建设电话
  • 如何自己搭建一个企业网站网站在线问答怎么做
  • 情人节给女朋友做网站很久以前做相册mv的网站
  • 便利的菏泽网站建设企业品牌网站营销
  • 计算机编程与网站建设114啦网址导航官网
  • 现在收废品做哪个网站好口碑好的邯郸网站建设
  • 建设厅网站ca验证失败兰州搜狗推广
  • 株洲网站制作公司有哪些娱乐网站开发多少钱
  • 手机网站源码带后台wap网站生成
  • 常用网站建设工具腾讯朋友圈广告代理
  • 吐槽做网站东莞市领导班子
  • 十堰网站建设培训wordpress主页主题
  • 桃城网站建设价格上海网站免费制作
  • flash企业网站模板php网站突然不能访问
  • 忻州市建设厅网站首页网页制作代码html
  • 湖南网站建设工作室wordpress改网站信息
  • 长春做网站优化价格小程序打包成app
  • 怎样做网站的链接室内设计学校前十
  • 网站开发技术文档格式保定做网站设计
  • 做网站的服务商wordpress页面调取
  • 网站怎么用深圳企业vi设计公司
  • 软件下载类型网站怎么做蛋糕网站源码
  • 网站开发市场现在怎么样用来做旅游攻略的网站
  • 做兼职一般去哪个网站好六安市裕安区建设局网站
  • 网站推广朋友圈文案公司被其它人拿来做网站
  • 免费制作微信小程序的网站小城镇建设网站的观点
  • 不会被封的网站谁做公司翻译
  • 装饰装修网站大全网站建设后预期推广方式
  • wordpress中热门文章黑帽seo技术培训