ODS系统基于存储过程实现数据清洗实例

浏览: 4779

场景:

由于公司目前没有特定的产品去实现关于数据清洗,目前只能通过对应的存储过程方式来做处理。

整个ETL实现流程图如下:

Clipboard Image.png

业务提供清洗规则:

Clipboard Image.png


具体实现方案:

按照业务规则,需要对数据缺失值做清洗,对数据范围值域做清洗,非数值清洗,非日期值做清洗,包括空格清洗做相应处理。对需要做数据清洗的表字段和特定业务校验规则,做分类标记。


a)生成配置表。表结构如下:

Clipboard Image.png

b)创建表 DATA_DEF_LONG_F 用来承载字段默认内容。 

c)表SYS_SP_MONITOR_LOG 是用来记录每个程序运行日志。


创建表脚本如下:

 --功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认
create table DATA_DEF_LONG_F
(
tab_name VARCHAR2(100),
tab_column VARCHAR2(100),
tab_long VARCHAR2(100)
)
tablespace TEMP;
数据清洗规则配置表
-- Create table
create table DM_CHECK_RULE_F
(
table_name VARCHAR2(100),
chenck_column VARCHAR2(100),
data_definition VARCHAR2(100),
endue_default VARCHAR2(100),
min_value NUMBER,
max_value NUMBER,
min_len_value NUMBER,
max_len_value NUMBER,
cleaning_mark NUMBER,
descriptor VARCHAR2(1000),
basic_path VARCHAR2(1000),
column_meaning VARCHAR2(1000)
)
tablespace TEMP;
-- Add comments to the table
comment on table DM_CHECK_RULE_F
is '数据清洗规则配置表';
-- Add comments to the columns
comment on column DM_CHECK_RULE_F.table_name
is '需要清洗数据表名称';
comment on column DM_CHECK_RULE_F.chenck_column
is '需要清洗数据表字段';
comment on column DM_CHECK_RULE_F.data_definition
is '业务定义数据值域';
comment on column DM_CHECK_RULE_F.endue_default
is '强制转换特定赋值';
comment on column DM_CHECK_RULE_F.min_value
is '数值值域最小值';
comment on column DM_CHECK_RULE_F.max_value
is '数值值域最大值';
comment on column DM_CHECK_RULE_F.min_len_value
is '字段长度值域最小值';
comment on column DM_CHECK_RULE_F.max_len_value
is '字段长度值域最大值';
comment on column DM_CHECK_RULE_F.cleaning_mark
is '需要清洗数据类型(1:编码转化 2:非空清洗 3:空格清洗 4:值域清洗 5:数值清洗 6:长度清洗 7:日期值清洗 9:强制转换)';
comment on column DM_CHECK_RULE_F.descriptor
is '清洗字段具体实现数值';
comment on column DM_CHECK_RULE_F.basic_path
is '要素路径';
comment on column DM_CHECK_RULE_F.column_meaning
is '字段的含义';

-- Create table
create table SYS_SP_MONITOR_LOG
(
sp_proc_name VARCHAR2(100),
start_date DATE,
end_date DATE,
etl_date DATE,
sp_status VARCHAR2(50),
sp_insert_num NUMBER,
sp_update_num NUMBER,
sp_subject VARCHAR2(100),
sp_sub_subject VARCHAR2(200),
sp_error_info VARCHAR2(4000)
)
tablespace TEMP;

