平安建设 十户长网站地址,wordpress heart,新买的服务器怎么做网站,上海专业网站建设 公司目录 一、Flink简介
二、为什么选择Flink
三、与传统数据处理架构相比
四、Flinik批处理数据基础代码
五、Flink流处理基础代码 一、Flink简介
Apache Flink 是一个框架和分布式处理引擎#xff0c;用于对无界和有界数 据流进行状态计算。
二、为什么选择Flink
流数据更…目录 一、Flink简介
二、为什么选择Flink
三、与传统数据处理架构相比
四、Flinik批处理数据基础代码
五、Flink流处理基础代码 一、Flink简介
Apache Flink 是一个框架和分布式处理引擎用于对无界和有界数 据流进行状态计算。
二、为什么选择Flink
流数据更真实地反映了我们的生活方式
传统的数据架构是基于有限数据集的
低延迟 ➢ 高吞吐 ➢ 结果的准确性和良好的容错性
三、与传统数据处理架构相比
传统分析处理中将数据从业务数据库复制到数仓再进行分析和查询 而有状态的流式处理 四、Flinik批处理数据基础代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;//批处理word count
public class WordCount {public static void main(String[] args) throws Exception{//创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();//从文件里读取数据String inputPath D:\\java\\Flink\\src\\main\\webapp\\resource\\hello.txt;DataSourceString inputDataSet env.readTextFile(inputPath);//对数据集进行处理,按空格分词展开转换成(word,1)二元组进行统计DataSetTuple2String,Integer resultSet inputDataSet.flatMap(new MyflatMapper()).groupBy(0) //按照第一个位置的word分组.sum(1); //将第二个位置上的数据求和resultSet.print();}//自定义类实现FlatMapFunction接口public static class MyflatMapper implements FlatMapFunctionString, Tuple2String,Integer{Overridepublic void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {//按空格分词String[] words s.split( );//遍历所有的word包成二元组输出for (String word: words){collector.collect(new Tuple2(word,1));}}}}五、Flink流处理基础代码
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;//流处理(数据边来边处理)
public class StreamWordCount {public static void main(String[] args) throws Exception{//创建流处理执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为8env.setParallelism(8);//从文件中读取数据
// String inputPath D:\\java\\Flink\\src\\main\\webapp\\resource\\hello.txt;
// DataStreamString inputDataStream env.readTextFile(inputPath);//从KAFKA中读取流数据(监听端口号,边输入边处理)//用parameter tool工具从程序启动参数中提取配置项ParameterTool parameterTool ParameterTool.fromArgs(args);String host parameterTool.get(host);int port parameterTool.getInt(port);DataStreamString inputDataStream env.socketTextStream(host,port);//基于数据流进行转换计算SingleOutputStreamOperatorTuple2String,Integer resultStream inputDataStream.flatMap( new WordCount.MyflatMapper()).keyBy(0).sum(1);resultStream.print();//执行任务env.execute();}
}