数据治理(四):数据仓库数据质量管理

发布一下 0 0

数据仓库数据质量管理

下面我们针对音乐数据中心数仓项目第四个业务:“统计地区营收情况业务”来说明数据质量如何进行管理。此业务数据质量管理重点放在 ODS 层,EDS 层(DWD 层、DWS 层)、DM 层几个方面,每层数据校验的内容不一样,我们可以通过自己编写通用 shell+Hive 脚本或者使用质量监控工具 Griffin 来进行数据质量监控。

一、“商户营收统计”业务

1、商户营收统计数据分层信息

数据治理(四):数据仓库数据质量管理

以上业务涉及到的数仓所有表:

数据仓库分层设计:源业务系统:		ycak 	- 	user_location					- 用户上报经纬度信息表		ycak 	- 	user_machine_consume_detail 	- 机器消费订单明细表		ODS:user_location				-- TO_YCAK_USR_LOC_D		:用户位置记录日增量表	【增量导入】				user_machine_consume_detail	-- TO_YCAK_CNSM_D			:机器消费订单明细增量表	【增量导入】				machine_baseinfo 			-- TO_YCAK_MAC_D 			:机器基本信息日全量表	【全量导入】		machine_local_info 			-- TO_YCAK_MAC_LOC_D 		:机器位置信息日全量表	【全量导入】		machine_admin_map			-- TO_YCBK_MAC_ADMIN_MAP_D	:机器客户映射关系资料日全量表	【全量导入】		machine_store_map			-- TO_YCBK_MAC_STORE_MAP_D	:机器门店映射关系日全量表		【全量导入】		machine_store_info			-- TO_YCBK_STORE_D			:门店信息日全量表		【全量导入】		province_info				-- TO_YCBK_PRVC_D			:机器省份日全量表		【全量导入】		city_info					-- TO_YCBK_CITY_D			:机器城市日全量表		【全量导入】				area_info					-- TO_YCBK_AREA_D			:机器区县日全量表		【全量导入】				user_alipay_baseinfo		--	TO_YCAK_USR_ALI_D		:支付宝用户全量表 		【全量导入】		user_wechat_baseinfo		--	TO_YCAK_USR_D			:微信用户全量表			【全量导入】		user_qq_baseinfo			--	TO_YCAK_USR_QQ_D		:QQ用户日全量表			【全量导入】		user_app_baseinfo			--  TO_YCAK_USR_APP_D		:APP用户信息日全量表	【全量导入】			user_login_info				-- 	TO_YCAK_USR_LOGIN_D		:用户登录数据表日增量	【增量导入】 			EDS:		TW_MAC_LOC_D		- 机器位置信息日统计表				TW_CNSM_BRIEF_D 	- 消费退款订单流水日增量表				TW_MAC_BASEINFO_D 	- 机器基础信息日全量表 【第二个业务】				TW_USR_BASEINFO_D 	- 活跃用户基础信息日增量表 【第三个业务】				TW_MAC_STAT_D - 机器日营收情况统计表			DM: 		TM_USR_MRCHNT_STAT_D	-	商户日营收统计表【mysql中也有对应的表



ODS层:TO_YCAK_USR_LOC_D		:用户位置记录日增量表	【增量导入】TO_YCAK_CNSM_D			:机器消费订单明细增量表	【增量导入】TO_YCAK_MAC_D 			:机器基本信息日全量表	【全量导入】TO_YCAK_MAC_LOC_D 		:机器位置信息日全量表	【全量导入】TO_YCBK_MAC_ADMIN_MAP_D	:机器客户映射关系资料日全量表	【全量导入】TO_YCBK_MAC_STORE_MAP_D	:机器门店映射关系日全量表		【全量导入】TO_YCBK_STORE_D			:门店信息日全量表		【全量导入】TO_YCBK_PRVC_D			:机器省份日全量表		【全量导入】TO_YCBK_CITY_D			:机器城市日全量表		【全量导入】		TO_YCBK_AREA_D			:机器区县日全量表		【全量导入】TO_YCAK_USR_ALI_D		:支付宝用户全量表 		【全量导入】TO_YCAK_USR_D			:微信用户全量表			【全量导入】TO_YCAK_USR_QQ_D		:QQ用户日全量表			【全量导入】TO_YCAK_USR_APP_D		:APP用户信息日全量表	【全量导入】	TO_YCAK_USR_LOGIN_D		:用户登录数据表日增量	【增量导入】EDS层:TW_MAC_LOC_D		- 机器位置信息日统计表TW_CNSM_BRIEF_D 	- 消费退款订单流水日增量表TW_MAC_BASEINFO_D 	- 机器基础信息日全量表 【第二个业务】TW_USR_BASEINFO_D 	- 活跃用户基础信息日增量表 【第三个业务】TW_MAC_STAT_D - 机器日营收情况统计表DM层:TM_USR_MRCHNT_STAT_D	-	商户日营收统计表



