背景介绍
根据用户需求以及不影响业务生产环境,需每两一小时增量抽取用户数据并做相应的日志记录。
数据源mysql ,所以源控件选择是ODBC
先上个完成后的截图
Control Flow Task:
Data Flow Task :
最大执行时间不为空,增量抽取时
最大执行时间为空,全量抽取时
variables:
实现步骤
1.建立抽取前后行数简单日志记录表(此处的日志记录主要记录是执行前后数据行数变化,错误日志记录此处就不介绍了,在事件处理程序中设计错误日志记录)
CREATE TABLE [dbo].[ETL_Program_Audit](
[ID] [int] IDENTITY(1,1) NOT NULL,
[ETLName] [nvarchar](100) NULL, --ETL包名
[ETLTable] [nvarchar](100) NULL, --ETL表名
[ETLDate] [datetime] NULL, --ETL日期
[StartTime] [datetime] NULL, --ETL开始时间
[EndTime] [datetime] NULL, --ETL结束时间
[ETLRowsStart] [bigint] NULL, --ETL前的行数
[ETLRowsEnd] [bigint] NULL, --ETL后的行数
[ETLStatus] [nvarchar](10) NULL --状态:开始结束
) ON [PRIMARY]
2.拖个Excute SQL Task 控件名为EST_Get_RowCount_Start,右击编辑,连接好数据源,SQLSourceType选择变量,新建三个String类型变量。
A. v_TargetTable(存目标表名BatchRefresh_Bizuser):
B. v_ETLRunTime(获取ETL执行时间):
(DT_WSTR, 20) (DT_DATE) getdate()
C. v_SQL_ETLAudit_Start(存放ETL审计脚本):
"begin declare @etl_rows int;
select @etl_rows=count(1) from dbo." + @[User::v_TargetTable] + ";
insert into dbo.ETL_Program_Audit(ETLName,ETLTable,ETLDate,ETLStatus,StartTime,ETLRowsStart)
values('" + @[System::PackageName] + "','" + @[User::v_TargetTable] + "','"+ (DT_WSTR, 20) @[User::v_ETLRunTime] +"','开始',getdate(),@etl_rows); end;"
生成的sql如下所示:获取目标表执行前的行数等信息
begin declare @etl_rows int;
select @etl_rows=count(1) from dbo.BatchRefresh_Bizuser;
insert into dbo.ETL_Program_Audit(ETLName,ETLTable,ETLDate,ETLStatus,StartTime,ETLRowsStart)
values('EJPBI_BatchRefresh_Bizuser_IL_old','BatchRefresh_Bizuser','2017/11/27 20:37:13','开始',getdate(),@etl_rows); end;
3.拖个Excute SQL Task 控件名为EST_Get_MaxETLTime,新建两个String 变量。
A. v_MaxETLTime:存放获取的最大ETL时间,用来判断是否初次执行以及抽取时间范围的界定。
B. v_cmd_MaxETLTime(Sql脚本):获取的最大ETL时间
"select max(endtime) from [ETL_Program_Audit] where etltable='"+ @[User::v_TargetTable] +"' "
select max(endtime) from [ETL_Program_Audit] where etltable='BatchRefresh_Bizuser'
4.拖个Data Flow Task名为DFT_Load_BatchRefresh_Bizuser,新建三个String变量
v_DayBeginTime(增量抽取的开始时间):
(DT_WSTR, 10)YEAR((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)MONTH((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)DAY((DT_DATE) @[User::v_MaxETLTime])+ " "+(DT_WSTR, 10) DATEPART("hh", DATEADD("hh", -2, (DT_DATE) @[User::v_MaxETLTime]))+":50"
v_DayEndTime (增量抽取的结束时间):
(DT_WSTR, 10)YEAR((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)MONTH((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)DAY((DT_DATE) @[User::v_MaxETLTime])+ " "+(DT_WSTR, 10) DATEPART("hh", (DT_DATE) @[User::v_MaxETLTime])+":00"
v_SQL_Cmd_Bizuser表达式:
"SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu
where bu.audittime between '"+@[User::v_DayBeginTime]+"' and '"+ @[User::v_DayEndTime]+"'"
SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu
where bu.audittime between '2017-11-27 18:50' and '2017-11-27 20:00'
这里为什么是18:50而不是19:00,考虑中途抽取的时间可能过长,在每小时抽取的时候会漏抽数据。所以多抽十分钟的数据容错。根据实际情况考虑时间。
4.1 右击DFT_Load_BatchRefresh_Bizuser,属性如图所示,配置Expressions
Property绑定SqlCommand。 Expression赋值变量v_SQL_Cmd_Bizuser
设置完成DataFlowTask控件就会出现表达式图标:
作用:在数据流中的ODBC数据源中就会自动生成sql脚本,如图所示:
4.2 优先约束表达式的配置,如图所示:
4.2.1 最大执行时间不为空时,增量抽取。
右击流程线编辑,进入优先约束编辑,配置如下图所示:
表达式:FINDSTRING((DT_WSTR, 20) @[User::v_MaxETLTime], "1899", 1) ==0?1==1:1==2
(说明:第一次执行,变量User::v_MaxETLTime值不存在,但是SSIS中默认是1899-01-01,没想到别的方法,所以此处我这样处理了下)
当User::v_MaxETLTime不为空时,时间肯定也是大于1899的,没找到匹配的1899,所以Findstring函数返回0
,三目运算返回1==1,为true,继续往下执行。
Data Flow中,利用Lookup处理抽重的数据,无匹配则插入,匹配则更新,如图所示:
关于Lookup和OLEDB Command控件这里也不做详细说明了。
4.2.2 最大执行时间为空时,全量抽取。
优先约束设置同最大执行时间不为空的设置和表达式一样,也就不做说明了。
新建String变量v_cmd_bizuser_all
"SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu"
DataFlow中,ODBC源和OLEDB目标,如图所示
5.拖ExcuteSqlTask名为EST_Get_RowCount_Finish。新建String变量
v_SQL_ETLAudit_End(记录执行完后表的行数):
"begin declare @etl_rows int; declare @MaxAuditID int;
select @etl_rows=count(1) from dbo." + @[User::v_TargetTable] + "; select @MaxAuditID = Max(ID) from dbo.ETL_Program_Audit Where ETLName = '" + @[System::PackageName] +"' and ETLTable= '" + @[User::v_TargetTable] +"';
Update dbo.ETL_Program_Audit SET ETLRowsEnd = @etl_rows, EndTime = getdate(), ETLStatus = '结束'
Where ETLName = '"+ @[System::PackageName] +"' and ETLTable= '"+ @[User::v_TargetTable]+ "' and ID = @MaxAuditID; end;"
begin declare @etl_rows int; declare @MaxAuditID int;
select @etl_rows=count(1) from dbo.BatchRefresh_Bizuser;
select @MaxAuditID = Max(ID) from dbo.ETL_Program_Audit Where ETLName = 'EJPBI_BatchRefresh_Bizuser_IL_old' and ETLTable= 'BatchRefresh_Bizuser';
Update dbo.ETL_Program_Audit SET ETLRowsEnd = @etl_rows, EndTime = getdate(), ETLStatus = '结束'
Where ETLName = 'EJPBI_BatchRefresh_Bizuser_IL_old' and ETLTable= 'BatchRefresh_Bizuser' and ID = @MaxAuditID; end;
6.优先约束设置,如图所示
其中有一条为true就可通过,继续执行,此处逻辑不可能存在都为true。
7.之前执行过,ETL_Program_Audit表MAXETLTime有值,执行结果,如图所示:
执行后日志表
总结:
案例用了太多的变量,到最后可能自己都晕了,根据实际情况有些不常变化的变量可以省去或者用别的方法替代变量。
增量抽取要考虑实际情况:包运行时的时间如果没有考虑到设计中去,增量抽取的时间戳范围没有界定好,可能会漏抽数据。
案例中简单的步骤省去了,主要抛出一种设计思想,供参考。