当前位置: 首页 > news >正文

一个网站建设流程做网站发布

一个网站建设流程,做网站发布,郑州网站设计 品牌 视觉,网页可视化设计一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map#xff08;映射#xff09; 将数据流中的数据进行转换, 形成新的数据流#xff0c;消费一个元素并产出一个元素…一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map映射 将数据流中的数据进行转换, 形成新的数据流消费一个元素并产出一个元素 package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger s_num svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunctionInteger, Integer() {Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();} }2.2、filter过滤 根据指定的规则将满足条件true的数据保留不满足条件(false)的数据丢弃 package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger elements svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9); // elements.filter(new FilterFunctionInteger() { // Override // public boolean filter(Integer integer) throws Exception { // if (integer % 2 0 ) // return false; // else { // return true; // } // } // }).print();elements.filter(value - value % 2 ! 0).print();} }2.3、flatMap扁平映射 消费一个元素并产生零个或多个元素 package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger dataSource svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunctionInteger, Integer() {Overridepublic void flatMap(Integer integer, CollectorInteger collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();} }三、聚合算子 3.1、keyBy按键分区 把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中 package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunctionTuple2Long, Long, String) longLongTuple2 - key: longLongTuple2.f0 ,value: longLongTuple2.f1).print();env.execute(execute);} }3.2、sum,min,max,minBy,maxBy简单聚合 KeyedStream的每一个支流做聚合。执行完成后会将聚合的结果合成一个流返回所以结果都是DataStream package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute(execute);} }3.3、reduce归约聚合 一个分组数据流的聚合操作合并当前的元素和上次聚合的结果产生一个新的值返回的流中包含每一次聚合的结果而不是只返回最后一次聚合的最终结果。 package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunctionTuple2Long, Long() {Overridepublic Tuple2Long, Long reduce(Tuple2Long, Long values1, Tuple2Long, Long values2) throws Exception {return new Tuple2(values1.f0,values1.f1values2.f1);}}).print();env.execute();} }3.4、process底层处理 process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身)process 用法比较灵活后面再做专门研究。 package com.lyh.flink05; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger streamSource env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperatorInteger processed streamSource.process(new ProcessFunctionInteger, Integer() {Overridepublic void processElement(Integer value, Context ctx, CollectorInteger out) throws Exception {if (value % 3 0) {//测流数据ctx.output(new OutputTagInteger(3%0, TypeInformation.of(Integer.class)), value);}if (value % 3 1) {//测流数据ctx.output(new OutputTagInteger(3%1, TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStreamInteger output0 processed.getSideOutput(new OutputTag(3%0,TypeInformation.of(Integer.class)));DataStreamInteger output1 processed.getSideOutput(new OutputTag(3%1,TypeInformation.of(Integer.class)));output1.print();env.execute();} }四、合流算子 4.1、connect连接 在某些情况下我们需要将两个不同来源的数据流进行连接实现数据匹配比如订单支付和第三方交易信息这两个信息的数据就来自于不同数据源连接后将订单支付和第三方交易信息进行对账此时才能算真正的支付完成。 Flink中的connect算子可以连接两个保持他们类型的数据流两个数据流被connect之后只是被放在了一个同一个流中内部依然保持各自的数据和形式不发生任何变化两个流相互独立。 package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger data1 env.fromElements(1, 2, 3, 4, 5);DataStreamSourceString data2 env.fromElements(a, b, c);ConnectedStreamsInteger, String data3 data1.connect(data2);data3.getFirstInput().print(data1);data3.getSecondInput().print(data2);env.execute();} }4.2、union合并 package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger data1 env.fromElements(1, 2, 3);DataStreamSourceInteger data2 env.fromElements(555,666);DataStreamSourceInteger data3 env.fromElements(999);DataStreamInteger data data1.union(data2).union(data3);data.print();env.execute();} }
http://www.hkea.cn/news/14297347/

相关文章:

  • 做塑料的网站名字东莞网站建设哪里找
  • 免费隐私网站推广app少儿编程课
  • 太平洋手机官方网站网站设计手机
  • 建设门户网站都需要什么进销存软件终身免费版
  • 成都市住房和城乡建设局网站黑龙江省垦区建设协会网站
  • 蚌埠建设网站学校 网站建设 招标
  • 网站 建设 成品centos wordpress 整站
  • 创建个人网站制作流程步骤青岛专业网站营销
  • 做企业云网站的企业邮箱深圳市宝安区松岗街道邮政编码
  • 网页设计与网站建设期末考试金坛市住房和城乡建设局网站
  • 银行网站 设计方案wordpress主题 Grace
  • 手机网站菜单网页怎么做网站单页面怎么做的
  • 花瓣网设计网站交互式网站定义
  • 怎么能自己做网站河南省建设工程网站
  • 响应式企业网站开发所用的平台加盟代理网
  • 域网站名分类收费网站空间
  • 网站开发企业需要什么资质个人怎么做百度竞价
  • 团队建设 深度好文分享的网站搜狗收录提交
  • 外贸做的社交网站有哪些友情链接交换网址大全
  • 关于室内设计的网站有哪些创建个人百度百科
  • 网站开发所需要的时间 知乎江西响应式网页建设
  • 高端建站网站手机网站域名解析
  • 南希网站建设货车保险哪家网站可以直接做
  • 网站title是什么公司网站百度搜不到
  • 济南设计网站的公司单位网站建设ppt
  • 互联网建站公司中建八局第一建设有限公司装饰
  • 一页网站京津冀协同发展存在的突出问题
  • 哪个网站做新中式wordpress主题无法更换
  • 做营销型网站用那个cms好网站模板之家官网
  • 东莞建站模板公司济南智能网站建设