SSIS2012之每小时增量全量抽取注册用户数据日志记录案例

浏览: 2237

背景介绍

 根据用户需求以及不影响业务生产环境,需每两一小时增量抽取用户数据并做相应的日志记录。

 数据源mysql ,所以源控件选择是ODBC

先上个完成后的截图

Control Flow Task:

Clipboard Image.png


Data Flow Task :

 最大执行时间不为空,增量抽取时

Clipboard Image.png

最大执行时间为空,全量抽取时

Clipboard Image.png

variables:

Clipboard Image.png

Clipboard Image.png

实现步骤

1.建立抽取前后行数简单日志记录表(此处的日志记录主要记录是执行前后数据行数变化,错误日志记录此处就不介绍了,在事件处理程序中设计错误日志记录)

CREATE TABLE [dbo].[ETL_Program_Audit](
[ID] [int] IDENTITY(1,1) NOT NULL,
[ETLName] [nvarchar](100) NULL, --ETL包名
[ETLTable] [nvarchar](100) NULL, --ETL表名
[ETLDate] [datetime] NULL, --ETL日期
[StartTime] [datetime] NULL, --ETL开始时间
[EndTime] [datetime] NULL, --ETL结束时间
[ETLRowsStart] [bigint] NULL, --ETL前的行数
[ETLRowsEnd] [bigint] NULL, --ETL后的行数
[ETLStatus] [nvarchar](10) NULL --状态:开始结束
) ON [PRIMARY]

2.拖个Excute SQL Task 控件名为EST_Get_RowCount_Start,右击编辑,连接好数据源,SQLSourceType选择变量,新建三个String类型变量。

   A. v_TargetTable(存目标表名BatchRefresh_Bizuser):

Clipboard Image.png

   B. v_ETLRunTime(获取ETL执行时间):

(DT_WSTR, 20) (DT_DATE) getdate()

   C. v_SQL_ETLAudit_Start(存放ETL审计脚本):

Clipboard Image.png

"begin declare @etl_rows int; 
select @etl_rows=count(1) from dbo." + @[User::v_TargetTable] + ";
insert into dbo.ETL_Program_Audit(ETLName,ETLTable,ETLDate,ETLStatus,StartTime,ETLRowsStart)
values('" + @[System::PackageName] + "','" + @[User::v_TargetTable] + "','"+ (DT_WSTR, 20) @[User::v_ETLRunTime] +"','开始',getdate(),@etl_rows); end;"

   生成的sql如下所示:获取目标表执行前的行数等信息

begin declare @etl_rows int; 
select @etl_rows=count(1) from dbo.BatchRefresh_Bizuser;
insert into dbo.ETL_Program_Audit(ETLName,ETLTable,ETLDate,ETLStatus,StartTime,ETLRowsStart)
values('EJPBI_BatchRefresh_Bizuser_IL_old','BatchRefresh_Bizuser','2017/11/27 20:37:13','开始',getdate(),@etl_rows); end;

3.拖个Excute SQL Task 控件名为EST_Get_MaxETLTime,新建两个String 变量。

   A. v_MaxETLTime:存放获取的最大ETL时间,用来判断是否初次执行以及抽取时间范围的界定。

   B. v_cmd_MaxETLTime(Sql脚本):获取的最大ETL时间

"select max(endtime) from [ETL_Program_Audit] where etltable='"+ @[User::v_TargetTable] +"' "
 select max(endtime) from [ETL_Program_Audit] where etltable='BatchRefresh_Bizuser' 

4.拖个Data Flow Task名为DFT_Load_BatchRefresh_Bizuser,新建三个String变量

v_DayBeginTime(增量抽取的开始时间):