2、清空之前业务执行的数据

#删除Mysql中ycak与ycbk库即可#删除所有Hive表脚本drop table TO_YCAK_MAC_D;drop table TO_YCAK_MAC_LOC_D;drop table TO_YCBK_AREA_D;drop table TO_YCBK_CITY_D;drop table TO_YCBK_MAC_ADMIN_MAP_D;drop table TO_YCBK_MAC_STORE_MAP_D;drop table TO_YCBK_PRVC_D;drop table TO_YCBK_STORE_D;drop table TW_MAC_BASEINFO_D;drop table TO_YCAK_USR_D;		drop table TO_YCAK_USR_ALI_D;drop table TO_YCAK_USR_QQ_D;drop table TO_YCAK_USR_APP_D;drop table TO_YCAK_USR_LOGIN_D;drop table TW_USR_BASEINFO_D;drop table TO_YCAK_USR_LOC_D;drop table TW_MAC_LOC_D;drop table TO_YCAK_CNSM_D;drop table TW_CNSM_BRIEF_D;drop table TW_MAC_STAT_D;drop table TM_USR_MRCHNT_STAT_D;drop table TM_MAC_REGION_STAT_D;#执行以下命令,清空HDFS 目录即可hdfs dfs -rm -r /user/hive/warehouse/data/



3、重新创建 Hive 各层数据表:

