当前位置: 首页 > news >正文

网站建设江门 优荐触屏手机网站设计

网站建设江门 优荐,触屏手机网站设计,开发公司对外房屋销售优惠政策,宁波自主建站模板文章目录 系列文章索引一、快速上手1、导包2、求词频demo#xff08;1#xff09;要读取的数据#xff08;2#xff09;demo1#xff1a;批处理#xff08;离线处理#xff09;#xff08;3#xff09;demo2 - lambda优化#xff1a;批处理#xff08;离线处理… 文章目录 系列文章索引一、快速上手1、导包2、求词频demo1要读取的数据2demo1批处理离线处理3demo2 - lambda优化批处理离线处理4demo3流处理实时处理5总结实时vs离线6demo4批流一体7对接Socket 二、Flink部署1、Flink架构2、Standalone部署3、自运行flink-web4、通过参数传递5、通过webui提交job6、停止作业7、常用命令8、集群 参考资料 系列文章索引 Flink从入门到实践一Flink入门、Flink部署 Flink从入门到实践二Flink DataStream API Flink从入门到实践三数据实时采集 - Flink MySQL CDC 一、快速上手 1、导包 !-- fink 相关依赖 -- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion1.18.0/version /dependency2、求词频demo 注意自Flink 1.18以来所有Flink DataSet api都已弃用并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序但是您应该转向DataStream和/或Table API。 1要读取的数据 定义data内容 pk,pk,pk ruoze,ruoze hello 2demo1批处理离线处理 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;/*** 使用Flink进行批处理并统计wc*** 结果* (bye,2)* (hello,3)* (hi,1)*/ public class BatchWordCountApp {public static void main(String[] args) throws Exception {// step0: Spark中有上下文Flink中也有上下文MR中也有ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// step1: 读取文件内容 一行一行的字符串而已DataSourceString source env.readTextFile(data/wc.data);// step2: 每一行的内容按照指定的分隔符进行拆分 1:Nsource.flatMap(new FlatMapFunctionString, String() {/**** param value 读取到的每一行数据* param out 输出的集合*/Overridepublic void flatMap(String value, CollectorString out) throws Exception {// 使用,进行分割String[] splits value.split(,);for(String split : splits) {out.collect(split.toLowerCase().trim());}}}).map(new MapFunctionString, Tuple2String,Integer() {/**** param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)*/Overridepublic Tuple2String, Integer map(String value) throws Exception {return Tuple2.of(value, 1);}}).groupBy(0) // step4: 按照单词进行分组 groupBy是离线的api传下标.sum(1) // 求词频 sum传下标.print(); // 打印} } 3demo2 - lambda优化批处理离线处理 import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;/*** lambda表达式优化*/ public class BatchWordCountAppV2 {public static void main(String[] args) throws Exception {ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();DataSourceString source env.readTextFile(data/wc.data);/*** lambda语法 (参数1参数2参数3...) - {函数体}*/ // source.map(String::toUpperCase).print();// 使用了Java泛型由于泛型擦除的原因需要显示的声明类型信息source.flatMap((String value, CollectorTuple2String,Integer out) - {String[] splits value.split(,);for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1).print();} }4demo3流处理实时处理 import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;/*** 流式处理* 结果* 8 (hi,1)* 6 (hello,1)* 5 (bye,1)* 6 (hello,2)* 6 (hello,3)* 5 (bye,2)*/ public class StreamWCApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.readTextFile(data/wc.data);source.flatMap((String value, CollectorTuple2String,Integer out) - {String[] splits value.split(,);for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x - x.f0) // 这种写法一定要掌握流式的并没有groupBy而是keyBy根据第一个值进行sum.sum(1).print();// 需要手动开启env.execute(作业名字);} } 5总结实时vs离线 离线结果是一次性出来的。 实时来一个数据处理一次数据是带状态的。 6demo4批流一体 import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;/*** 采用批流一体的方式进行处理*/ public class FlinkWordCountApp {public static void main(String[] args) throws Exception {// 统一使用StreamExecutionEnvironment这个执行上下文环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动DataStreamSourceString source env.readTextFile(data/wc.data);source.flatMap((String value, CollectorTuple2String,Integer out) - {String[] splits value.split(,);for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x - x.f0) // 这种写法一定要掌握.sum(1).print();// 执行env.execute(作业名字);} } 7对接Socket import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;/*** 使用Flink对接Socket的数据并进行词频统计** 大数据处理的三段论 输入 处理 输出**/ public class FlinkSocket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据源可以通过多种不同的数据源接入数据socket kafka text** 官网上描述的是 env.addSource(...)** socket的方式对应的并行度是1因为它来自于SourceFunction的实现*/DataStreamSourceString source env.socketTextStream(localhost, 9527);System.out.println(source.getParallelism());// 处理source.flatMap((String value, CollectorTuple2String,Integer out) - {String[] splits value.split(,);for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x - x.f0) // 这种写法一定要掌握.sum(1)// 数据输出.print(); // 输出到外部系统中去env.execute(作业名字);} } 二、Flink部署 1、Flink架构 https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/ Flink是一个分布式的带有状态管理的计算框架可以运行在常用/常见的集群资源管理器上YARN、K8S。 一个JobManager协调/分配一个或多个TaskManager工作。 2、Standalone部署 按照官网下载执行即可 https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/ 可以根据官网来安装需要下载、解压、安装。 也可以使用docker安装。 启动之后localhost:8081就可以访问管控台了。 3、自运行flink-web dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion1.18.0/version /dependencyConfiguration configuration new Configuration(); configuration.setInteger(rest.port, 8082); // 指定web端口开启webUI不写的话默认8081 StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); // 新版本可以直接使用getExecutionEnvironment(conf)以上亲测并不好使……具体原因未知设置为flink1.16版本或许就好用了。 4、通过参数传递 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 通过参数传递进来Flink引用程序所需要的参数flink自带的工具类 ParameterTool tool ParameterTool.fromArgs(args); String host tool.get(host); int port tool.getInt(port);DataStreamSourceString source env.socketTextStream(host, port); System.out.println(source.getParallelism());可以通过命令行参数–host localhost --port 8765 5、通过webui提交job 6、停止作业 7、常用命令 # 查看作业列表 flink list -a # 所有 flink list -r # 正在运行的 # 停止作业 flink cancel jobid# 提交job # -c,--class classname 指定main方法 # -C,--classpath url 指定classpath # -p,--parallelism paralle 指定并行度 flink run -c com.demo.FlinkDemo FlinkTest.jar 8、集群 https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/ 单机部署Session Mode和Application Mode https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/ k8s https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/ YARN https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/ 参考资料 https://flink.apache.org/ https://nightlies.apache.org/flink/flink-docs-stable/
http://www.hkea.cn/news/14513924/

