一、目的
在完成错误数据表任务后，需要对每条错误数据的错误字段及其字段值进行分析。Hive中原有SQL语句和ClickHouse现有SQL语句有很大不同。
二、Hive中原有代码
2.1 表结构
--31、静态排队数据清洗记录表
create table if not exists hurys_db.dwd_data_clean_…
一、目的
在完成错误数据表任务后，需要对每条错误数据的错误字段及其字段值进行分析。Hive中原有SQL语句和ClickHouse现有SQL语句有很大不同。
二、Hive中原有代码
2.1 表结构
-- 31. Static-queue data cleansing record table (Hive).
-- Holds one row per (error record, offending field): the name of the field
-- that failed validation and the value it carried. Partitioned by day.
create table if not exists hurys_db.dwd_data_clean_record_queue (
    id          string comment '唯一ID',
    data_type   int    comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no   string comment '设备编号',
    create_time string comment '创建时间',
    field_name  string comment '字段名',
    field_value string comment '字段值'
)
comment '静态排队数据清洗记录表'
partitioned by (day string)
stored as orc
;
2.2 SQL代码
-- Unpivot every failed validation of each static-queue error row into one
-- (field_name, field_value) record per offending field and load the result
-- into the cleansing record table (dynamic partition on day).
-- NOTE(review): the comparison operators were reconstructed as
-- "value < lower bound OR value > upper bound"; confirm the bounds
-- (lane_no 0..255, queue_len/head/tail 0..500, queue_count 0..100).
with flagged as (
    -- One '<field>:<value>' string per failed check; NULL when the check passes.
    select
        id,
        device_no,
        create_time,
        day,
        case when device_no is null
             then concat('device_no:', 'null') end                      as device_no_value,
        case when lane_no < 0 or lane_no > 255
             then concat('lane_no:', cast(lane_no as string)) end       as lane_no_value,
        case when queue_len < 0 or queue_len > 500
             then concat('queue_len:', cast(queue_len as string)) end   as queue_len_value,
        case when queue_head < 0 or queue_head > 500
             then concat('queue_head:', cast(queue_head as string)) end as queue_head_value,
        case when queue_tail < 0 or queue_tail > 500
             then concat('queue_tail:', cast(queue_tail as string)) end as queue_tail_value,
        case when queue_count < 0 or queue_count > 100
             then concat('queue_count:', cast(queue_count as string)) end as queue_count_value
    from hurys_db.dwd_queue_error
    where day = '2024-09-10'
),
t3 as (
    select
        id,
        device_no,
        create_time,
        day,
        device_no_value,
        lane_no_value,
        queue_len_value,
        queue_head_value,
        queue_tail_value,
        queue_count_value,
        -- concat_ws skips NULL arguments, so only the failed checks end up
        -- in the comma-separated list.
        concat_ws(',',
                  device_no_value,
                  lane_no_value,
                  queue_len_value,
                  queue_head_value,
                  queue_tail_value,
                  queue_count_value) as kv_pairs
    from flagged
)
insert overwrite table hurys_db.dwd_data_clean_record_queue partition (day)
select
    id,
    6                    as data_type,   -- 6 = static queue
    t3.device_no,
    create_time,
    split(pair, ':')[0]  as field_name,
    split(pair, ':')[1]  as field_value,
    day
from t3
lateral view explode(split(t3.kv_pairs, ',')) exploded_table as pair
-- Rows with no failed check would otherwise explode into a single empty pair.
where device_no_value   is not null
   or lane_no_value     is not null
   or queue_len_value   is not null
   or queue_head_value  is not null
   or queue_tail_value  is not null
   or queue_count_value is not null
;
三、ClickHouse中现有代码
3.1 表结构
-- 31. Static-queue data cleansing record table (ClickHouse, long-term storage).
-- Holds one row per (error record, offending field); partitioned by day and
-- sorted by (day, id).
create table if not exists hurys_jw.dwd_data_clean_record_queue (
    id          String           comment '唯一ID',
    data_type   Nullable(Int32)  comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no   Nullable(String) comment '设备编号',
    create_time DateTime         comment '创建时间',
    field_name  Nullable(String) comment '字段名',
    field_value Nullable(String) comment '字段值',
    day         Date             comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day, id)
ORDER BY (day, id)
SETTINGS index_granularity = 8192;
3.2 SQL代码
-- Unpivot every failed validation of each static-queue error row into one
-- (field_name, field_value) record per offending field.
-- NOTE(review): the comparison operators were reconstructed as
-- "value < lower bound OR value > upper bound"; confirm the bounds.
SELECT
    id,
    6 AS data_type,                               -- 6 = static queue
    device_no,
    create_time,
    splitByString(':', pair)[1] AS field_name,    -- ClickHouse arrays are 1-based
    splitByString(':', pair)[2] AS field_value,
    day
FROM
(
    SELECT
        id,
        device_no,
        create_time,
        day,
        -- Each check contributes a one-element array when it fails and an
        -- empty array otherwise; arrayConcat keeps only the failed checks.
        arrayConcat(
            if(device_no IS NULL, ['device_no:null'], []),
            if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),
            if(queue_len < 0 OR queue_len > 500, [concat('queue_len:', toString(queue_len))], []),
            if(queue_head < 0 OR queue_head > 500, [concat('queue_head:', toString(queue_head))], []),
            if(queue_tail < 0 OR queue_tail > 500, [concat('queue_tail:', toString(queue_tail))], []),
            if(queue_count < 0 OR queue_count > 100, [concat('queue_count:', toString(queue_count))], [])
        ) AS pairs
    FROM hurys_jw.dwd_queue_error
    WHERE device_no IS NULL
       OR lane_no < 0 OR lane_no > 255
       OR queue_len < 0 OR queue_len > 500
       OR queue_head < 0 OR queue_head > 500
       OR queue_tail < 0 OR queue_tail > 500
       OR queue_count < 0 OR queue_count > 100
) AS subquery
ARRAY JOIN pairs AS pair
;
注意：1、错误数据表dwd_queue_error的清洗字段不能设置为Nullable，这是一大坑；2、如果错误数据表中的清洗字段是Decimal(10,1)，那么相关字段的比较就要调整，例如：
-- Variant of the pairs expression for Decimal(10,1) columns: fractional
-- thresholds are built with toDecimal32 so both sides of the comparison are
-- decimals.
-- NOTE(review): operators reconstructed as "< lower OR > upper"; confirm.
arrayConcat(
    if(device_no IS NULL, ['device_no:null'], []),
    if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),
    if(azimuth < 0 OR azimuth > toDecimal32('359.9', 1), [concat('azimuth:', toString(azimuth))], []),
    if(rcs < -64 OR rcs > toDecimal32('63.5', 1), [concat('rcs:', toString(rcs))], []),
    if(prob < 0 OR prob > 100, [concat('prob:', toString(prob))], [])
) AS pairs
3.3 Kettle任务
3.3.1 newtime
3.3.2 替换NULL值
3.3.3 clickhouse输入
3.3.4 字段选择
3.3.5 clickhouse输出
3.3.6 执行任务
3.3.7 海豚调度
由于不需要实时记录，因此把所有数据的清洗记录任务放在一个海豚工作流里面，T+1执行即可。