#执行如下脚本,直接创建,所有Hive中需要的表CREATE EXTERNAL TABLE `TO_YCAK_MAC_D`( `MID` int,  `SRL_ID` string,  `HARD_ID` string,  `SONG_WHSE_VER` string,  `EXEC_VER` string,  `UI_VER` string,  `IS_ONLINE` string,  `STS` int,  `CUR_LOGIN_TM` string,  `PAY_SW` string,  `LANG` int,  `SONG_WHSE_TYPE` int,  `SCR_TYPE` int)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_D';CREATE EXTERNAL TABLE `TO_YCAK_MAC_LOC_D`( `MID` int,  `PRVC_ID` int,  `CTY_ID` int,  `PRVC` string,  `CTY` string,  `MAP_CLSS` string,  `LON` string,  `LAT` string,  `ADDR` string,  `ADDR_FMT` string,  `REV_TM` string,  `SALE_TM` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_LOC_D';CREATE EXTERNAL TABLE `TO_YCBK_MAC_ADMIN_MAP_D`( `MID` int,  `MAC_NM` string,  `PKG_NUM` int,  `PKG_NM` string,  `INV_RATE` double,  `AGE_RATE` double,  `COM_RATE` double,  `PAR_RATE` double,  `DEPOSIT` double,  `SCENE_PRVC_ID` string,  `SCENE_CTY_ID` string,  `SCENE_AREA_ID` string,  `SCENE_ADDR` string,  `PRDCT_TYPE` string,  `SERIAL_NUM` string,  `HAD_MPAY_FUNC` int,  `IS_ACTV` int,  `ACTV_TM` string, `ORDER_TM` string, `GROUND_NM` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_ADMIN_MAP_D';CREATE EXTERNAL TABLE `TO_YCBK_MAC_STORE_MAP_D`( `STORE_ID` int,  `MID` int,  `PRDCT_TYPE` int,  `ADMINID` int,  `CREAT_TM` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_STORE_MAP_D';CREATE EXTERNAL TABLE `TO_YCBK_STORE_D`( `ID` int,  `STORE_NM` string,  `TAG_ID` string,  `TAG_NM` string,  `SUB_TAG_ID` string, `SUB_TAG_NM` string, `PRVC_ID` string, `CTY_ID` string, `AREA_ID` string, `ADDR` string, `GROUND_NM` string, `BUS_TM` string, `CLOS_TM` string, `SUB_SCENE_CATGY_ID` string, `SUB_SCENE_CATGY_NM` string, `SUB_SCENE_ID` string, `SUB_SCENE_NM` string, `BRND_ID` string, `BRND_NM` string, `SUB_BRND_ID` string, `SUB_BRND_NM` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_STORE_D';CREATE EXTERNAL TABLE `TO_YCBK_PRVC_D`( `PRVC_ID` int,  `PRVC` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_PRVC_D';CREATE EXTERNAL TABLE `TO_YCBK_CITY_D`( `PRVC_ID` int,  `CTY_ID` int, `CTY` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_CITY_D';CREATE EXTERNAL TABLE `TO_YCBK_AREA_D`( `CTY_ID` int,  `AREA_ID` int, `AREA` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_AREA_D';CREATE EXTERNAL TABLE `TW_MAC_BASEINFO_D`( `MID` int,  `MAC_NM` string, `SONG_WHSE_VER` string,  `EXEC_VER` string,  `UI_VER` string,  `HARD_ID` string,  `SALE_TM` string,  `REV_TM` string,  `OPER_NM` string,  `PRVC` string,  `CTY` string,  `AREA` string,  `ADDR` string, `STORE_NM` string, `SCENCE_CATGY` string,  `SUB_SCENCE_CATGY` string,  `SCENE` string,  `SUB_SCENE` string,  `BRND` string,  `SUB_BRND` string,  `PRDCT_NM` string,  `PRDCT_TYP` int,  `BUS_MODE` string,  `INV_RATE` double,  `AGE_RATE` double,  `COM_RATE` double,  `PAR_RATE` double,  `IS_ACTV` int,  `ACTV_TM` string, `PAY_SW` int, `PRTN_NM` string, `CUR_LOGIN_TM` string )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_BASEINFO_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_D`( `UID` int,  `REG_MID` int,  `GDR` string,  `BIRTHDAY` string, `MSISDN` string, `LOC_ID` int, `LOG_MDE` int, `REG_TM` string, `USR_EXP` string, `SCORE` int, `LEVEL` int, `WX_ID` string )ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_ALI_D`( `UID` int,  `REG_MID` int,  `GDR` string,  `BIRTHDAY` string, `MSISDN` string, `LOC_ID` int, `LOG_MDE` int, `REG_TM` string, `USR_EXP` string, `SCORE` int, `LEVEL` int, `USR_TYPE` string, `IS_CERT` string, `IS_STDNT` string, `ALY_ID` string  )ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_ALI_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_QQ_D`( `UID` int,  `REG_MID` int,  `GDR` string,  `BIRTHDAY` string, `MSISDN` string, `LOC_ID` int, `LOG_MDE` int, `REG_TM` string, `USR_EXP` string, `SCORE` int, `LEVEL` int, `QQID` string  )ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_QQ_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_APP_D`( `UID` int,  `REG_MID` int,  `GDR` string,  `BIRTHDAY` string, `MSISDN` string, `LOC_ID` int, `REG_TM` string, `USR_EXP` string, `LEVEL` int, `APP_ID` string  )ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_APP_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_LOGIN_D`( `ID` int,  `UID` int,  `MID` int,  `LOGIN_TM` string, `LOGOUT_TM` string, `MODE_TYPE` int )PARTITIONED BY (`data_dt` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOGIN_D';CREATE EXTERNAL TABLE `TW_USR_BASEINFO_D`( `UID` int,  `REG_MID` int,  `REG_CHNL` string,  `REF_UID` string, `GDR` string, `BIRTHDAY` string, `MSISDN` string, `LOC_ID` int, `LOG_MDE` string, `REG_DT` string, `REG_TM` string, `USR_EXP` string, `SCORE` int, `LEVEL` int, `USR_TYPE` string, `IS_CERT` string, `IS_STDNT` string )PARTITIONED BY (`data_dt` string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_USR_BASEINFO_D';CREATE EXTERNAL TABLE `TO_YCAK_USR_LOC_D`( `ID` int,  `UID` int,  `LAT` string,  `LNG` string, `DATETIME` string, `MID` string )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOC_D';CREATE EXTERNAL TABLE `TW_MAC_LOC_D`( `MID` int,  `X` string,  `Y` string,  `CNT` int, `ADDER` string, `PRVC` string, `CTY` string, `CTY_CD` string, `DISTRICT` string, `AD_CD` string, `TOWN_SHIP` string, `TOWN_CD` string, `NB_NM` string, `NB_TP` string, `BD_NM` string, `BD_TP` string, `STREET` string, `STREET_NB` string, `STREET_LOC` string, `STREET_DRCTION` string, `STREET_DSTANCE` string, `BUS_INFO` string )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_LOC_D';CREATE EXTERNAL TABLE `TO_YCAK_CNSM_D`( `ID` int,  `MID` int,  `PRDCD_TYPE` int,  `PAY_TYPE` int, `PKG_ID` int, `PKG_NM` string, `AMT` int, `CNSM_ID` string, `ORDR_ID` string, `TRD_ID` string, `ACT_TM` string, `UID` int, `NICK_NM` string, `ACTV_ID` int, `ACTV_NM` string, `CPN_TYPE` int, `CPN_TYPE_NM` string, `PKG_PRC` int, `PKG_DSCNT` int, `ORDR_TYPE` int, `BILL_DT` int )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_CNSM_D';CREATE EXTERNAL TABLE `TW_CNSM_BRIEF_D`( `ID` int,  `TRD_ID` string,  `UID` string,  `MID` int, `PRDCD_TYPE` int, `PAY_TYPE` int, `ACT_TM` string, `PKG_ID` int, `COIN_PRC` int, `COIN_CNT` int, `UPDATE_TM` string, `ORDR_ID` string, `ACTV_NM` string, `PKG_PRC` int, `PKG_DSCNT` int, `CPN_TYPE` int, `ABN_TYP` int )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_CNSM_BRIEF_D';CREATE EXTERNAL TABLE `TW_MAC_STAT_D`( `MID` int,  `MAC_NM` string,  `PRDCT_TYPE` string,  `STORE_NM` int, `BUS_MODE` string, `PAY_SW` string, `SCENCE_CATGY` string, `SUB_SCENCE_CATGY` string, `SCENE` string, `SUB_SCENE` string, `BRND` string, `SUB_BRND` string, `PRVC` string, `CTY` string, `AREA` string, `AGE_ID` string, `INV_RATE` string, `AGE_RATE` string, `COM_RATE` string, `PAR_RATE` string, `PKG_ID` string, `PAY_TYPE` string, `CNSM_USR_CNT` string, `REF_USR_CNT` string, `NEW_USR_CNT` string, `REV_ORDR_CNT` string, `REF_ORDR_CNT` string, `TOT_REV` string, `TOT_REF` string )PARTITIONED BY (data_dt string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_STAT_D';CREATE EXTERNAL TABLE `TM_USR_MRCHNT_STAT_D`( `ADMIN_ID` string,  `PAY_TYPE` int,  `REV_ORDR_CNT` int,  `REF_ORDR_CNT` int, `TOT_REV` double, `TOT_REF` double, `TOT_INV_REV` DECIMAL(10,2), `TOT_AGE_REV` DECIMAL(10,2), `TOT_COM_REV` DECIMAL(10,2), `TOT_PAR_REV` DECIMAL(10,2) )PARTITIONED BY (DATA_DT string)ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TM_USR_MRCHNT_STAT_D';


