场景:
由于公司目前没有特定的产品去实现关于数据清洗,目前只能通过对应的存储过程方式来做处理。
整个ETL实现流程图如下:
业务提供清洗规则:
具体实现方案:
按照业务规则,需要对数据缺失值做清洗,对数据范围值域做清洗,非数值清洗,非日期值做清洗,包括空格清洗做相应处理。对需要做数据清洗的表字段和特定业务校验规则,做分类标记。
a)生成配置表。表结构如下:
b)创建表 DATA_DEF_LONG_F 用来承载字段默认内容。
c)表SYS_SP_MONITOR_LOG 是用来记录每个程序运行日志。
创建表脚本如下:
--功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认
create table DATA_DEF_LONG_F
(
tab_name VARCHAR2(100),
tab_column VARCHAR2(100),
tab_long VARCHAR2(100)
)
tablespace TEMP;
数据清洗规则配置表
-- Create table
create table DM_CHECK_RULE_F
(
table_name VARCHAR2(100),
chenck_column VARCHAR2(100),
data_definition VARCHAR2(100),
endue_default VARCHAR2(100),
min_value NUMBER,
max_value NUMBER,
min_len_value NUMBER,
max_len_value NUMBER,
cleaning_mark NUMBER,
descriptor VARCHAR2(1000),
basic_path VARCHAR2(1000),
column_meaning VARCHAR2(1000)
)
tablespace TEMP;
-- Add comments to the table
comment on table DM_CHECK_RULE_F
is '数据清洗规则配置表';
-- Add comments to the columns
comment on column DM_CHECK_RULE_F.table_name
is '需要清洗数据表名称';
comment on column DM_CHECK_RULE_F.chenck_column
is '需要清洗数据表字段';
comment on column DM_CHECK_RULE_F.data_definition
is '业务定义数据值域';
comment on column DM_CHECK_RULE_F.endue_default
is '强制转换特定赋值';
comment on column DM_CHECK_RULE_F.min_value
is '数值值域最小值';
comment on column DM_CHECK_RULE_F.max_value
is '数值值域最大值';
comment on column DM_CHECK_RULE_F.min_len_value
is '字段长度值域最小值';
comment on column DM_CHECK_RULE_F.max_len_value
is '字段长度值域最大值';
comment on column DM_CHECK_RULE_F.cleaning_mark
is '需要清洗数据类型(1:编码转化 2:非空清洗 3:空格清洗 4:值域清洗 5:数值清洗 6:长度清洗 7:日期值清洗 9:强制转换)';
comment on column DM_CHECK_RULE_F.descriptor
is '清洗字段具体实现数值';
comment on column DM_CHECK_RULE_F.basic_path
is '要素路径';
comment on column DM_CHECK_RULE_F.column_meaning
is '字段的含义';
-- Create table
create table SYS_SP_MONITOR_LOG
(
sp_proc_name VARCHAR2(100),
start_date DATE,
end_date DATE,
etl_date DATE,
sp_status VARCHAR2(50),
sp_insert_num NUMBER,
sp_update_num NUMBER,
sp_subject VARCHAR2(100),
sp_sub_subject VARCHAR2(200),
sp_error_info VARCHAR2(4000)
)
tablespace TEMP;
-- Create table
create table SYS_PJOB_MONITOR_LOG
(
pjob_name VARCHAR2(500),
etl_date VARCHAR2(8),
start_date DATE,
end_date DATE,
pjob_status VARCHAR2(10),
pjob_count NUMBER,
date_source VARCHAR2(80),
pjob_mark VARCHAR2(10)
)
tablespace TEMP;
-- Add comments to the columns
comment on column SYS_PJOB_MONITOR_LOG.pjob_name
is '作业名称';
comment on column SYS_PJOB_MONITOR_LOG.etl_date
is '业务日期';
comment on column SYS_PJOB_MONITOR_LOG.start_date
is '作业开始日期';
comment on column SYS_PJOB_MONITOR_LOG.end_date
is '作业结束日期';
comment on column SYS_PJOB_MONITOR_LOG.pjob_status
is '作业运行状态';
comment on column SYS_PJOB_MONITOR_LOG.pjob_count
is '作业统计条数';
comment on column SYS_PJOB_MONITOR_LOG.date_source
is '源数据系统';
comment on column SYS_PJOB_MONITOR_LOG.pjob_mark
is '层级标示';
具体实现代码:
CREATE OR REPLACE PACKAGE PKG_CLEAN_RULE_TAB AS
----------------------------------------------------------------------------------------------
/*
功能: 实现对特定表数据做清洗,使数据标准化。
版本号:V0.1
<1>. 代码履历
CREATE BY SJM AT 2016-01-13
--------------------------------------------------------------------------------------------
<2>. 参数含义:
P_TAB IN VARCHAR2 ---------需要进行数据清洗的表的表明
-------------------------------------------------------------------------------------------
/*功能:编码转换*/
PROCEDURE SP_CLEAN_CODE_DEFAULT(P_TAB IN VARCHAR2);
/*功能:值域清洗*/
PROCEDURE SP_CLEAN_NUMBER_RANGE(P_TAB IN VARCHAR2);
/*功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认 **********************************/
PROCEDURE SP_CLEAN_NUMBER_DEFAULT;
/*功能: 空值清洗*/
PROCEDURE SP_CLEAN_CHAR_NULL(P_TAB IN VARCHAR2);
/*功能: 空格清洗*/
PROCEDURE SP_CLEAN_CHAR_BLANK(P_TAB IN VARCHAR2);
/*功能: 非数值清洗*/
PROCEDURE SP_CLEAN_CHAR_RANGE(P_TAB IN VARCHAR2);
/*功能: 长度清洗*/
PROCEDURE SP_CLEAN_CHAR_LENGTH(P_TAB IN VARCHAR2);
/*功能: 日期值清洗*/
PROCEDURE SP_CLEAN_DATA(P_TAB IN VARCHAR2);
/*功能: 强制转换*/
PROCEDURE SP_CLEAN_CODE_CONVERSION(P_TAB IN VARCHAR2);
/*功能: 主键检核*/
PROCEDURE SP_CLEAN_CONSTRAINT(P_TAB IN VARCHAR2);
/* 功能:主程序,调用前面的各个子程序*/
PROCEDURE SP_CLEAN_MAIN(P_TAB IN VARCHAR2 );
PROCEDURE SP_CLEAN_ALL_MAIN;
/*如果字符串是数字格式则返回1,不是则返回0;*/
FUNCTION FN_ISNUMERIC (STR IN VARCHAR2) RETURN NUMBER;
/*判断日期是否合法,返回标记值 若为0则表名日期合法,若日期格式不合法则会返回负值*/
FUNCTION FN_IS_DATE(IN_STR IN VARCHAR2) RETURN NUMBER;
END PKG_CLEAN_RULE_TAB;
/
CREATE OR REPLACE PACKAGE BODY PKG_CLEAN_RULE_TAB AS
/*功能:根据编码代号取编码映射表中标准编码,进行标准化编码转换;同时对于超出范围的记录,假如默认值为空,则不做数据转换,否则需要赋为默认值*/
PROCEDURE SP_CLEAN_CODE_DEFAULT(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_CODE_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, DATA_DEFINITION
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '1'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-9' || ''' WHERE ' || '(' || V_TAB_COLUMN ||
' NOT IN ' || '(' || V_TAB_CODOMAIN || ')' || ' OR ' ||
V_TAB_COLUMN || ' IS NULL' || ')' || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_INSERT_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '编码转换',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'编码转换',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CODE_DEFAULT;
/*功能:对于数值类型进行检查以查看它们是否位于可接受的范围内(例如,个人账户余额应位于0 和2,000,000,000之间等),假如默认值为空,则不做数据转换,否则需要赋为默认值*/
PROCEDURE SP_CLEAN_NUMBER_RANGE(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_MIN_VALUE NUMBER;
V_MAX_VALUE NUMBER;
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, MIN_VALUE, MAX_VALUE
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '4'
GROUP BY CHENCK_COLUMN, MIN_VALUE, MAX_VALUE;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_MIN_VALUE, V_MAX_VALUE;
EXIT WHEN CHECK_CUR%NOTFOUND;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '999999' || ''' WHERE ' || V_COLUMN_PK ||
' IN ' || '(' || 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT BETWEEN ' || V_MIN_VALUE || ' AND ' ||
V_MAX_VALUE || ')' || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END LOOP;
CLOSE CHECK_CUR;
V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_NUMBER_RANGE;
/*
功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认
*/
PROCEDURE SP_CLEAN_NUMBER_DEFAULT IS
V_TAB_NAME VARCHAR2(100);
V_COLUMN_NAME VARCHAR2(100);
V_DATA_DEFAULT VARCHAR2(100);
CURSOR CHECK_DEFAULT IS
SELECT TABLE_NAME, COLUMN_NAME, DATA_DEFAULT
FROM USER_TAB_COLS
WHERE DATA_DEFAULT IS NOT NULL;
BEGIN
OPEN CHECK_DEFAULT;
EXECUTE IMMEDIATE 'TRUNCATE TABLE SDM.DATA_DEF_LONG_F';
LOOP
FETCH CHECK_DEFAULT
INTO V_TAB_NAME, V_COLUMN_NAME, V_DATA_DEFAULT;
INSERT INTO DATA_DEF_LONG_F
VALUES
(V_TAB_NAME, V_COLUMN_NAME, V_DATA_DEFAULT);
COMMIT;
EXIT WHEN CHECK_DEFAULT%NOTFOUND;
END LOOP;
CLOSE CHECK_DEFAULT;
END SP_CLEAN_NUMBER_DEFAULT;
/*功能: 判断字段是否为空,假如默认值为空,则不做数据转换,否则需要赋为默认值 */
PROCEDURE SP_CLEAN_CHAR_NULL(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_NULL_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '2'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;
IF V_CNT > 0 THEN
--判断该字段是否默认为空
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-999999' || ''' WHERE ' || V_TAB_COLUMN || ' IS NULL ' || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
ELSE
EXIT WHEN CHECK_CUR%NOTFOUND;
--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
END IF;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '空值清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'空值清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CHAR_NULL;
/*功能: 空格清洗*/
PROCEDURE SP_CLEAN_CHAR_BLANK(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50) ;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '3'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
IF V_CNT > 0 THEN
--判断该字段是否默认为空
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-19880707' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE TRIM ' || '(' || V_TAB_COLUMN || ')' ||
'IS NULL' || ' AND ' || V_TAB_COLUMN ||
' IS NOT NULL)' || '';
--EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
ELSE
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '空格清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'空格清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CHAR_BLANK;
/*功能: 该字段出现非”0123456789.-+”和空格“ ”情况:假如默认值为空,则不做数据转换,否则需要赋为默认值*/
PROCEDURE SP_CLEAN_CHAR_RANGE(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50) ;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '5'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
IF V_CNT > 0 THEN
--判断该字段是否默认为空
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-19880707' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE PKG_CLEAN_RULE_TAB.ISNUMERIC ' || '(' ||
V_TAB_COLUMN || ')' || ' = 0 ' || ')' || '';
--EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
ELSE
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '非数值清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'非数值清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CHAR_RANGE;
/*功能: 长度清洗*/
PROCEDURE SP_CLEAN_CHAR_LENGTH(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_MIN_LEN_VALUE NUMBER;
V_MAX_LEN_VALUE NUMBER;
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
MIN_LEN_VALUE,
MAX_LEN_VALUE
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '6'
GROUP BY CHENCK_COLUMN, MIN_LEN_VALUE, MAX_LEN_VALUE;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_MIN_LEN_VALUE, V_MAX_LEN_VALUE;
EXIT WHEN CHECK_CUR%NOTFOUND;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '999999' || ''' WHERE ' || V_COLUMN_PK ||
' IN ' || '(' || 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT BETWEEN ' || V_MIN_LEN_VALUE || ' AND ' ||
V_MAX_LEN_VALUE || ')' || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
EXIT WHEN CHECK_CUR%NOTFOUND;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '长度清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'长度清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CHAR_LENGTH;
/*功能: 对该字段进行强制转换为默认值,如:密码字段转为"0"*/
PROCEDURE SP_CLEAN_CODE_CONVERSION(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, ENDUE_DEFAULT
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '9'
GROUP BY CHENCK_COLUMN, ENDUE_DEFAULT;
/*功能:获取需要做数据清洗的主键*/
BEGIN
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
IF V_CNT > 0 THEN
--判断该字段是否默认为空
EXIT WHEN CHECK_CUR%NOTFOUND;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ' || '' || V_TAB_CODOMAIN || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
ELSE
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_CODE_CONVERSION;
/*判断该字段是否是合法的日期,假如默认值为空,则不做数据转换,否则需要赋为默认值;判断该字段内容是否合法,需要依据该字段的类型分别进行判断*/
PROCEDURE SP_CLEAN_DATA(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '7'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;
IF V_CNT > 0 THEN
--判断该字段是否默认为空
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '19000101' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE PKG_CLEAN_RULE_TAB.FN_IS_DATE ' || '(' ||
V_TAB_COLUMN || ')' || ' = 0 ' || ')' || '';
EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
ELSE
EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END;
END SP_CLEAN_DATA;
/*功能: 强制转换*/
/*功能: 主键检核*/
PROCEDURE SP_CLEAN_CONSTRAINT(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
INDEX_PK VARCHAR2(100);
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_SQL_COMMENTS VARCHAR2(4000);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
TYPE I_CURSOR_TYPE IS REF CURSOR;
MY_CURSOR I_CURSOR_TYPE;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
BEGIN
V_CHECK_TAB := P_TAB;
/*功能:获取规则表中检核字段*/
DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, DATA_DEFINITION
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '2'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;
/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;
LOOP
FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;
DBMS_OUTPUT.PUT_LINE(V_TAB_COLUMN);
V_SQL_COMMENTS := 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT IN ' || '(' || V_TAB_CODOMAIN || ')' || '';
/*功能:获取表对应的主键,用于更新目标表*/
IF V_CNT > 0 THEN
--查询该表是否在值域清洗范围内
OPEN MY_CURSOR FOR V_SQL_COMMENTS;
LOOP
FETCH MY_CURSOR
INTO INDEX_PK;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' ||
V_TAB_COLUMN || ' = ''' || '-787878' ||
''' WHERE ' || V_COLUMN_PK || ' = ' || INDEX_PK;
EXECUTE IMMEDIATE V_EXEC_SQL;
COMMIT;
EXIT WHEN MY_CURSOR%NOTFOUND;
END LOOP;
CLOSE MY_CURSOR;
ELSE
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
DBMS_OUTPUT.PUT_LINE(V_SQL_CODE || ' SQLERRM:' || V_SQL_ERRM);
END IF;
END LOOP;
CLOSE CHECK_CUR;
END;
END SP_CLEAN_CONSTRAINT;
/* 功能:主程序,调用前面的各个子程序*/
PROCEDURE SP_CLEAN_MAIN(P_TAB IN VARCHAR2) --需要进行清洗的表
IS
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_SP_NAME VARCHAR2(50) ;
BEGIN
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_MAIN'||P_TAB;
/*功能:对输入的表按照*/
SELECT COUNT(1)
INTO V_CNT
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB);
IF V_CNT > 0 THEN
FOR I IN (SELECT DISTINCT CLEANING_MARK
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)) LOOP
IF I.CLEANING_MARK = 1 THEN
SP_CLEAN_CODE_DEFAULT(P_TAB);
ELSIF I.CLEANING_MARK = 2 THEN
SP_CLEAN_CHAR_NULL(P_TAB);
ELSIF I.CLEANING_MARK = 3 THEN
SP_CLEAN_CHAR_BLANK(P_TAB);
ELSIF I.CLEANING_MARK = 4 THEN
SP_CLEAN_NUMBER_RANGE(P_TAB);
ELSIF I.CLEANING_MARK = 5 THEN
SP_CLEAN_CHAR_RANGE(P_TAB);
ELSIF I.CLEANING_MARK = 6 THEN
SP_CLEAN_CHAR_LENGTH(P_TAB);
ELSIF I.CLEANING_MARK = 7 THEN
SP_CLEAN_DATA(P_TAB);
ELSIF I.CLEANING_MARK = 9 THEN
SP_CLEAN_CODE_CONVERSION(P_TAB);
END IF;
END LOOP;
ELSE
DBMS_OUTPUT.PUT_LINE('操作步骤结束----对' || P_TAB ||
'已经真正完成了清洗等执行步骤,脚本可参看PART_TAB_LOG表');
END IF;
V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END SP_CLEAN_MAIN;
PROCEDURE SP_CLEAN_ALL_MAIN --需要进行清洗的表
IS
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_SP_NAME VARCHAR2(50) := 'SP_CLEAN_ALL_MAIN';
BEGIN
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
/*功能:对输入的表按照*/
SELECT COUNT(1) INTO V_CNT FROM DM_CHECK_RULE_F;
IF V_CNT > 0 THEN
FOR I IN (SELECT DISTINCT CLEANING_MARK, TABLE_NAME
FROM DM_CHECK_RULE_F) LOOP
IF I.CLEANING_MARK = 1 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CODE_DEFAULT(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 2 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_NULL(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 3 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_BLANK(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 4 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_NUMBER_RANGE(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 5 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_RANGE(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 6 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_LENGTH(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 7 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_DATA(I.TABLE_NAME);
ELSIF I.CLEANING_MARK = 9 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CODE_CONVERSION(I.TABLE_NAME);
END IF;
END LOOP;
ELSE
DBMS_OUTPUT.PUT_LINE('操作步骤结束----对已经真正完成了清洗等执行步骤,脚本可参看PART_TAB_LOG表');
END IF;
V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;
/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;
/*执行失败记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);
END SP_CLEAN_ALL_MAIN;
/*如果字符串是数字格式则返回1,不是则返回0;*/
FUNCTION FN_ISNUMERIC(STR IN VARCHAR2) RETURN NUMBER IS
V_STR VARCHAR2(1000);
BEGIN
IF STR IS NULL THEN
RETURN 0;
ELSE
V_STR := TRANSLATE(STR, '.0123456789', '.');
IF V_STR = '.' OR V_STR = '+.' OR V_STR = '-.' OR V_STR IS NULL OR
V_STR = ' ' THEN
RETURN 1;
ELSE
RETURN 0;
END IF;
END IF;
END FN_ISNUMERIC;
/*判断日期是否合法,返回标记值 若为0则表名日期合法,若日期格式不合法则会返回负值*/
--判断一个字符串是否是日期格式的函数
FUNCTION FN_IS_DATE(IN_STR VARCHAR2) RETURN NUMBER IS
VAL DATE;
BEGIN
IF IN_STR IS NULL THEN
RETURN 0;
ELSE
VAL := TO_DATE(IN_STR, 'YYYY-MM-DD HH24:MI:SS');
RETURN 1;
END IF;
EXCEPTION
WHEN OTHERS THEN
RETURN 0;
END FN_IS_DATE;
END PKG_CLEAN_RULE_TAB;
/
待完善的地方:
因为每个表的表结构数据,都不一样,目前只能获取各个表的主键信息,在通过和源表关联获取,需要修正的数据。让业务调整后重新加载。
注意:传入的表都必须有主键,因为需要用主键更新。
PKG_ETL_PROC_LOG_LET 这个程序包是用来记录程序失败或者成功的日志,包括更新条数。--查看附件