当前位置: 首页 > news >正文

淘宝网站青岛网站建设方案服务

淘宝网站,青岛网站建设方案服务,温州十大网络公司排名,佛山推广系统Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 33、Flink之hive介绍与简单示例 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 文章目录 Flink 系列文章一、Hive 方言介绍1、使用 Hive 方言1、SQL 客户端2、Table API 2、DDL1、show2、catalog2、database3、table4、VIEW5、FUNCTION 3、DML DQL4、注意 本文介绍了flink sql中使用hive方言的具体示例每个示例都是经过运行的有些示例为了体现出关键命令的作用保留了出错信息。 本文依赖环境是hadoop、zookeeper、hive、flink环境好用具体示例是在1.13版本中运行的因为hadoop集群环境是基于jdk8的flink1.17版本需要jdk11。 更多的内容详见后续关于hive的介绍。如果你的环境不能运行本文示例请参考42、Flink 的table api与sql之Hive Catalog 一、Hive 方言介绍 从 1.11.0 开始在使用 Hive 方言时Flink 允许用户用 Hive 语法来编写 SQL 语句。通过提供与 Hive 语法的兼容性我们旨在改善与 Hive 的互操作性并减少用户需要在 Flink 和 Hive 之间切换来执行不同语句的情况。 1、使用 Hive 方言 Flink 目前支持两种 SQL 方言: default 和 hive。你需要先切换到 Hive 方言然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端和 Table API 设置方言。 还要注意你可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。 1、SQL 客户端 SQL 方言可以通过 table.sql-dialect 属性指定。因此你可以通过 SQL 客户端 yaml 文件中的 configuration 部分来设置初始方言。 execution:planner: blinktype: batchresult-mode: tableconfiguration:table.sql-dialect: hive或者在 SQL 客户端启动后设置方言。 ----使用hive方言 Flink SQL set table.sql-dialecthive; [INFO] Session property has been set.---使用默认的方言 Flink SQL set table.sql-dialectdefault; Hive Session ID 90f6200f-2af7-4045-93fc-9a1fbe77fcfd [INFO] Session property has been set. 2、Table API 你可以使用 Table API 为 TableEnvironment 设置方言。 EnvironmentSettings settings EnvironmentSettings.newInstance().useBlinkPlanner()...build(); TableEnvironment tableEnv TableEnvironment.create(settings); // to use hive dialect tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // to use default dialect tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 示例 import java.util.HashMap; import java.util.Map;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.types.Row;/*** author alanchan**/ public class TestCreateHiveTable {public static final String tableName alan_hivecatalog_hivedb_testTable;public static final String hive_create_table_sql CREATE TABLE tableName (\n id INT,\n name STRING,\n age INT ) TBLPROPERTIES (\n sink.partition-commit.delay5 s,\n sink.partition-commit.triggerpartition-time,\n sink.partition-commit.policy.kindmetastore,success-file );/*** param args* throws DatabaseAlreadyExistException* throws CatalogException*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);String hiveConfDir /usr/local/bigdata/apache-hive-3.1.2-bin/conf;String name alan_hive;// default 数据库名称String defaultDatabase default;HiveCatalog hiveCatalog new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog(alan_hive, hiveCatalog);tenv.useCatalog(alan_hive);String newDatabaseName alan_hivecatalog_hivedb;tenv.useDatabase(newDatabaseName);// 创建表tenv.getConfig().setSqlDialect(SqlDialect.HIVE);tenv.executeSql(hive_create_table_sql);// 插入数据String insertSQL insert into alan_hivecatalog_hivedb_testTable values (1,alan,18);tenv.executeSql(insertSQL);// 查询数据String selectSQL select * from alan_hivecatalog_hivedb_testTable ;Table table tenv.sqlQuery(selectSQL);table.printSchema();DataStreamTuple2Boolean, Row result tenv.toRetractStream(table, Row.class);result.print();env.execute();}}2、DDL 本章节列出了 Hive 方言支持的 DDL 语句。我们主要关注语法。具体的hive语法可以参考hive的相关文档。比如本人关于hive的专栏 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表 1、show SHOW CATALOGS SHOW CURRENT CATALOG SHOW DATABASES SHOW CURRENT DATABASE SHOW TABLES SHOW VIEWS SHOW FUNCTIONS SHOW MODULES SHOW FULL MODULES2、catalog 关于hivecatalog的操作详见下文 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 42、Flink 的table api与sql之Hive Catalog ----创建 CREATE CATALOG alan_hivecatalog WITH (type hive,default-database testhive,hive-conf-dir /usr/local/bigdata/apache-hive-3.1.2-bin/conf ); ---使用 use alan_hivecatalog ; 2、database ----创建 CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name[COMMENT database_comment][LOCATION fs_path][WITH DBPROPERTIES (property_nameproperty_value, ...)]; --------修改 ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_nameproperty_value, ...); ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role; ALTER (DATABASE|SCHEMA) database_name SET LOCATION fs_path; ----删除 DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE]; ------使用 USE database_name;示例 ----------------------sql语句 CREATE DATABASE IF NOT EXISTS alan_testdatabase comment This is a database comment with dbproperties (createdByalan); ---hive执行flink sql不支持语法 DESCRIBE DATABASE EXTENDED alan_testdatabase;--更改数据库属性 ALTER DATABASE alan_testdatabase SET DBPROPERTIES (createdByalanchan,createddate2023-08-31); --更改数据库所有者 ALTER DATABASE alan_testdatabase SET OWNER USER alanchan; --更改数据库位置 ALTER (DATABASE|SCHEMA) database_name SET LOCATION hdfs_path; ------------------创建数据库 Flink SQL use catalog alan_hivecatalog; Hive Session ID 094c27ff-fb61-485c-b4ab-3119881224c9 [INFO] Execute statement succeed.Flink SQL CREATE DATABASE IF NOT EXISTS alan_testdatabasecomment This is a database commentwith dbproperties (createdByalan); Hive Session ID e406eed8-6575-4c0e-8b1b-b99db288a018 [INFO] Execute statement succeed.Flink SQL show databases; Hive Session ID f37fa06e-3a98-4c63-a304-d431d03c6bfa ------------------------- | database name | ------------------------- | alan_hivecatalog_hivedb | | alan_testdatabase | | default | | test | | testhive | ------------------------- 5 rows in setFlink SQL DESCRIBE DATABASE EXTENDED alan_testdatabase; Hive Session ID 94218191-bcce-4722-8a05-2f11e6d6807b [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: DESCRIBE DATABASE EXTENDED alan_testdatabase ----------------修改数据库属性 Flink SQL ALTER DATABASE alan_testdatabase SET DBPROPERTIES (createdByalanchan,createddate2023-08-31); Hive Session ID 6a598d85-8468-4f66-969f-8e4e1fa1981d [INFO] Execute statement succeed.Flink SQL ALTER DATABASE alan_testdatabase SET OWNER USER alanchan; Hive Session ID e9ce6a7a-6d89-48f8-81c9-5a303706c0f9 [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseException: line 1:48 cannot recognize input near alanchan EOF EOF in identifier for principal specFlink SQL ALTER DATABASE alan_testdatabase SET OWNER USER alanchan; Hive Session ID 7975a16d-cd2b-4be5-9a1e-90b1454860ed [INFO] Execute statement succeed.------------------------hive中查询数据库属性 0: jdbc:hive2://server4:10000 DESCRIBE DATABASE EXTENDED alan_testdatabase; ----------------------------------------------------------------------------------------------------------------- | db_name | comment | location | owner_name | owner_type | parameters | ----------------------------------------------------------------------------------------------------------------- | alan_testdatabase | This is a database comment | location/in/test | | USER | {createdByalan} | ----------------------------------------------------------------------------------------------------------------- 1 row selected (0.048 seconds) 0: jdbc:hive2://server4:10000 DESCRIBE DATABASE EXTENDED alan_testdatabase; --------------------------------------------------------------------------------------------------------------------------------------------- | db_name | comment | location | owner_name | owner_type | parameters | --------------------------------------------------------------------------------------------------------------------------------------------- | alan_testdatabase | This is a database comment | location/in/test | | USER | {createdByalanchan, createddate2023-08-31} | --------------------------------------------------------------------------------------------------------------------------------------------- 1 row selected (0.049 seconds) 0: jdbc:hive2://server4:10000 DESCRIBE DATABASE EXTENDED alan_testdatabase; --------------------------------------------------------------------------------------------------------------------------------------------- | db_name | comment | location | owner_name | owner_type | parameters | --------------------------------------------------------------------------------------------------------------------------------------------- | alan_testdatabase | This is a database comment | location/in/test | alanchan | USER | {createdByalanchan, createddate2023-08-31} | --------------------------------------------------------------------------------------------------------------------------------------------- 1 row selected (0.047 seconds) ---------flink drop 数据库 Flink SQL drop database alan_testdatabase; Hive Session ID 30c71d25-a8fa-4b84-99b9-b6441937b6cf [INFO] Execute statement succeed. 3、table ------创建 CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name[(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])][COMMENT table_comment][PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)][[ROW FORMAT row_format][STORED AS file_format]][LOCATION fs_path][TBLPROPERTIES (property_nameproperty_value, ...)]row_format:: DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char][MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char][NULL DEFINED AS char]| SERDE serde_name [WITH SERDEPROPERTIES (property_nameproperty_value, ...)]file_format:: SEQUENCEFILE| TEXTFILE| RCFILE| ORC| PARQUET| AVRO| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classnamecolumn_constraint:: NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]table_constraint:: [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]--------修改 ALTER TABLE table_name RENAME TO new_table_name; ALTER TABLE table_name SET TBLPROPERTIES (property_name property_value, property_name property_value, ... ); ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION fs_path; --如果指定了 partition_spec那么必须完整即具有所有分区列的值。如果指定了该操作将作用在对应分区上而不是表上。 ALTER TABLE table_name [PARTITION partition_spec] SET FILEFORMAT file_format; --如果指定了 partition_spec那么必须完整即具有所有分区列的值。如果指定了该操作将作用在对应分区上而不是表上。Update SerDe Properties # ALTER TABLE table_name [PARTITION partition_spec] SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties];ALTER TABLE table_name [PARTITION partition_spec] SET SERDEPROPERTIES serde_properties;serde_properties:: (property_name property_value, property_name property_value, ... ) --如果指定了 partition_spec那么必须完整即具有所有分区列的值。如果指定了该操作将作用在对应分区上而不是表上。Add Partitions # ALTER TABLE table_name ADD [IF NOT EXISTS] (PARTITION partition_spec [LOCATION fs_path]); Drop Partitions # ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec[, PARTITION partition_spec, ...]; Add/Replace Columns # ALTER TABLE table_nameADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)[CASCADE|RESTRICT] Change Column # ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type[COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT]; Drop # DROP TABLE [IF EXISTS] table_name;--------删除drop table tableName; 基本表操作示例 Flink SQL create table student(num int,name string,sex string,age int,dept string)row format delimitedfields terminated by ,; Hive Session ID d3a87e7c-4175-4c07-957a-4e855a537654 [INFO] Execute statement succeed.Flink SQL show tables; Hive Session ID 820371a4-d23f-4660-a558-905bd6c578e1 ------------ | table name | ------------ | student | ------------ 1 row in setFlink SQL create external table student_ext(num int,name string,sex string,age int,dept string)row format delimitedfields terminated by ,; Hive Session ID 57656797-60f0-420e-8a0e-873ef5356303 [INFO] Execute statement succeed.Flink SQL show tables; Hive Session ID 060b1ab4-58a5-4377-b86c-4941e2dd672e ------------- | table name | ------------- | student | | student_ext | ------------- 2 rows in setFlink SQL desc student; Hive Session ID 25346927-8a02-4178-ae17-0444d7c2ded4 -------------------------------------------- | name | type | null | key | extras | watermark | -------------------------------------------- | num | INT | true | | | | | name | STRING | true | | | | | sex | STRING | true | | | | | age | INT | true | | | | | dept | STRING | true | | | | -------------------------------------------- 5 rows in setFlink SQL desc student_ext; Hive Session ID 4858109e-8e2c-49e0-9319-37dc347de73a -------------------------------------------- | name | type | null | key | extras | watermark | -------------------------------------------- | num | INT | true | | | | | name | STRING | true | | | | | sex | STRING | true | | | | | age | INT | true | | | | | dept | STRING | true | | | | -------------------------------------------- 5 rows in set Flink SQL show tables; Hive Session ID a6bc1062-de92-4aa3-abda-587124663002 ------------- | table name | ------------- | student | | student1 | | student_ext | ------------- 3 rows in set-----------删除表 Flink SQL drop table student; Hive Session ID e270ce9f-3bd1-4241-9ad4-a8e42405645b [INFO] Execute statement succeed.-----------修改表 --1、更改表名 ALTER TABLE student RENAME TO alan_student; Flink SQL ALTER TABLE student RENAME TO alan_student; Hive Session ID f0659cc5-e578-403e-8a9e-0989a64652dc [INFO] Execute statement succeed.Flink SQL show tables; Hive Session ID 6a1fdf70-43d5-40bd-86f7-4430b2d30e83 -------------- | table name | -------------- | alan_student | | student_ext | -------------- 2 rows in set--2、更改表属性 ALTER TABLE table_name SET TBLPROPERTIES (property_name property_value, ... ); --更改表注释 ALTER TABLE alan_student SET TBLPROPERTIES (comment new comment for alan_student table); Flink SQL ALTER TABLE alan_student SET TBLPROPERTIES (comment new comment for alan_student table); Hive Session ID c3a1f8ee-11fd-4e33-94bd-e941e2928c9d [INFO] Execute statement succeed. 0: jdbc:hive2://server4:10000 DESC FORMATTED alan_student; ------------------------------------------------------------------------------------------------------------------------ | col_name | data_type | comment | ------------------------------------------------------------------------------------------------------------------------ | # col_name | data_type | comment | | num | int | | | name | string | | | sex | string | | | age | int | | | dept | string | | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | alan_testdatabase | NULL | | OwnerType: | USER | NULL | | Owner: | null | NULL | | CreateTime: | Thu Aug 31 14:02:55 CST 2023 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://HadoopHAcluster/user/hive/warehouse/alan_testdatabase.db/alan_student | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | bucketing_version | 2 | | | comment | new comment for alan_student table | | | numFiles | 0 | | | totalSize | 0 | | | transient_lastDdlTime | 1693461775 | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL | | InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL | | OutputFormat: | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | -1 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | | Storage Desc Params: | NULL | NULL | | | field.delim | , | | | serialization.format | , | ------------------------------------------------------------------------------------------------------------------------ 34 rows selected (0.069 seconds)--3、更改列名称/类型/位置/注释 CREATE TABLE test_change (a int, b int, c int); Flink SQL CREATE TABLE test_change (a int, b int, c int);// 更改列名称 ALTER TABLE test_change CHANGE a a1 INT; Flink SQL ALTER TABLE test_change CHANGE a a1 INT; Flink SQL desc test_change; ------------------------------------------ | name | type | null | key | extras | watermark | ------------------------------------------ | a1 | INT | true | | | | | b | INT | true | | | | | c | INT | true | | | | ------------------------------------------// 更改列名称和类型 ALTER TABLE test_change CHANGE a1 a2 STRING AFTER b; Flink SQL ALTER TABLE test_change CHANGE a1 a2 STRING AFTER b; Flink SQL desc test_change; -------------------------------------------- | name | type | null | key | extras | watermark | -------------------------------------------- | b | INT | true | | | | | a2 | STRING | true | | | | | c | INT | true | | | | -------------------------------------------- --4、添加/替换列 --使用ADD COLUMNS可以将新列添加到现有列的末尾但在分区列之前。 --REPLACE COLUMNS 将删除所有现有列并添加新的列集。 ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type,...); Flink SQL desc alan_student; -------------------------------------------- | name | type | null | key | extras | watermark | -------------------------------------------- | num | INT | true | | | | | name | STRING | true | | | | | sex | STRING | true | | | | | age | INT | true | | | | | dept | STRING | true | | | | -------------------------------------------- Flink SQL ALTER TABLE alan_student ADD COLUMNS (balance int); Flink SQL desc alan_student; ----------------------------------------------- | name | type | null | key | extras | watermark | ----------------------------------------------- | num | INT | true | | | | | name | STRING | true | | | | | sex | STRING | true | | | | | age | INT | true | | | | | dept | STRING | true | | | | | balance | INT | true | | | | -----------------------------------------------Flink SQL ALTER TABLE alan_student REPLACE COLUMNS (age int); Flink SQL desc alan_student; ------------------------------------------ | name | type | null | key | extras | watermark | ------------------------------------------ | age | INT | true | | | | ------------------------------------------ 分区表操作示例 --1、增加分区 创建一个单分区表 create table user_dept (num int,name string,sex string,age int) partitioned by (dept string) row format delimited fields terminated by ,; --加载数据 load data inpath /hivetest/partition/students_MA.txt into table user_dept partition(dept MA);-- 一次添加一个分区 ALTER TABLE user_dept ADD PARTITION (deptIS) /user/hive/warehouse/testhive.db/user_dept/deptIS; --加载数据 load data inpath /hivetest/partition/students_IS.txt into table user_dept partition(dept IS);-- 添加多级分区 ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [LOCATION location][, PARTITION partition_spec [LOCATION location], ...]; partition_spec:: (partition_column partition_col_value, partition_column partition_col_value, ...)--创建一个二级分区表 create table user_dept_sex (id int, name string,age int) partitioned by (dept string, sex string) row format delimited fields terminated by ,; --增加多分区 ALTER TABLE user_dept_sex ADD PARTITION (deptMA, sexM) PARTITION (deptMA, sexF) PARTITION (deptIS, sexM) PARTITION (deptIS, sexF) ; --加载数据 load data inpath /hivetest/partition/user_dept/ma/m into table user_dept_sex partition(deptMA, sexM); load data inpath /hivetest/partition/user_dept/ma/f into table user_dept_sex partition(deptMA, sexF); load data inpath /hivetest/partition/user_dept/is/m into table user_dept_sex partition(deptIS, sexM); load data inpath /hivetest/partition/user_dept/is/f into table user_dept_sex partition(deptIS, sexF);--2、rename partition --2、重命名分区 ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;ALTER TABLE user_dept_sex PARTITION (deptMA, sexM) RENAME TO PARTITION (deptMA, sexMale); ALTER TABLE user_dept_sex PARTITION (deptMA, sexF) RENAME TO PARTITION (deptMA, sexFemale); ALTER TABLE user_dept_sex PARTITION (deptIS, sexM) RENAME TO PARTITION (deptIS, sexMale); ALTER TABLE user_dept_sex PARTITION (deptIS, sexF) RENAME TO PARTITION (deptIS, sexFemale);--3、删除分区 delete partition --删除表的分区。这将删除该分区的数据和元数据。 ALTER TABLE table_name DROP [IF EXISTS] PARTITION (dt2008-08-08, countryus); ALTER TABLE table_name DROP [IF EXISTS] PARTITION (dt2008-08-08, countryus) PURGE; --直接删除数据 不进垃圾桶--5、修改分区 alter partition --更改分区文件存储格式 ALTER TABLE table_name PARTITION (dt2008-08-09) SET FILEFORMAT file_format; --更改分区位置 ALTER TABLE table_name PARTITION (dt2008-08-09) SET LOCATION new location;4、VIEW --创建 CREATE VIEW [IF NOT EXISTS] view_name [(column_name, ...) ][COMMENT view_comment][TBLPROPERTIES (property_name property_value, ...)]AS SELECT ...; --注意: 变更视图只在 Table API 中有效SQL 客户端不支持。 --修改 ALTER VIEW view_name RENAME TO new_view_name; ALTER VIEW view_name SET TBLPROPERTIES (property_name property_value, ... ); ALTER VIEW view_name AS select_statement; --删除 DROP VIEW [IF EXISTS] view_name;示例 --hive中有一张真实的基础表dim_user select * from test.dim_user;--1、创建视图 create view dim_user_v as select * from dim_user limit 4;--从已有的视图中创建视图 create view dim_user_from_v as select * from dim_user_v limit 2;--2、显示当前已有的视图 show tables; show views;--hive v2.2.0之后支持--3、视图的查询使用 select * from dim_user_v ; select * from dim_user_from_v ;--4、查看视图定义 show create table dim_user_v ;--5、删除视图 drop view dim_user_from_v;--6、更改视图属性 alter view dim_user_v set TBLPROPERTIES (comment This is a view);--7、更改视图定义 alter view dim_user_v as select name from dim_user limit 2; -----------------------flink sql cli 操作记录 Flink SQL select * from dim_user;-------------------------------------------------------------------- | op | id | name | -------------------------------------------------------------------- | I | 4 | 赵六 | | I | 5 | alan | | I | 1 | 张三 | | I | 2 | 李四 | | I | 3 | 王五 | -------------------------------------------------------------------- Received a total of 5 rowsFlink SQL create view dim_user_v as select * from dim_user limit 4;Flink SQL create view dim_user_from_v as select * from dim_user_v limit 2;Flink SQL show views; Hive Session ID ebae650c-ae23-4262-b9da-8ccef16d1b91 ----------------- | view name | ----------------- | dim_user_from_v | | dim_user_v | ----------------- 2 rows in setFlink SQL select * from dim_user_v ;-------------------------------------------------------------------- | op | id | name | -------------------------------------------------------------------- | I | 1 | 张三 | | I | 2 | 李四 | | I | 3 | 王五 | | I | 4 | 赵六 | -------------------------------------------------------------------- Received a total of 4 rowsFlink SQL show create table dim_user_v ; Hive Session ID 62802230-2420-484c-9a41-a66d45be3b3c [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: show create table dim_user_vFlink SQL select * from dim_user_from_v ; -------------------------------------------------------------------- | op | id | name | -------------------------------------------------------------------- | I | 1 | 张三 | | I | 2 | 李四 | -------------------------------------------------------------------- Received a total of 2 rowsFlink SQL drop view dim_user_from_v;Flink SQL alter view dim_user_v set TBLPROPERTIES (comment This is a view);Flink SQL alter view dim_user_v as select name from dim_user limit 2;Flink SQL show views; ------------ | view name | ------------ | dim_user_v | ------------ 1 row in setFlink SQL select * from dim_user_v; ------------------------------------ | op | name | ------------------------------------ | I | 张三 | | I | 李四 | ------------------------------------ Received a total of 2 rows 5、FUNCTION --创建 CREATE FUNCTION function_name AS class_name; --删除 DROP FUNCTION [IF EXISTS] function_name;示例 更多的示例参考hive专栏的部分7、hive shell客户端与属性配置、内置运算符、函数内置运算符与自定义UDF运算符 Flink SQL select current_date(); ---------------- | op | _o__c0 | ---------------- | I | 2023-08-31 | ---------------- Received a total of 1 rowFlink SQL select floor(3.1415926); ---------------- | op | _o__c0 | ---------------- | I | 3 | ---------------- Received a total of 1 row 3、DML DQL Hive 方言支持常用的 Hive DML 和 DQL 。 下表列出了一些 Hive 方言支持的语法。 SORT/CLUSTER/DISTRIBUTE BY Group By Join Union LATERAL VIEW Window Functions SubQueries CTE INSERT INTO dest schema Implicit type conversions 为了实现更好的语法和语义的兼容强烈建议使用 HiveModule 并将其放在 Module 列表的首位以便在函数解析时优先使用 Hive 内置函数。 Hive 方言不再支持 Flink SQL 语法 。 若需使用 Flink 语法请切换到 default 方言。 以下是一个使用 Hive 方言的示例。 注意hive以流模式运行时不能insert overwrite插入数据 CREATE CATALOG alan_hivecatalog WITH (type hive,default-database testhive,hive-conf-dir /usr/local/bigdata/apache-hive-3.1.2-bin/conf ); use catalog alan_hivecatalog; set table.sql-dialecthive; load module hive; use modules hive,core; select explode(array(1,2,3)); create table tbl (key int,value string); set execution.runtime-modestreaming; insert into table tbl values (5,e),(1,a),(1,a),(3,c),(2,b),(3,c),(3,c),(4,d); select * from tbl;--------------------flink sql 操作 Flink SQL select explode(array(1,2,3)); Hive Session ID 7d3ae2d5-24f3-4d97-9897-83c8a9abda9b [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.ql.parse.SemanticException: Invalid function explodeFlink SQL set table.sql-dialecthive;Flink SQL select explode(array(1,2,3)); Hive Session ID c0b87333-4957-4c18-b197-27649a3f2ae2 [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.ql.parse.SemanticException: Invalid function explodeFlink SQL load module hive;Flink SQL use modules hive,core;Flink SQL select explode(array(1,2,3));----------------- | op | col | ----------------- | I | 1 | | I | 2 | | I | 3 | ----------------- Received a total of 3 rowsFlink SQL create table tbl (key int,value string);Flink SQL insert overwrite table tbl values (5,e),(1,a),(1,a),(3,c),(2,b),(3,c),(3,c),(4,d); Hive Session ID 12fe08fa-5e63-44b2-8fc3-a90064959451 [INFO] Submitting SQL update statement to the cluster... [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: Streaming mode not support overwrite.Flink SQL set execution.runtime-modebatch; Hive Session ID 4f17cc70-165c-4540-a299-874b66458521 [INFO] Session property has been set.Flink SQL insert overwrite table tbl values (5,e),(1,a),(1,a),(3,c),(2,b),(3,c),(3,c),(4,d); Hive Session ID 1923623f-03d3-44b4-93ab-ee8498c5da06 [INFO] Submitting SQL update statement to the cluster... [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalArgumentException: Checkpoint is not supported for batch jobs.Flink SQL set execution.runtime-modestreaming; Flink SQL insert into table tbl values (5,e),(1,a),(1,a),(3,c),(2,b),(3,c),(3,c),(4,d);Flink SQL select * from tbl; ------------------------------------------------- | op | key | value | ------------------------------------------------- | I | 5 | e | | I | 1 | a | | I | 1 | a | | I | 3 | c | | I | 2 | b | | I | 3 | c | | I | 3 | c | | I | 4 | d | ------------------------------------------------- Received a total of 8 rows 4、注意 以下是使用 Hive 方言的一些注意事项。 Hive 方言只能用于操作 Hive 对象并要求当前 Catalog 是一个 HiveCatalog 。Hive 方言只支持 db.table 这种两级的标识符不支持带有 Catalog 名字的标识符。虽然所有 Hive 版本支持相同的语法但是一些特定的功能是否可用仍取决于你使用的Hive 版本。例如更新数据库位置 只在 Hive-2.4.0 或更高版本支持。执行 DML 和 DQL 时应该使用 HiveModule 。 以上介绍了flink sql中使用hive方言的具体。
http://www.hkea.cn/news/14442607/