4、向 MySQL 中导入对应库的数据

向 MySQL 中导入 ycak 与 ycbk 两个库数据。

5、准备 ODS 层导入数据脚本:

#在安装SQOOP的node3节点/root/test下,上传导入数据脚本ods_mysqltohive_to_ycak_cnsm_d.shods_mysqltohive_to_ycak_mac_d.shods_mysqltohive_to_ycak_mac_loc_d.shods_mysqltohive_to_ycak_usr_ali_d.shods_mysqltohive_to_ycak_usr_app_d.shods_mysqltohive_to_ycak_usr_d.shods_mysqltohive_to_ycak_usr_loc_d.shods_mysqltohive_to_ycak_usr_login_d.shods_mysqltohive_to_ycak_usr_qq_d.shods_mysqltohive_to_ycbk_area_d.shods_mysqltohive_to_ycbk_city_d.shods_mysqltohive_to_ycbk_mac_admin_map_d.shods_mysqltohive_to_ycbk_mac_store_map_d.shods_mysqltohive_to_ycbk_prvc_d.shods_mysqltohive_to_ycbk_store_d.sh


6、将项目代码打包上传

将项目代码中本地执行改成集群执行,打包上传到 node4 节点/root/test 下

7、准备执行的任务脚本

#在node5节点/root/test下上传执行任务脚本produce1.shproduce2.shproduce3.shproduce4.shproduce5.shproduce6.sh