(DT_WSTR, 10)YEAR((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)MONTH((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)DAY((DT_DATE) @[User::v_MaxETLTime])+ " "+(DT_WSTR, 10) DATEPART("hh",  DATEADD("hh", -2,  (DT_DATE) @[User::v_MaxETLTime]))+":50"

v_DayEndTime (增量抽取的结束时间):

(DT_WSTR, 10)YEAR((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)MONTH((DT_DATE) @[User::v_MaxETLTime])+"-"+(DT_WSTR, 10)DAY((DT_DATE) @[User::v_MaxETLTime])+ " "+(DT_WSTR, 10) DATEPART("hh",   (DT_DATE) @[User::v_MaxETLTime])+":00"

v_SQL_Cmd_Bizuser表达式:

"SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu
where bu.audittime between '"+@[User::v_DayBeginTime]+"' and '"+ @[User::v_DayEndTime]+"'"
SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu
where bu.audittime between '2017-11-27 18:50' and '2017-11-27 20:00'

这里为什么是18:50而不是19:00,考虑中途抽取的时间可能过长,在每小时抽取的时候会漏抽数据。所以多抽十分钟的数据容错。根据实际情况考虑时间。

4.1 右击DFT_Load_BatchRefresh_Bizuser,属性如图所示,配置Expressions

   Clipboard Image.png

   Property绑定SqlCommand。 Expression赋值变量v_SQL_Cmd_Bizuser

   Clipboard Image.png

   设置完成DataFlowTask控件就会出现表达式图标:Clipboard Image.png

   作用:在数据流中的ODBC数据源中就会自动生成sql脚本,如图所示:

   Clipboard Image.png

4.2 优先约束表达式的配置,如图所示:

     4.2.1 最大执行时间不为空时,增量抽取。

         Clipboard Image.png

右击流程线编辑,进入优先约束编辑,配置如下图所示:

Clipboard Image.png

表达式:FINDSTRING((DT_WSTR, 20) @[User::v_MaxETLTime], "1899", 1) ==0?1==1:1==2

(说明:第一次执行,变量User::v_MaxETLTime值不存在,但是SSIS中默认是1899-01-01,没想到别的方法,所以此处我这样处理了下

           当User::v_MaxETLTime不为空时,时间肯定也是大于1899的,没找到匹配的1899,所以Findstring函数返回0

,三目运算返回1==1,为true,继续往下执行。

 Data Flow中,利用Lookup处理抽重的数据,无匹配则插入,匹配则更新,如图所示:

  关于Lookup和OLEDB Command控件这里也不做详细说明了。

Clipboard Image.png

     4.2.2 最大执行时间为空时,全量抽取。

Clipboard Image.png

          优先约束设置同最大执行时间不为空的设置和表达式一样,也就不做说明了。

新建String变量v_cmd_bizuser_all

"SELECT bu.id,bu.name,bu.state,bu.audittime,bu.city_id,bu.lastupdatetime,bu.salesuser_id
FROM bizuser bu"

DataFlow中,ODBC源和OLEDB目标,如图所示

Clipboard Image.png

5.拖ExcuteSqlTask名为EST_Get_RowCount_Finish。新建String变量

v_SQL_ETLAudit_End(记录执行完后表的行数):

"begin declare @etl_rows int; declare @MaxAuditID int; 
select @etl_rows=count(1) from dbo." + @[User::v_TargetTable] + "; select @MaxAuditID = Max(ID) from dbo.ETL_Program_Audit Where ETLName = '" + @[System::PackageName] +"' and ETLTable= '" + @[User::v_TargetTable] +"';
Update dbo.ETL_Program_Audit SET ETLRowsEnd = @etl_rows, EndTime = getdate(), ETLStatus = '结束'
Where ETLName = '"+ @[System::PackageName] +"' and ETLTable= '"+ @[User::v_TargetTable]+ "' and ID = @MaxAuditID; end;"
begin declare @etl_rows int; declare @MaxAuditID int;
select @etl_rows=count(1) from dbo.BatchRefresh_Bizuser;
select @MaxAuditID = Max(ID) from dbo.ETL_Program_Audit Where ETLName = 'EJPBI_BatchRefresh_Bizuser_IL_old' and ETLTable= 'BatchRefresh_Bizuser';
Update dbo.ETL_Program_Audit SET ETLRowsEnd = @etl_rows, EndTime = getdate(), ETLStatus = '结束'
Where ETLName = 'EJPBI_BatchRefresh_Bizuser_IL_old' and ETLTable= 'BatchRefresh_Bizuser' and ID = @MaxAuditID; end;

  6.优先约束设置,如图所示

Clipboard Image.png

其中有一条为true就可通过,继续执行,此处逻辑不可能存在都为true。

Clipboard Image.png

7.之前执行过,ETL_Program_Audit表MAXETLTime有值,执行结果,如图所示:

Clipboard Image.png

执行后日志表

Clipboard Image.png


总结:

案例用了太多的变量,到最后可能自己都晕了,根据实际情况有些不常变化的变量可以省去或者用别的方法替代变量。

增量抽取要考虑实际情况:包运行时的时间如果没有考虑到设计中去,增量抽取的时间戳范围没有界定好,可能会漏抽数据。

案例中简单的步骤省去了,主要抛出一种设计思想,供参考。

推荐 0
本文由 粽子job 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册