当前位置: 首页 > news >正文

广州白云机场网站建设零基础培训网页设计

广州白云机场网站建设,零基础培训网页设计,高端企业网站建设蓦然郑州网站建设6,公司网站的建设要注意什么目录 1、文件系统 SQL 连接器 2、如何指定文件系统类型 3、如何指定文件格式 4、读取文件系统 4.1 开启 目录监控 4.2 可用的 Metadata 5、写出文件系统 5.1 创建分区表 5.2 滚动策略、文件合并、分区提交 5.3 指定 Sink Parallelism 6、示例_通过FlinkSQL读取kafk…目录 1、文件系统 SQL 连接器 2、如何指定文件系统类型 3、如何指定文件格式 4、读取文件系统 4.1 开启 目录监控  4.2 可用的 Metadata 5、写出文件系统 5.1 创建分区表 5.2 滚动策略、文件合并、分区提交 5.3 指定 Sink Parallelism 6、示例_通过FlinkSQL读取kafka在写入hive表 6.1、创建 kafka source表用于读取kafka 6.2、创建 hdfs sink表用于写出到hdfs 6.3、insert into 写入到 hdfs_sink_table 6.4、查询 hdfs_sink_table 6.5、创建hive表指定local 1、文件系统 SQL 连接器 文件系统连接器允许从本地或分布式文件系统进行读写数据 官网链接文件系统 SQL 连接器 2、如何指定文件系统类型 创建表时通过 path 协议名称:///path 来指定 文件系统类型 参考官网文件系统类型 CREATE TABLE filesystem_table (id INT,name STRING,ds STRING ) partitioned by (ds) WITH (connector filesystem,-- 本地文件系统path file:///URI,-- HDFS文件系统path hdfs://URI,-- 阿里云对象存储 path oss://URI,format json ); 3、如何指定文件格式 FlinkSQL 文件系统连接器支持多种format来读取和写入文件 比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型 来对数据进行解析后映射到表中的字段中 CREATE TABLE filesystem_table_file_format (id INT,name STRING,ds STRING ) partitioned by (ds) WITH (connector filesystem,-- 指定文件格式类型format json|csv|orc|raw ); 4、读取文件系统 FlinkSQL可以将单个文件或整个目录的数据读取到单个表中 注意 1、当读取目录时对目录中的文件进行 无序的读取 2、默认情况下读取文件时为批处理模式只会扫描配置路径一遍后就会停止 当开启目录监控(source.monitor-interval)时才是流处理模式 4.1 开启 目录监控  通过设置 source.monitor-interval 属性来开启目录监控以便在新文件出现时继续扫描 注意 只会对指定目录内新增文件进行读取不会读取更新后的旧文件 -- 目录监控 drop table filesystem_source_table; CREATE TABLE filesystem_source_table (id INT,name STRING,file.name STRING NOT NULL METADATA ) WITH (connector filesystem,path file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016,format json,source.monitor-interval 3 -- 开启目录监控,设置监控时间间隔 );-- 持续读取 select * from filesystem_source_table;4.2 可用的 Metadata 使用FLinkSQL读取文件系统中的数据时支持对 metadata 进行读取 注意 所有 metadata 都是只读的 -- 可用的Metadata drop table filesystem_source_table_read_metadata; CREATE TABLE filesystem_source_table_read_metadata (id INT,name STRING,file.path STRING NOT NULL METADATA,file.name STRING NOT NULL METADATA,file.size BIGINT NOT NULL METADATA,file.modification-time TIMESTAMP_LTZ(3) NOT NULL METADATA ) WITH (connector filesystem,path file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012,format json );select * from filesystem_source_table_read_metadata; 运行结果 5、写出文件系统 5.1 创建分区表 FlinkSQL支持创建分区表并且通过 insert into(追加) 和 insert overwrite(覆盖) 写入数据 -- 创建分区表 drop table filesystem_source_table_partition; CREATE TABLE filesystem_source_table_partition (id INT,name STRING,ds STRING ) partitioned by (ds) WITH (connector filesystem,path file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012,partition.default-name default_partition,format json );-- 动态分区写入 insert into filesystem_source_table_partition SELECT * FROM (VALUES(1,a,20231010) , (2,b,20231010) , (3,c,20231011) , (4,d,20231011) , (5,e,20231012) , (6,f,20231012) ) AS user1 (id,name,ds);-- 静态分区写入 insert into filesystem_source_table_partition partition(ds 20231010) SELECT * FROM (VALUES(1,a) , (2,b) , (3,c) , (4,d) , (5,e) , (6,f) ) AS user1 (id,name);-- 查询分区表数据 select * from filesystem_source_table_partition where ds 20231010; 5.2 滚动策略、文件合并、分区提交 可以看之前的博客flink写入文件时分桶策略 官网链接官网分桶策略 5.3 指定 Sink Parallelism 当使用FlinkSQL写出到文件系统时可以通过 sink.parallelism 设置sink算子的并行度 注意当且仅当上游的 changelog 模式为 INSERT-ONLY 时才支持配置 sink parallelism。否则程序将会抛出异常 CREATE TABLE hdfs_sink_table (log STRING,dt STRING, -- 分区字段天hour STRING -- 分区字段小时 ) partitioned by (dt,hour) WITH (connector filesystem,path file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka,sink.parallelism 2, -- 指定sink算子并行度format raw ); 6、示例_通过FlinkSQL读取kafka在写入hive表 需求 使用FlinkSQL将kafka数据写入到hdfs指定目录中 根据kafka的timestamp进行分区按小时分区 6.1、创建 kafka source表用于读取kafka -- TODO 创建读取kafka表时同时读取kafka元数据字段 drop table kafka_source_table; CREATE TABLE kafka_source_table(log STRING,timestamp TIMESTAMP(3) METADATA FROM timestamp -- 消息的时间戳 ) WITH (connector kafka,topic 20231017,properties.bootstrap.servers worker01:9092,properties.group.id FlinkConsumer,scan.startup.mode earliest-offset,format raw ); 6.2、创建 hdfs sink表用于写出到hdfs drop table hdfs_sink_table; CREATE TABLE hdfs_sink_table (log STRING,dt STRING, -- 分区字段天hour STRING -- 分区字段小时 ) partitioned by (dt,hour) WITH (connector filesystem,path hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka,sink.parallelism 2, -- 指定sink算子并行度format raw ); 6.3、insert into 写入到 hdfs_sink_table -- 流式 sql插入文件系统表 insert into hdfs_sink_table select log,DATE_FORMAT(timestamp,yyyyMMdd) as dt,DATE_FORMAT(timestamp,HH) as hour from kafka_source_table; 6.4、查询 hdfs_sink_table -- 批式 sql使用分区修剪进行选择 select * from hdfs_sink_table; 6.5、创建hive表指定local create table kafka_to_hive ( log string comment 日志数据comment 埋点日志数据 PARTITIONED BY (dt string,hour string) row format delimited fields terminated by \t lines terminated by \n stored as orc LOCATION hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka;
http://www.hkea.cn/news/14296593/