8、启动 Azkaban

#启动Azkaban-Executor/software/azkaban/azkaban-exec-server/bin/start-exec.sh#激活Azkaban Executorhttp://node5:12321/executor?action=activate#启动Azkaban-Server/software/azkaban/azkaban-web-server/bin/start-web.sh#访问Azkaban  user:azkaban pwd:azkabanhttp://node5:8081/


9、编写 Azkaban job 并打包,提交到 Azkaban 执行:

数据治理(四):数据仓库数据质量管理


二、ODS 层数据质量监控

由于 ODS 层数据是贴源层,是数仓开始的地方,所以这里检验时一般不需要验证与原始数据条目是否相同,在 ODS 层数据质量监控中一般验证当日导入数据的记录数、当日导入表中关注字段为空的记录数、当日导入数据关注字段重复记录数、全表总记录数指标即可。

1、在 Hive 中创建存储数据质量监控数据库“data_quality”

hive> create database data_quality;



2、创建存储 ODS 层数据质量监控数据表“check_ods_info”

create table data_quality.check_ods_info(  dt String comment '执行日期',  db String comment '数据库名称',  tbl String comment '校验表名',  tbl_type string comment '增量导入/全量导入',  check_col String comment '校验为空的列名',  current_dt_rowcnt bigint comment '当日导入数据记录数',  check_null_rowcnt bigint comment '当日检查列为空的记录数',  duplication_rowcnt bigint comment '重复记录数',  total_rowcnt bigint comment '表总记录数') row format delimited fields terminated by '\t';



注意以上中文注释在 Hive 中不支持,可以在 Hive 的 mysql 元数据对应的库中执行如下 SQL 支持中文显示:

alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;alter table PARTITION_PARAMS  modify column PARAM_VALUE varchar(4000) character set utf8;alter table PARTITION_KEYS  modify column PKEY_COMMENT varchar(4000) character set utf8;alter table  INDEX_PARAMS  modify column PARAM_VALUE  varchar(4000) character set utf8;


3、编写数据校验脚本

ods 数据质量校验脚本名称为:“ods_data_check.sh”编写的脚本需要传入 5 个参数:校验数据日期、Hive 库名、校验表名、是否增量(1 代表增量,2 代表全量)、校验为空的字段。脚本内容如下

#!/bin/bash# 数据检查执行的日期current_dt=$1# 校验数据的库名db_name=$2# 校验数据的表名table_name=$3# 表增量导入/全量导入 1-增量 2-全量is_increment=$4# 需要校验为空的列名,以逗号隔开check_col=$5# 切割列,获取数组array=(${check_col//,/ })#where sql where_sql=""#动态拼接SQL where 条件for((i=0;i<${#array[@]};i++))do if [ $i -eq 0 ]; then    where_sql=" where ${array[i]} is null" else    where_sql="${where_sql} or ${array[i]} is null" fidone#判断全量或增量来区分处理SQLif [ ${is_increment} -eq 1 ];then  is_increment='increment'  #HQL获取当日增量导入数据的记录数据  current_dt_rowcnt=`hive -e "select count(*) from ${table_name} where data_dt = '${current_dt}'"`  check_null_rowcnt=`hive -e "select count(*) from ${table_name} ${where_sql} and data_dt = '${current_dt}'"`  duplication_rowcnt=`hive -e "select nvl(sum(tmp.cnt),0) from (select count(*) as cnt from ${table_name} where data_dt = '${current_dt}' group by ${check_col} having count(*) >1) tmp"`else  is_increment='all'  #HQL获取当日增量导入数据的记录数据  current_dt_rowcnt=`hive -e "select count(*) from ${table_name}"`  check_null_rowcnt=`hive -e "select count(*) from ${table_name} ${where_sql}"`  duplication_rowcnt=`hive -e "select nvl(sum(tmp.cnt),0) from (select count(*) as cnt from ${table_name} group by ${check_col} having count(*) >1) tmp"`fi# HQL获取全表总记录数total_cnt=`hive -e "select count(*) from ${table_name}"`echo "日期:$current_dt ,库名:${db_name},表名:${table_name},是否增量:${is_increment}, 需要校验的列:${check_col}"echo "当日导入数据记录数:${current_dt_rowcnt},当日检查列为空的记录数:${check_null_rowcnt},当日导入数据重复数:${duplication_rowcnt} ,表总记录数:${total_cnt}"#将数据结果插入到 data_quality.check_ods_info 表中`hive -e "insert into data_quality.check_ods_info values ('${current_dt}','${db_name}','${table_name}','${is_increment}','${check_col}',${current_dt_rowcnt},${check_null_rowcnt},${duplication_rowcnt},${total_cnt})"`echo "------ finish ------"