相关文章:

  • 13个实用平面设计网站网站制作商家入驻
  • 天津常规网站建设系列discuz网站标题
  • 怎样建立一个简单的网站百度惠生活商家入驻
  • 公司做网站的费用记什么科目网站建设怎么搭建服务器
  • 义乌论坛网站建设中铁建设
  • 网站建设国家标准会网站开发没学历
  • 上海找做网站公司哪家好网站子站怎么做、
  • 网站的建设教程电商网站后台建设
  • 大连网站建设连城传媒wordpress 文章id连续
  • dedecms仿站教程wordpress _e
  • 网站上传后怎么访问自己做网站的软件
  • 汕头企业网站wordpress淘宝联盟模板
  • 网站开发费用摊销年限网站开发模块的需求分析
  • 一个企业网站ppt怎么做网站添加站长统计代码
  • 伊川网站开发移动网站开发 王府井
  • 海南省住建设厅网站报监的工程cms系统做漫画网站
  • 做网站销售怎么做wordpress国外主题网站
  • 哪个公司网站备案快互联网营销师报名入口官网
  • 建站宝盒创业经历网站推广有什么方法
  • 优才网站建设网站宣传创意视频
  • 中恒诚信建设有限公司网站昌图网站
  • 网站临时会话做网站iiwok
  • 拼团网站建设湖北网站建设公司排名
  • 俄罗斯门户网站之路网站建设
  • 建个好网站网站建设常州
  • 咸宁网站建设解决方案1t网站空间主机多少钱
  • ps做图哪个网站好网站建设 排名下拉
  • 便宜网站建设怎么样网站策划制作公司 北京
  • 在小型网站建设小组中答案知乎 做网站的公司 中企动力
  • 汽车网站名称dedecms做地方网站