1. Overview
Flink can currently read from and write to Iceberg tables in real time through either the DataStream API or the SQL API; the SQL API is the recommended approach.

Iceberg requires Flink 1.11.x or later. The version compatibility is as follows:

| Flink version | Iceberg version | Notes |
| --- | --- | --- |
| Flink 1.11.x | Iceberg 0.11.1 | |
| Flink 1.12.x ~ Flink 1.13.x | Iceberg 0.12.1 | SQL API has bugs |
| Flink 1.14.x | Iceberg 0.12.1 | SQL API has bugs |

This walkthrough integrates Flink with Iceberg using Flink 1.14.5 and Iceberg 0.12.1.
2. DataStream API

2.1 Writing to an Iceberg Table in Real Time

2.1.1 Import Dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkiceberg1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Flink 1.12.x - 1.13.x is compatible with Iceberg 0.12.1; not compatible with Flink 1.14 -->
        <flink.version>1.13.5</flink.version>
        <!--<flink.version>1.12.1</flink.version>-->
        <!--<flink.version>1.14.2</flink.version>-->
        <!-- Flink 1.11.x pairs with Iceberg 0.11.1 -->
        <!--<flink.version>1.11.6</flink.version>-->
        <hadoop.version>3.1.1</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-iceberg</artifactId>
            <version>1.13-vvr-4.0.7</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-parent</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Iceberg dependency needed for Flink to work with Iceberg -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.12.1</version>
            <!--<version>0.11.1</version>-->
        </dependency>
        <!-- Dependencies for developing Flink jobs in Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Needed to read files from HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Flink SQL / Table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- log4j and slf4j; comment these out if you do not want logs in the console -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>
</project>
```

2.1.2 Create the Iceberg Table
Key point: create the Iceberg table through Flink SQL.

```sql
-- 1. Create the catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://leidi01:8020/iceberg_catalog',
  'property-version'='1'
);

-- 2. Create the database
CREATE DATABASE flink_iceberg;

-- 3. Create the sink table
CREATE TABLE hadoop_catalog.flink_iceberg.icebergdemo1 (
  id STRING,
  data STRING
);
```

Run result:

2.1.3 Code Implementation
```java
public class FlinkIcebergDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Checkpointing is mandatory: Flink only commits data to Iceberg after a checkpoint completes.
        env.enableCheckpointing(5000);

        // 2. Read data from the Kafka topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.6.102:6667")
                .setTopics("json")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 3. Wrap each record as a RowData object so it can be written to the Iceberg table.
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = " + s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        // 4. Create the Hadoop configuration, catalog and table schema so the writer can locate the table.
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://leidi01:8020/flinkiceberg/");
        // Iceberg database and table name
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        // Iceberg table schema
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));
        // If the table is partitioned, specify the partition column ("loc" here);
        // PartitionSpec.unpartitioned() creates an unpartitioned table instead.
        // PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();
        // Store the Iceberg table data as Parquet.
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        Table table = null;
        // Create the table if it does not exist, otherwise load it.
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }
        TableLoader tableLoader =
                TableLoader.fromHadoopTable("hdfs://leidi01:8020/flinkiceberg//icebergdb/flink_iceberg_tbl", hadoopConf);

        // 5. Write the DataStream into Iceberg.
        FlinkSink.forRowData(dataStream)
                // .table() can be omitted; the path given to the tableLoader is enough.
                .table(table)
                .tableLoader(tableLoader)
                // false (default) appends data; true overwrites existing data.
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}
```

Notes:
(1) Checkpointing must be enabled. Flink only commits data to Iceberg after a checkpoint succeeds; otherwise the data is never committed and later queries (for example from Hive) return nothing. (A checkpoint configuration sketch follows this list.)
(2) Records read from Kafka must be wrapped as RowData (or Row) objects before they can be written to the Iceberg table. Writes append by default; specifying overwrite replaces all existing data.
(3) The corresponding catalog and table schema must be created before writing; if only a path is given, the write fails because the Iceberg table cannot be found.
(4) Writing to Iceberg through the DataStream API is not recommended; prefer the SQL API.
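Since the commit cadence is tied entirely to checkpoints, it can be worth configuring them explicitly. A minimal sketch; only `enableCheckpointing(5000)` appears in the original job, the other settings and the class name are illustrative assumptions:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Interval used in the demo above: Iceberg commits roughly every 5 seconds.
        env.enableCheckpointing(5000);
        // Hypothetical extra tuning, not part of the original code:
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // avoid back-to-back checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(60_000);       // fail a checkpoint after 60s
    }
}
```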
2.1.4 Start the Kafka Producer and Consumer
```shell
bin/kafka-console-producer.sh --topic json --broker-list leidi01:6667
bin/kafka-console-consumer.sh --bootstrap-server leidi01:6667 --topic json --from-beginning
```

Produce some data. Run result: the table's data directory contains two partitions.
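For reference, the console-producer input assumed by the map function in 2.1.3 is comma-separated `(id,name,age,loc)`. Illustrative lines (values borrowed from the samples used later in this article, not from the original screenshot):

```text
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,guangzhou
```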
2.1.5 Query the Table

Note: create a Hadoop catalog in Flink SQL first.

```sql
-- 1. Create the Hadoop catalog
CREATE CATALOG flinkiceberg WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
  'property-version'='1'
);

-- 2. Query the table
USE CATALOG flinkiceberg;
USE icebergdb;
SELECT * FROM flink_iceberg_tbl;
```

Run result:

2.2 Batch/Streaming Reads from an Iceberg Table
Key point: the DataStream API can read an Iceberg table either in batch mode or in streaming mode, controlled by the streaming(true/false) method.

2.2.1 Batch Read

Note: call streaming(false). Code implementation:
```java
public class FlinkIcebergRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Configure the TableLoader.
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader =
                TableLoader.fromHadoopTable("hdfs://leidi01:8020/flinkiceberg//icebergdb/flink_iceberg_tbl", hadoopConf);

        // 2. Read the data from Iceberg (full batch read or incremental streaming read).
        DataStream<RowData> batchData = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                // false (default) reads the whole table as a batch; true reads as a stream.
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id + "," + name + "," + age + "," + loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");
    }
}
```

Run result:

2.2.2 Streaming Read

Note: call streaming(true). Code implementation:
```java
DataStream<RowData> batchData = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        // false (default) reads the whole table as a batch; true reads as a stream.
        .streaming(true)
        .build();
```

Insert data through Flink SQL:

```sql
INSERT INTO flink_iceberg_tbl VALUES (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
```

Run result:

2.2.3 Incremental Streaming Read from a Given Snapshot
Key point: use the startSnapshotId(snapshotId) method.

(1) Check the snapshot ID (it is recorded in the table's metadata).
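One way to look it up is directly from the table's latest metadata JSON on HDFS. A rough sketch under the warehouse layout used in this article; the exact metadata file name (v3 here) is illustrative:

```shell
# List the table's metadata files; the newest vN.metadata.json describes the current state.
hdfs dfs -ls /flinkiceberg/icebergdb/flink_iceberg_tbl/metadata/
# "current-snapshot-id" (and the entries in the "snapshots" array) are the IDs usable with startSnapshotId(...).
hdfs dfs -cat /flinkiceberg/icebergdb/flink_iceberg_tbl/metadata/v3.metadata.json | grep snapshot-id
```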
(2) Code implementation:

```java
// 2. Read from Iceberg starting at a specific snapshot and keep reading incrementally.
DataStream<RowData> batchData = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        // Read incrementally starting from this snapshot; the ID comes from the table metadata.
        .startSnapshotId(1738199999360637062L)
        // false (default) reads the whole table as a batch; true reads as a stream.
        .streaming(true)
        .build();
```

(3) Run result
Note: *only data written after the specified snapshot is read.*

2.2.4 Compacting Data Files
Note: Iceberg provides an API for merging small files into larger ones by periodically submitting a compaction job, which can run as a Flink batch job.

(1) Files before compaction

Note: every commit to Iceberg produces a new data file (a quick way to see this is sketched below).
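To watch the small files accumulate, one can list the table's data directory on HDFS. The path below follows the warehouse layout used earlier in this article and is illustrative:

```shell
# Each commit adds one or more Parquet files under the partition directories (loc=... in this example).
hdfs dfs -ls -R /flinkiceberg/icebergdb/flink_iceberg_tbl/data/
```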
(2) Code implementation:

```java
public class RewriteDataFiles {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Hadoop configuration.
        Configuration hadoopConf = new Configuration();

        // 2. Create the Hadoop catalog so the table can be located.
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://leidi01:8020/flinkiceberg/");

        // 3. Specify the Iceberg database and table name, and load the table.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        // 4. Compact the small data files.
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                // Target file size, 512 MB by default; can be set explicitly, as in Spark.
                .targetSizeInBytes(536870912L)
                .execute();
    }
}
```

(3) Run result:

3. SQL API
3.1 Create a Table and Insert Data

(1) Code implementation:
```java
public class SQLAPIWriteIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Use the catalog.
        tblEnv.useCatalog("hadoop_iceberg");

        // 3. Create the database.
        tblEnv.executeSql("create database iceberg_db");

        // 4. Use the database.
        tblEnv.useDatabase("iceberg_db");

        // 5. Create the Iceberg table flink_iceberg_tbl2.
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(" +
                "id int, name string, age int, loc string) partitioned by (loc)");

        // 6. Insert data into the table.
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values " +
                "(1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
    }
}
```

(2) Run result
Note: check on HDFS whether the table files have been generated.
(3) View the data

Note: query the table through Flink SQL.

```sql
-- 1. Create the catalog
CREATE CATALOG flinkiceberg WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
  'property-version'='1'
);

-- 2. Query the data
USE CATALOG flinkiceberg;
USE iceberg_db;
SELECT * FROM flink_iceberg_tbl2;
```

Result:

3.2 Batch Query of Table Data
Note: with the SQL API, a batch query simply selects from the table and prints the result.

(1) Code logic

(2) Code implementation:
```java
public class SQLAPIReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Read the table as a batch and print it.
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");
        tableResult.print();
    }
}
```

Run result:

3.3 Streaming Query of Table Data
Note: when querying an Iceberg table in streaming mode through the Flink SQL API, the parameter **table.dynamic-table-options.enabled** must be set to **true** so that the SQL `OPTIONS` hint is supported.

(1) Code logic

(2) Code implementation:
```java
public class SQLStreamReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint in SQL.
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Read all data from the current snapshot and keep reading incrementally.
        //    streaming=true enables the streaming read; monitor-interval sets how often new data is checked (default 1s).
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 " +
                "/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */");
        tableResult.print();
    }
}
```

Run result:

(3) Test and verify
Insert data through Flink SQL:

```sql
INSERT INTO flink_iceberg_tbl2 VALUES (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
```

Run result: the new rows appear in the IDEA console.

3.4 Incremental Streaming Read from a Snapshot
Note: continue reading data in real time starting from a given snapshot-id.

(1) Code logic

(2) Code implementation

Insert data through Flink SQL:

```sql
INSERT INTO flink_iceberg_tbl2 VALUES (7,'s11',30,'beijing'),(8,'s22',31,'beijing');
```

The resulting snapshot-id is shown below. Code implementation:
```java
public class SQLSnapshotReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint in SQL.
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Keep reading in real time starting from the specified snapshot;
        //    start-snapshot-id comes from the table metadata.
        TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 " +
                "/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='8334669420406375204') */");
        tableResult2.print();
    }
}
```
(3) Run result:

4. Common Errors
4.1 Windows Remote Connection to Hadoop: Environment Variable Not Found

Error log:

```
HADOOP_HOME and hadoop.home.dir are unset.
```

Cause: when connecting to a remote Hadoop cluster from a local Windows machine, the relevant Hadoop variables must be configured locally, mainly hadoop.dll and winutils.exe.
About winutils: Hadoop is written primarily for Linux, and **winutil.exe emulates the Linux directory environment on Windows**. It is required whenever Hadoop runs on Windows or a Windows client calls a remote Hadoop cluster. winutils is a set of Windows binaries for the various Hadoop versions, built on a Windows VM that is used to test Hadoop-related applications on Windows.

Solution:

(1) Download the winutils version matching your Hadoop cluster.

Note: if your Hadoop version is 3.1.2 or 3.2.0, use the hadoop-3.0.0 binaries from winutils-master to configure the environment variables.
https://github.com/steveloughran/winutils

(2) Set the %HADOOP_HOME% environment variable to the directory one level above the bin directory that contains WINUTILS.EXE.
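If changing the system environment variables is inconvenient, the same hint can be supplied from code before any Hadoop classes are used, via the hadoop.home.dir property named in the error message. A minimal sketch; the install path and class name are illustrative assumptions:

```java
public class HadoopHomeWorkaround {
    public static void main(String[] args) {
        // Hypothetical local winutils location; the directory must contain bin\winutils.exe (and hadoop.dll).
        System.setProperty("hadoop.home.dir", "D:\\hadoop-3.1.1");
        // ... then build the Flink/Hadoop objects as in the demos above.
    }
}
```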
4.2 Guava Version Conflict

Error log:

```
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
```

Cause: conflicting guava versions on the classpath.

Solution: resolve the conflict with the Maven Helper plugin.
① Step 1: open the pom and click Dependency Analyzer.

② Step 2: explore the Dependency Analyzer view:
Ⅰ. shows the conflicting jars;
Ⅱ. shows all dependencies as a list;
Ⅲ. shows all dependencies as a tree.

③ Step 3: resolve the jar conflicts in the Conflicts list one by one, taking guava as an example.

Click guava and look at the entries highlighted in red on the right, which mark the conflict. Here the effective guava version is 24.0, but two dependencies pull in guava 27.0.0.1 and 16.0.1 respectively.

④ Exclude the lower-version dependencies: select the red entries, right-click, and choose Exclude. The result of the exclusion is written into the pom (see the sketch below).

⑤ Reload the Maven dependencies.
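For reference, the Exclude action simply adds an `<exclusions>` block to the offending dependency in the pom. A sketch of what that might look like for this project; the artifact chosen here mirrors the hadoop-client exclusion already present in section 2.1.1, and the dependency to exclude in practice is whichever one Maven Helper flags:

```xml
<!-- Illustrative: exclude the transitive guava pulled in by hadoop-client so a single version wins. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```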
---

Even after resolving the guava conflict above, the error persisted; downgrading the Hadoop version from 3.2.2 to 3.1.1 made it go away.

Note: hive-3.1.2 depends on Hadoop 3.1.0 [3]; in general the runtime Hadoop version should not be higher than the version Hive depends on.

Ⅰ. Option 1: relocate (shade) guava inside hive-exec, which requires repackaging hive-exec yourself.

Ⅱ. Option 2: lower the Hadoop version. This does not necessarily mean downgrading the cluster; it is enough to lower the Hadoop version used on the Flink/Hive client side. Using an older Hadoop client against a newer Hadoop server is generally unproblematic across minor versions.
```xml
<hadoop.version>3.2.2</hadoop.version>
<!-- Lower the Hadoop version from 3.2.2 to 3.1.1 -->
<hadoop.version>3.1.1</hadoop.version>
```

4.4 log4j2 Configuration File Error
Error log:

```
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'org.apache.logging.log4j.simplelog.StatusLogger.level' to TRACE to show Log4j2 internal initialization logging.
```

Cause: no log4j2 configuration file was found.

Solution: add a log4j2.xml configuration file matching org.apache.logging.log4j.Logger.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Properties>
        <Property name="log_level" value="info" />
        <Property name="log_dir" value="log" />
        <Property name="log_pattern"
                  value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%p] - [%t] %logger - %m%n" />
        <Property name="file_name" value="test" />
        <Property name="every_file_size" value="100 MB" />
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${log_pattern}" />
        </Console>
        <RollingFile name="RollingFile"
                     fileName="${log_dir}/${file_name}.log"
                     filePattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-%i.log">
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="${log_pattern}" />
            <Policies>
                <SizeBasedTriggeringPolicy size="${every_file_size}" />
                <TimeBasedTriggeringPolicy modulate="true" interval="1" />
            </Policies>
            <DefaultRolloverStrategy max="20" />
        </RollingFile>
        <RollingFile name="RollingFileErr"
                     fileName="${log_dir}/${file_name}-warnerr.log"
                     filePattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-warnerr-%i.log">
            <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="${log_pattern}" />
            <Policies>
                <SizeBasedTriggeringPolicy size="${every_file_size}" />
                <TimeBasedTriggeringPolicy modulate="true" interval="1" />
            </Policies>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="${log_level}">
            <AppenderRef ref="Console" />
            <AppenderRef ref="RollingFile" />
            <AppenderRef ref="RollingFileErr" />
        </Root>
    </Loggers>
</Configuration>
```

4.5 Flink Hive Catalog Error
Error log:

```
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.sql.parser.SqlParser.config()Lorg/apache/calcite/sql/parser/SqlParser$Config;
```

Cause: mismatched dependencies.

Solution: switch all Scala-suffixed dependencies to the 2.12 variants, and replace flink-table-api-java-bridge with flink-table-api-scala-bridge_2.12 (see the sketch below).
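As a rough illustration of that switch, the Scala-suffixed artifacts from section 2.1.1 would change along these lines; this is only a fragment, and the exact set of artifacts to update depends on your build:

```xml
<!-- Illustrative fragment: move the Scala-suffixed Flink artifacts from _2.11 to _2.12. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Replace flink-table-api-java-bridge with the Scala bridge, per the fix above. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
```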