4、使用以上脚本对 ODS 部分表数据进行校验

#这里对ODS层部分增量表和全量表进行校验,部分表如下#TO_YCAK_USR_LOC_D		:用户位置记录日增量表	【增量导入】#TO_YCAK_CNSM_D			:机器消费订单明细增量表	【增量导入】#TO_YCAK_MAC_D 			:机器基本信息日全量表	【全量导入】#TO_YCAK_MAC_LOC_D 		:机器位置信息日全量表	【全量导入】# 命令如下:sh /root/test/ods_data_check.sh 20210315 default TO_YCAK_USR_LOC_D 1  UID,MIDsh /root/test/ods_data_check.sh 20210315 default TO_YCAK_CNSM_D 1  MID,PRDCD_TYPE,PAY_TYPE,ORDR_ID,UIDsh /root/test/ods_data_check.sh 20210315 default TO_YCAK_MAC_D 2  MID,SRL_IDsh /root/test/ods_data_check.sh 20210315 default TO_YCAK_MAC_LOC_D 2  MID,PRVC,CTY



5、查看 ODS 层校验结果

#登录Hive ,切换data_quality库,查询数据select * from check_ods_info;



数据治理(四):数据仓库数据质量管理

三、EDS 层数据质量监控

EDS 层相当于数据仓库中的 DW 层,DW 层详细划分又分为 DWD、DWS 层,此业务中也有 DWD 和 DWS 层数据,只是在 EDS 层中没有详细区分,在 EDS 层中具体哪些是 DWD 层与 DWS 层数据可以参照数据分层图。

数据治理(四):数据仓库数据质量管理

EDS 层质量校验分为对 DWD 层数据质量进行校验,对 DWS 层数据质量进行校验两个方面。对 DWD 层数据质量校验关注点在于是否与 ODS 层对应的数据来源表数据记录数是否一致、导入到 DWD 层的数据有效比例等,针对不同的 DWD 层的数据表也可以根据具体业务来决定质量检验的内容。

由于 DWS 层数据基于 DWD 层数据进行了聚合,所以对于 DWS 层数据质量校验关注点可以放在数据条目是否和上一层一致(需要分清主题重要字段),更重要的是这里检验 DWS 层表中数据总条数,某些重要字段为空的记录数,关注的聚合字段值异常条数等。针对不同的 DWS 层的数据表也可以根据具体业务来决定质量检验的内容。

以上 EDS 层中各层数据质量校验具体校验的内容一般根据业务不同是不同的,不能抛开业务来谈数据质量,可以使用具体脚本个性化校验。

1、DWD 层数据质量校验

这里 DWD 层数据质量校验以“机器详细信息统计”业务为例,来说明 DWD 层数据质量如何校验。“机器详细信息统计”分层信息如下:

数据治理(四):数据仓库数据质量管理

以上 ODS 层机器数据到 EDS 层机器数据,是由 ODS 层多张表数据“TO_YACK_MAC_D”、“TO_YCBK_MAC_ADMIN_MAP_D”合并统计得到 EDS 层“TW_MAC_BASEINFO_D”表数据,基于以上我们可以校验以下几个方面来确定数据质量:

  • ODS 层源表记录总数
  • EDS 层目标表记录总数
  • 数据记录有效比例(由于 EDS 层数据一定来自于 ODS 层,可能中间会有清洗数据,导致 EDS 层数据缺少一部分,所以这里记录有效比例就是使用“目标表数据总记录/源表数据总记录”来反应质量情况)
  • 目标表重复记录数(可能会由于 ODS 转到 EDS 层过程中,业务上关联其他表查询导致目标表数据会有重复)