-- Create table
create table SYS_PJOB_MONITOR_LOG
(
pjob_name VARCHAR2(500),
etl_date VARCHAR2(8),
start_date DATE,
end_date DATE,
pjob_status VARCHAR2(10),
pjob_count NUMBER,
date_source VARCHAR2(80),
pjob_mark VARCHAR2(10)
)
tablespace TEMP;
-- Add comments to the columns
comment on column SYS_PJOB_MONITOR_LOG.pjob_name
is '作业名称';
comment on column SYS_PJOB_MONITOR_LOG.etl_date
is '业务日期';
comment on column SYS_PJOB_MONITOR_LOG.start_date
is '作业开始日期';
comment on column SYS_PJOB_MONITOR_LOG.end_date
is '作业结束日期';
comment on column SYS_PJOB_MONITOR_LOG.pjob_status
is '作业运行状态';
comment on column SYS_PJOB_MONITOR_LOG.pjob_count
is '作业统计条数';
comment on column SYS_PJOB_MONITOR_LOG.date_source
is '源数据系统';
comment on column SYS_PJOB_MONITOR_LOG.pjob_mark
is '层级标示';

具体实现代码:

Clipboard Image.png

CREATE OR REPLACE PACKAGE PKG_CLEAN_RULE_TAB AS

----------------------------------------------------------------------------------------------
/*
功能: 实现对特定表数据做清洗,使数据标准化。
版本号:V0.1
<1>. 代码履历
CREATE BY SJM AT 2016-01-13

--------------------------------------------------------------------------------------------
<2>. 参数含义:
P_TAB IN VARCHAR2 ---------需要进行数据清洗的表的表明
-------------------------------------------------------------------------------------------
/*功能:编码转换*/

PROCEDURE SP_CLEAN_CODE_DEFAULT(P_TAB IN VARCHAR2);

/*功能:值域清洗*/

PROCEDURE SP_CLEAN_NUMBER_RANGE(P_TAB IN VARCHAR2);

/*功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认 **********************************/

PROCEDURE SP_CLEAN_NUMBER_DEFAULT;

/*功能: 空值清洗*/
PROCEDURE SP_CLEAN_CHAR_NULL(P_TAB IN VARCHAR2);

/*功能: 空格清洗*/

PROCEDURE SP_CLEAN_CHAR_BLANK(P_TAB IN VARCHAR2);


/*功能: 非数值清洗*/

PROCEDURE SP_CLEAN_CHAR_RANGE(P_TAB IN VARCHAR2);

/*功能: 长度清洗*/

PROCEDURE SP_CLEAN_CHAR_LENGTH(P_TAB IN VARCHAR2);



/*功能: 日期值清洗*/
PROCEDURE SP_CLEAN_DATA(P_TAB IN VARCHAR2);

/*功能: 强制转换*/

PROCEDURE SP_CLEAN_CODE_CONVERSION(P_TAB IN VARCHAR2);


/*功能: 主键检核*/

PROCEDURE SP_CLEAN_CONSTRAINT(P_TAB IN VARCHAR2);

/* 功能:主程序,调用前面的各个子程序*/
PROCEDURE SP_CLEAN_MAIN(P_TAB IN VARCHAR2 );

PROCEDURE SP_CLEAN_ALL_MAIN;


/*如果字符串是数字格式则返回1,不是则返回0;*/
FUNCTION FN_ISNUMERIC (STR IN VARCHAR2) RETURN NUMBER;

/*判断日期是否合法,返回标记值 若为0则表名日期合法,若日期格式不合法则会返回负值*/
FUNCTION FN_IS_DATE(IN_STR IN VARCHAR2) RETURN NUMBER;



END PKG_CLEAN_RULE_TAB;
/
CREATE OR REPLACE PACKAGE BODY PKG_CLEAN_RULE_TAB AS