相关文章:

  • 关于外贸公司的网站模板html网页超链接代码
  • 深圳做电商平台网站建设风控网站开发
  • 深圳做微信商城网站建设怎样做网站关键词
  • 长沙商城网站建设微友说是做网站维护让帮忙投注
  • 各大搜索引擎提交网站入口大全wordpress设置目录
  • 九台区建设银行网站wordpress 中英文
  • 苏州比较大的网站公司国内重要新闻
  • 做动图为所欲为的网站记事本做的网站链接怎么装饰
  • 建设校园网站的背景及意义网站建设对企业经营
  • 电子商务网站推广案例荣茂网站建设
  • 提供坪山网站建设沈阳工程信息交易网
  • 做网站最简单的工具网页项目策划书模板
  • 商务网站建设的可行性分析包括成品短视频app大全
  • 北京知名的网站建设公司公司网站建设优点
  • 网站建设的7种流程图中国服务器市场
  • 教育网站开发深圳做网站费用
  • 网站制作的基本概念如何快速新建一个网站
  • 怎么查网站接入商可以做淘宝客的网站有哪些
  • 杭州做网站比较出名的公司关于建立企业网站的方案内容
  • 甘肃省建设厅官网站wordpress 改邮箱
  • 网站管理规范西安到北京火车票多少钱
  • 苏州学习网站建设怎么学习制作网站
  • 同个主体新增网站备案专门教做甜品的网站
  • 重庆公司建设网站hao123网址之家官网
  • 来宾城乡建设局网站网站个人微信收款方案
  • 哈尔滨模板建站公司推荐正规的网站制作在哪里
  • 网站建设要知道的标小智logo设计官网
  • wordpress自适应站点印花图案设计网站
  • 代理网站哪个好渝叶购零售客户电商网站
  • 怎么用网站卖自己做ftp备份wordpress