基于以上方面,进行 EDS-DWD 层数据质量监控,步骤如下:

1)创建 DWD 数据质量结果存储表

create table data_quality.check_dwd_info(  dt String comment '执行日期',  db String comment '数据库名称',  check_cols String comment '校验字段',  source_tbl String comment 'ODS层源表表名',  source_tbl_rowcnt bigint comment '源表数据总记录数',  target_tbl String comment 'DWD层目标表表名',  target_tbl_rowcnt bigint comment '目标表数据总记录数',  rt Decimal(5,2) comment '数据有效比例',  target_duplication_rowcnt bigint comment '目标表数据重复记录数') row format delimited fields terminated by '\t';


2)编写 DWD 层数据校验脚本

DWD 层数据质量校验脚本名称为:“dwd_data_check.sh”编写的脚本需要传入 5 个参数:校验数据日期、Hive 库名、校验字段、ODS 层数据源表(可能多张)、EDS 层目标表。脚本内容如下:

#!/bin/bash# 数据检查执行的日期current_dt=$1# 校验数据的库名db_name=$2# 校验字段,多个字段使用逗号隔开check_cols=$3# ODS层源表表名,多表用逗号隔开ods_tbls=$4# DWD层目标表表名target_tbl=$5# 切割多个源表,查询源表关注字段的总条数tbl_arr=(${ods_tbls//,/ })# 查询源表数据SQL source_sql=""# 动态拼接SQL union 各个源表数据for((i=0;i<${#tbl_arr[@]};i++))do if [ $i -eq 0 ]; then    source_sql="select ${check_cols} from ${tbl_arr[i]} " else    source_sql="${source_sql} union all select ${check_cols} from ${tbl_arr[i]} " fidone# 查询SQL 获取源表关注字段总条数source_tbl_rowcnt=`hive -e "select count(*) from ( ${source_sql} ) t1"`# 查询SQL 获取目标表关注字段总条数target_tbl_rowcnt=`hive -e "select count(*) from (select ${check_cols} from ${target_tbl}) t2"`# 统计目标表与源表数据比例rt=`awk 'BEGIN{printf "%0.2f",'${target_tbl_rowcnt}'/'${source_tbl_rowcnt}'}'`# 查询SQL 统计目标表数据重复记录数target_duplication_rowcnt=`hive -e "select nvl(sum(tmp.cnt),0) from (select count(*) as cnt from ${target_tbl} where data_dt = '${current_dt}' group by ${check_cols} having count(*) >1) tmp"`echo "日期:$current_dt ,库名:${db_name},ODS源表表名:${ods_tbls},目标表表名:${target_tbl}, 需要校验的列:${check_cols}"echo "源表数据记录总数:${source_tbl_rowcnt},目标表记录数:${target_tbl_rowcnt},目标表与源表数据比例:${rt} ,目标表重复记录数:${target_duplication_rowcnt}"#将数据结果插入到 data_quality.check_dwd_info 表中`hive -e "insert into data_quality.check_dwd_info values ('${current_dt}','${db_name}','${check_cols}','${ods_tbls}',${source_tbl_rowcnt},'${target_tbl}',${target_tbl_rowcnt},${rt},${target_duplication_rowcnt})"`echo "------ finish ------"


3)使用以上脚本对 EDS-DWD 层数据部分表进行校验

#这里针对“机器详情统计”业务对应的ODS-EDS层的表进行校验,命令如下:sh dwd_data_check.sh 20210315 default mid TO_YCAK_MAC_D,TO_YCBK_MAC_ADMIN_MAP_D TW_MAC_BASEINFO_D#此外,针对“消费退款统计”业务可执行校验,命令如下:sh dwd_data_check.sh 20210315 default MID,PRDCD_TYPE TO_YCAK_CNSM_D TW_CNSM_BRIEF_D


4)查看结果

#登录Hive ,切换data_quality库,查询数据select * from check_dwd_info;



数据治理(四):数据仓库数据质量管理

2、DWS 层数据质量校验