相关文章:

  • 做网站什么什么千城网站建设
  • 专业做网站的公司有没有服务器公司建立网站的必要性
  • 信息管理网站开发实验体会网站建设原理试卷
  • 网页搭建app优化大师手机版下载安装app
  • 学校网站建设策划书模板做办公家具在哪个网站推销好
  • id注册网站ui设计介绍
  • 建工网官方网站wordpress 相册主题
  • 网站收录500多页淘宝上成都网站建设
  • 滨州网站建设报价中国知名网站建设公司
  • 律师网站模版商丘吴昊网络科技有限公司
  • 怎么才能建立自己的网站啊义乌网图科技有限公司怎么样
  • 做服装行业网站怎么每天更新内容服务器网站崩溃
  • 哪个网站可以做电视背景墙cos wordpress
  • 微信公众号开发微网站开发海外做淘宝网站
  • 物业公司网站建设策划书新品发布会结束语
  • 建设银行博士后招聘网站做网站百度关键排名
  • 网站优化推广价格免费网站建设协议
  • 哪个网站专门做游戏脚本南京网站制作工具
  • 培睿网站开发与设计电子信息工程专业招聘信息网
  • 电子商务网站建设教学计划网站主机域名
  • 罗庄网站建设深圳网站建设信科网络
  • 广州专业网站建设公司小程序的下载
  • 2003系统建网站好看个人网页模板
  • o2o网站建设公司排名动态表单的设计与实现
  • 深圳门户网站建设案例江苏网站seo平台
  • 孝感网站制作广渠门网站建设
  • 建行网站济南专业模板建站哪家好
  • 海口网站运营托管费用做外贸好还是跨境电商好
  • 新开三端互通传奇网站织梦个人网站模版
  • 数字化档案馆及网站的建设网站登陆模板下载