/*功能:根据编码代号取编码映射表中标准编码,进行标准化编码转换;同时对于超出范围的记录,假如默认值为空,则不做数据转换,否则需要赋为默认值*/
PROCEDURE SP_CLEAN_CODE_DEFAULT(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_CODE_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, DATA_DEFINITION
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '1'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-9' || ''' WHERE ' || '(' || V_TAB_COLUMN ||
' NOT IN ' || '(' || V_TAB_CODOMAIN || ')' || ' OR ' ||
V_TAB_COLUMN || ' IS NULL' || ')' || '';

EXECUTE IMMEDIATE V_EXEC_SQL;

--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

EXIT WHEN CHECK_CUR%NOTFOUND;

V_UPDATE_NUM := V_UPDATE_NUM + 1;

END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_INSERT_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '编码转换',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'编码转换',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CODE_DEFAULT;

/*功能:对于数值类型进行检查以查看它们是否位于可接受的范围内(例如,个人账户余额应位于0 和2,000,000,000之间等),假如默认值为空,则不做数据转换,否则需要赋为默认值*/
PROCEDURE SP_CLEAN_NUMBER_RANGE(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_MIN_VALUE NUMBER;
V_MAX_VALUE NUMBER;

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;
/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, MIN_VALUE, MAX_VALUE
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '4'
GROUP BY CHENCK_COLUMN, MIN_VALUE, MAX_VALUE;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_MIN_VALUE, V_MAX_VALUE;
EXIT WHEN CHECK_CUR%NOTFOUND;

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '999999' || ''' WHERE ' || V_COLUMN_PK ||
' IN ' || '(' || 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT BETWEEN ' || V_MIN_VALUE || ' AND ' ||
V_MAX_VALUE || ')' || '';

EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

EXIT WHEN CHECK_CUR%NOTFOUND;

V_UPDATE_NUM := V_UPDATE_NUM + 1;
END LOOP;
CLOSE CHECK_CUR;

V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_NUMBER_RANGE;
/*
功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认
*/

PROCEDURE SP_CLEAN_NUMBER_DEFAULT IS

V_TAB_NAME VARCHAR2(100);
V_COLUMN_NAME VARCHAR2(100);
V_DATA_DEFAULT VARCHAR2(100);

CURSOR CHECK_DEFAULT IS
SELECT TABLE_NAME, COLUMN_NAME, DATA_DEFAULT
FROM USER_TAB_COLS
WHERE DATA_DEFAULT IS NOT NULL;
BEGIN

OPEN CHECK_DEFAULT;

EXECUTE IMMEDIATE 'TRUNCATE TABLE SDM.DATA_DEF_LONG_F';

LOOP

FETCH CHECK_DEFAULT
INTO V_TAB_NAME, V_COLUMN_NAME, V_DATA_DEFAULT;

INSERT INTO DATA_DEF_LONG_F
VALUES
(V_TAB_NAME, V_COLUMN_NAME, V_DATA_DEFAULT);
COMMIT;

EXIT WHEN CHECK_DEFAULT%NOTFOUND;

END LOOP;
CLOSE CHECK_DEFAULT;
END SP_CLEAN_NUMBER_DEFAULT;

/*功能: 判断字段是否为空,假如默认值为空,则不做数据转换,否则需要赋为默认值 */
PROCEDURE SP_CLEAN_CHAR_NULL(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_NULL_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '2'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;

EXIT WHEN CHECK_CUR%NOTFOUND;

IF V_CNT > 0 THEN
--判断该字段是否默认为空

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-999999' || ''' WHERE ' || V_TAB_COLUMN || ' IS NULL ' || '';

EXECUTE IMMEDIATE V_EXEC_SQL;
--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

ELSE

EXIT WHEN CHECK_CUR%NOTFOUND;
--DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);


END IF;
V_UPDATE_NUM := V_UPDATE_NUM + 1;

END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '空值清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'空值清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CHAR_NULL;

/*功能: 空格清洗*/

PROCEDURE SP_CLEAN_CHAR_BLANK(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50) ;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '3'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;

IF V_CNT > 0 THEN
--判断该字段是否默认为空

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-19880707' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE TRIM ' || '(' || V_TAB_COLUMN || ')' ||
'IS NULL' || ' AND ' || V_TAB_COLUMN ||
' IS NOT NULL)' || '';

--EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

ELSE

EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '空格清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'空格清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CHAR_BLANK;

/*功能: 该字段出现非”0123456789.-+”和空格“ ”情况:假如默认值为空,则不做数据转换,否则需要赋为默认值*/

PROCEDURE SP_CLEAN_CHAR_RANGE(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50) ;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '5'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;

IF V_CNT > 0 THEN
--判断该字段是否默认为空

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '-19880707' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE PKG_CLEAN_RULE_TAB.ISNUMERIC ' || '(' ||
V_TAB_COLUMN || ')' || ' = 0 ' || ')' || '';
--EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

ELSE

EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '非数值清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'非数值清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CHAR_RANGE;

/*功能: 长度清洗*/
PROCEDURE SP_CLEAN_CHAR_LENGTH(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_MIN_LEN_VALUE NUMBER;
V_MAX_LEN_VALUE NUMBER;

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
MIN_LEN_VALUE,
MAX_LEN_VALUE
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '6'
GROUP BY CHENCK_COLUMN, MIN_LEN_VALUE, MAX_LEN_VALUE;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_MIN_LEN_VALUE, V_MAX_LEN_VALUE;
EXIT WHEN CHECK_CUR%NOTFOUND;

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '999999' || ''' WHERE ' || V_COLUMN_PK ||
' IN ' || '(' || 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT BETWEEN ' || V_MIN_LEN_VALUE || ' AND ' ||
V_MAX_LEN_VALUE || ')' || '';

EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
EXIT WHEN CHECK_CUR%NOTFOUND;

END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '长度清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'长度清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CHAR_LENGTH;

/*功能: 对该字段进行强制转换为默认值,如:密码字段转为"0"*/

PROCEDURE SP_CLEAN_CODE_CONVERSION(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, ENDUE_DEFAULT
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '9'
GROUP BY CHENCK_COLUMN, ENDUE_DEFAULT;

/*功能:获取需要做数据清洗的主键*/
BEGIN

OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;

IF V_CNT > 0 THEN
--判断该字段是否默认为空
EXIT WHEN CHECK_CUR%NOTFOUND;

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ' || '' || V_TAB_CODOMAIN || '';

EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);

COMMIT;

ELSE

EXIT WHEN CHECK_CUR%NOTFOUND;

V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_CODE_CONVERSION;

/*判断该字段是否是合法的日期,假如默认值为空,则不做数据转换,否则需要赋为默认值;判断该字段内容是否合法,需要依据该字段的类型分别进行判断*/
PROCEDURE SP_CLEAN_DATA(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SP_NAME VARCHAR2(50);
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_'||P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT SUM(CASE
WHEN F.TAB_COLUMN IS NULL THEN
1
ELSE
0
END) ROW_COUNT,
CHENCK_COLUMN,
DATA_DEFINITION
FROM DM_CHECK_RULE_F E, DATA_DEF_LONG_F F
WHERE E.TABLE_NAME = F.TAB_NAME(+)
AND E.CHENCK_COLUMN = F.TAB_COLUMN(+)
AND F.TAB_LONG(+) = ''''''
AND E.TABLE_NAME = UPPER(P_TAB)
AND E.CLEANING_MARK = '7'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;
EXIT WHEN CHECK_CUR%NOTFOUND;
IF V_CNT > 0 THEN
--判断该字段是否默认为空

V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' || V_TAB_COLUMN ||
' = ''' || '19000101' || ''' WHERE ' ||
V_COLUMN_PK || ' IN ' || '(' || 'SELECT ' ||
V_COLUMN_PK || ' FROM ' || V_CHECK_TAB ||
' WHERE PKG_CLEAN_RULE_TAB.FN_IS_DATE ' || '(' ||
V_TAB_COLUMN || ')' || ' = 0 ' || ')' || '';

EXECUTE IMMEDIATE V_EXEC_SQL;
DBMS_OUTPUT.PUT_LINE(V_EXEC_SQL);
COMMIT;

ELSE

EXIT WHEN CHECK_CUR%NOTFOUND;
V_UPDATE_NUM := V_UPDATE_NUM + 1;
END IF;
END LOOP;
CLOSE CHECK_CUR;

V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END;

END SP_CLEAN_DATA;
/*功能: 强制转换*/

/*功能: 主键检核*/

PROCEDURE SP_CLEAN_CONSTRAINT(P_TAB IN VARCHAR2) --需要对数据做清洗的表名
IS
INDEX_PK VARCHAR2(100);
V_CHECK_TAB VARCHAR2(100);
V_TAB_COLUMN VARCHAR2(100);
V_TAB_CODOMAIN VARCHAR2(100);
V_SQL_COMMENTS VARCHAR2(4000);
V_EXEC_SQL VARCHAR2(4000);
V_COLUMN_PK VARCHAR2(100);
V_CNT NUMBER;
TYPE I_CURSOR_TYPE IS REF CURSOR;
MY_CURSOR I_CURSOR_TYPE;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);

BEGIN

V_CHECK_TAB := P_TAB;

/*功能:获取规则表中检核字段*/

DECLARE
CURSOR CHECK_CUR IS
SELECT COUNT(*), CHENCK_COLUMN, DATA_DEFINITION
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)
AND CLEANING_MARK = '2'
GROUP BY CHENCK_COLUMN, DATA_DEFINITION;

/*功能:获取需要做数据清洗的主键*/
BEGIN
SELECT COLUMN_NAME
INTO V_COLUMN_PK
FROM USER_IND_COLUMNS
WHERE INDEX_NAME IN (SELECT INDEX_NAME
FROM SYS.USER_CONSTRAINTS T
WHERE TABLE_NAME = UPPER(P_TAB)
AND CONSTRAINT_TYPE = 'P');
OPEN CHECK_CUR;

LOOP

FETCH CHECK_CUR
INTO V_CNT, V_TAB_COLUMN, V_TAB_CODOMAIN;

EXIT WHEN CHECK_CUR%NOTFOUND;

DBMS_OUTPUT.PUT_LINE(V_TAB_COLUMN);

V_SQL_COMMENTS := 'SELECT ' || V_COLUMN_PK || ' FROM ' ||
V_CHECK_TAB || ' WHERE ' || V_TAB_COLUMN ||
' NOT IN ' || '(' || V_TAB_CODOMAIN || ')' || '';

/*功能:获取表对应的主键,用于更新目标表*/
IF V_CNT > 0 THEN
--查询该表是否在值域清洗范围内

OPEN MY_CURSOR FOR V_SQL_COMMENTS;

LOOP

FETCH MY_CURSOR
INTO INDEX_PK;
V_EXEC_SQL := 'UPDATE ' || V_CHECK_TAB || ' SET ' ||
V_TAB_COLUMN || ' = ''' || '-787878' ||
''' WHERE ' || V_COLUMN_PK || ' = ' || INDEX_PK;

EXECUTE IMMEDIATE V_EXEC_SQL;
COMMIT;

EXIT WHEN MY_CURSOR%NOTFOUND;

END LOOP;

CLOSE MY_CURSOR;

ELSE
V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);

DBMS_OUTPUT.PUT_LINE(V_SQL_CODE || ' SQLERRM:' || V_SQL_ERRM);
END IF;

END LOOP;
CLOSE CHECK_CUR;
END;

END SP_CLEAN_CONSTRAINT;

/* 功能:主程序,调用前面的各个子程序*/

PROCEDURE SP_CLEAN_MAIN(P_TAB IN VARCHAR2) --需要进行清洗的表
IS
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_SP_NAME VARCHAR2(50) ;

BEGIN
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;
V_SP_NAME := 'SP_MAIN'||P_TAB;

/*功能:对输入的表按照*/

SELECT COUNT(1)
INTO V_CNT
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB);

IF V_CNT > 0 THEN
FOR I IN (SELECT DISTINCT CLEANING_MARK
FROM DM_CHECK_RULE_F
WHERE TABLE_NAME = UPPER(P_TAB)) LOOP


IF I.CLEANING_MARK = 1 THEN

SP_CLEAN_CODE_DEFAULT(P_TAB);

ELSIF I.CLEANING_MARK = 2 THEN

SP_CLEAN_CHAR_NULL(P_TAB);

ELSIF I.CLEANING_MARK = 3 THEN

SP_CLEAN_CHAR_BLANK(P_TAB);

ELSIF I.CLEANING_MARK = 4 THEN

SP_CLEAN_NUMBER_RANGE(P_TAB);

ELSIF I.CLEANING_MARK = 5 THEN

SP_CLEAN_CHAR_RANGE(P_TAB);

ELSIF I.CLEANING_MARK = 6 THEN
SP_CLEAN_CHAR_LENGTH(P_TAB);

ELSIF I.CLEANING_MARK = 7 THEN
SP_CLEAN_DATA(P_TAB);

ELSIF I.CLEANING_MARK = 9 THEN
SP_CLEAN_CODE_CONVERSION(P_TAB);

END IF;
END LOOP;

ELSE
DBMS_OUTPUT.PUT_LINE('操作步骤结束----对' || P_TAB ||
'已经真正完成了清洗等执行步骤,脚本可参看PART_TAB_LOG表');

END IF;

V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END SP_CLEAN_MAIN;

PROCEDURE SP_CLEAN_ALL_MAIN --需要进行清洗的表
IS
V_CNT NUMBER;
V_START_DATE TIMESTAMP; --开始时间
V_END_DATE TIMESTAMP; --结束时间
V_INSERT_NUM NUMBER;
V_UPDATE_NUM NUMBER;
V_SQL_CODE VARCHAR2(50);
V_SQL_ERRM VARCHAR2(4000);
V_SP_NAME VARCHAR2(50) := 'SP_CLEAN_ALL_MAIN';

BEGIN
V_INSERT_NUM := 0;
V_UPDATE_NUM := 0;
V_START_DATE := SYSDATE;

/*功能:对输入的表按照*/

SELECT COUNT(1) INTO V_CNT FROM DM_CHECK_RULE_F;

IF V_CNT > 0 THEN
FOR I IN (SELECT DISTINCT CLEANING_MARK, TABLE_NAME
FROM DM_CHECK_RULE_F) LOOP

IF I.CLEANING_MARK = 1 THEN

PKG_CLEAN_RULE_TAB.SP_CLEAN_CODE_DEFAULT(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 2 THEN

PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_NULL(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 3 THEN

PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_BLANK(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 4 THEN

PKG_CLEAN_RULE_TAB.SP_CLEAN_NUMBER_RANGE(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 5 THEN

PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_RANGE(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 6 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CHAR_LENGTH(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 7 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_DATA(I.TABLE_NAME);

ELSIF I.CLEANING_MARK = 9 THEN
PKG_CLEAN_RULE_TAB.SP_CLEAN_CODE_CONVERSION(I.TABLE_NAME);

END IF;
END LOOP;

ELSE
DBMS_OUTPUT.PUT_LINE('操作步骤结束----对已经真正完成了清洗等执行步骤,脚本可参看PART_TAB_LOG表');

END IF;

V_INSERT_NUM := SQL%ROWCOUNT;
V_END_DATE := SYSDATE;
V_UPDATE_NUM := SQL%ROWCOUNT;

/*执行成功记录日志 */
MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_RECORD(V_PROC_NAME => V_SP_NAME,
V_START_DATE => V_START_DATE,
V_END_DTTM => V_END_DATE,
V_ETL_DATE => '',
V_INSERT_NUM => V_INSERT_NUM,
V_UPDATE_NUM => V_UPDATE_NUM,
V_SUBJECT => '数据清洗',
V_SUB_SUBJECT => '值域清洗',
V_ERROR_INFO => '');

EXCEPTION
WHEN OTHERS THEN
ROLLBACK;

V_SQL_CODE := SQLCODE;
V_SQL_ERRM := SUBSTR(SQLERRM, 1, 40000);
V_END_DATE := SYSDATE;

/*执行失败记录日志 */

MNG.PKG_ETL_PROC_LOG_LET.SP_ETL_PROC_LOG_ABNORMITY(V_SP_NAME,
V_START_DATE,
V_END_DATE,
'',
V_INSERT_NUM,
V_UPDATE_NUM,
'数据清洗',
'值域清洗',
'ERROR CODE ' ||
V_SQL_CODE || ': ' ||
V_SQL_ERRM);

END SP_CLEAN_ALL_MAIN;
/*如果字符串是数字格式则返回1,不是则返回0;*/
FUNCTION FN_ISNUMERIC(STR IN VARCHAR2) RETURN NUMBER IS
V_STR VARCHAR2(1000);
BEGIN
IF STR IS NULL THEN
RETURN 0;
ELSE
V_STR := TRANSLATE(STR, '.0123456789', '.');

IF V_STR = '.' OR V_STR = '+.' OR V_STR = '-.' OR V_STR IS NULL OR
V_STR = ' ' THEN
RETURN 1;
ELSE
RETURN 0;
END IF;
END IF;
END FN_ISNUMERIC;

/*判断日期是否合法,返回标记值 若为0则表名日期合法,若日期格式不合法则会返回负值*/
--判断一个字符串是否是日期格式的函数

FUNCTION FN_IS_DATE(IN_STR VARCHAR2) RETURN NUMBER IS
VAL DATE;
BEGIN
IF IN_STR IS NULL THEN
RETURN 0;
ELSE

VAL := TO_DATE(IN_STR, 'YYYY-MM-DD HH24:MI:SS');
RETURN 1;
END IF;
EXCEPTION
WHEN OTHERS THEN
RETURN 0;
END FN_IS_DATE;

END PKG_CLEAN_RULE_TAB;
/

待完善的地方:

因为每个表的表结构数据,都不一样,目前只能获取各个表的主键信息,在通过和源表关联获取,需要修正的数据。让业务调整后重新加载。

注意:传入的表都必须有主键,因为需要用主键更新

PKG_ETL_PROC_LOG_LET 这个程序包是用来记录程序失败或者成功的日志,包括更新条数。--查看附件

推荐 6
本文由 我是最优雅的杀手,不杀人专杀狗 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

12 个评论

好东西!可以作为通用的模板来使用了。
恩,会持续完善。
很实用啊
恩,可以支持批量,缺陷就是记录错误数据,因为传入表的表结构都各有不同,只能记录各个表的主键,然后在关联表数据,发给业务做调整。另外传入的表必须有主键,否则出错。
能否把你的两张表:DATA_DEF_LONG_F,DM_CHECK_RULE_F相关脚本发给我,存储过程中涉及到这两张表 并没有交代清楚,谢谢!
表结构,我已经更新到文章中了。
DATA_DEF_LONG_F --功能: 将字段DATA_DEFAULT LONG类型转换成VARCHAR2类型,用于后续判断字段是否数据默认 。
DM_CHECK_RULE_F --主要承载对要清洗的表名和规则
必须得给100个赞,可惜就是没有表情符号,得给勇哥提提建议啦,哈哈
恩,有问题可以及时交流。稍后把关于存储记录日志程序发一下。
PKG_ETL_PROC_LOG_LET 包缺少 两个表:SYS_PJOB_MONITOR_LOG
SYS_SP_MONITOR_LOG ,谢谢!
好的。稍等。
谢谢,这样就很完整了,完全可以作为项目清洗模板了,十分感谢大神的分享!

要回复文章请先登录注册