DWS 层数据质量校验,这里校验“机器详情信息”业务中,EDS-DWS 层表“TW_MAC_STAT_D”表的质量,这里由于 DWS 层数据由 EDS 层聚合而来组成的宽表,所以,这里校验时可以校验主题数据是否和上层一致,更重要的是校验 DWS 层表中数据总条数,某些重要字段为空的记录数,关注的聚合字段值异常条数等。

这里针对“TW_MAC_STAT_D”表我们校验表中以下内容:

  • 表中数据总记录数
  • 省份、城市字段为空的记录数
  • 总营收不合理记录数

校验步骤如下:

1)创建 DWS 层校验结果存放表

create table data_quality.check_dws_info(  dt String comment '执行日期',  db String comment '数据库名称',  target_tbl string comment '校验表名',  check_null_cols String comment '校验空值字段',  null_row_cnt bigint comment '当日空值字段记录数',  check_value_cols String comment '校验值字段',  maxValue Decimal(10,2) comment '合理值最大值',  abnormal_rowcnt bigint comment '当日异常值记录数',  total_rowcnt bigint comment '当日表中总记录数') row format delimited fields terminated by '\t';


2)编写校验脚本

DWS 层数据质量校验脚本名称为:“dws_data_check.sh”编写的脚本需要传入 5 个参数:校验数据日期、Hive 库名、校验表名、校验 Null 值字段(可多列)、校验值字段(单列),标准值最大值。脚本内容如下:

#!/bin/bash# 数据检查执行的日期current_dt=$1# 校验数据的库名db_name=$2# 校验表名dws_tbl_name=$3# 校验null值字段,多个字段使用逗号隔开check_null_cols=$4echo "${check_null_cols}"# 校验值字段check_value_col=$5# 标准值的最大值max_value=$6# 切割校验null值字段array=(${check_null_cols//,/ })# 查询DWS表数据SQL check_sql=""# 动态拼接SQL 检查null值条数for((i=0;i<${#array[@]};i++))do if [ $i -eq 0 ]; then    check_sql="select count(*) from ${db_name}.${dws_tbl_name} where data_dt=${current_dt} and ${array[i]} is null " else    check_sql="${check_sql} and ${array[i]} is null " fidone# 查询SQL 获取DWS表中空值数据记录数null_row_cnt=`hive -e "${check_sql}"`# 查询SQL 获取校验值异常记录数abnormal_rowcnt=`hive -e "select sum(if(${check_value_col}>${max_value},1,0)) from ${db_name}.${dws_tbl_name} where data_dt = ${current_dt}"`# 统计当前日期下表中统计总记录数total_rowcnt=`hive -e "select count(*) from ${db_name}.${dws_tbl_name} where data_dt = ${current_dt}"`echo "日期:$current_dt ,库名:${db_name},DWS校验表名:${dws_tbl_name},需要校验为null的列:${check_null_cols},检验值是否合理的列:${check_value_col},合理值最大值:${max_value}"echo "当日空值字段记录数:${null_row_cnt},当日异常值记录数:${abnormal_rowcnt},当日表数据总记录数:${total_rowcnt}"#将数据结果插入到 data_quality.check_dws_info 表中`hive -e "insert into data_quality.check_dws_info values ('${current_dt}','${db_name}','${dws_tbl_name}','${check_null_cols}',${null_row_cnt},'${check_value_col}',${max_value},${abnormal_rowcnt},${total_rowcnt})"`echo "------ finish ------"


3)使用以上脚本对 EDS-DWS 数据进行校验

#对 EDS-DWS层数据表“TW_MAC_STAT_D”进行校验sh dws_data_check.sh 20210315 default TW_MAC_STAT_D PRVC,CTY TOT_REV 10000


4)查看结果数据

#登录Hive ,切换data_quality库,查询数据select * from check_dws_info;



数据治理(四):数据仓库数据质量管理

四、DM 层数据质量监控

DM 层存放数仓中对宽表进行聚合统计得到的结果值数据,所以这里对 DM 层数据进行质量校验时,无法使用通用的脚本处理,而是每个报表数据都要不同的校验指标,DM 层主要校验数据值是否在合理范围内,与 DWS 层校验类似,这里不再校验“商户营收统计”业务中 DM 层表“TM_USR_MRCHNT_STAT_D”。

版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除

本文地址:http://0561fc.cn/152089.html