维度模型数据仓库(三)-定期装载

浏览: 1576

(三)定期装载

  数据仓库装载方式分为三种

  •  初始化装载
  • 定期装载
  • 按需装载

  在上面的章节中,我们已经讲到了如何初始化装载,并且按照SQL以及Kettle两种方式已经实现完成,下面章节我将说明定期装载。在我们项目当中,很多时候用到的都是此章节所说的定期装载。从源数据导入到目标数据仓库有两种方式,一种是通过数据仓库将数据从源数据拉数据(pull),也可以通过数据源推数据(push)。影响数据抽取方式的一个重要原因是源数据的可用性以及数据量。在数据抽取前,我们需要考虑两个问题:

  • 需要抽取哪部分源数据加载到数据仓库?有两种方式,完全抽取和变化数据捕获(CDC)。
  • 数据抽取的方向是什么?有两种方式,拉模式(用数据仓库去拉)和推模式(通过源去推)。

  完全抽取方式适用于数据量少的情况,在数据量大时,数据完成抽取将耗费大量的时间以及资源。这种方式适合引用类型的源数据,比如邮政编码。引用型源数据通常是维度表的源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据(自最后一次抽取以来变化的数据)。这种数据抽取模式称为变化数据捕获(CDC),通常被用于抽取操作型事务数据,比如销售订单.

    CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓的侵入式是指对源系统带来性能影响的数据抽取方式,只要CDC操作采用任何一种sql执行均可认为是侵入式的。常用的CDC四种抽取方式中有三种为侵入式的:基于时间戳CDC,基于快照CDC,基于触发的CDC以及基于日志的CDC。

  表三-1总结了这四种CDC方案的特点:

  

 时间戳方式快照方式触发器方式日志方式
能区分插入/更新否是是是
周期内,检测到多次更新否否是是
能检测到删除否是是是
不具有侵入性否否否是
支持实时否否是是
需要DBA否否是是
不依赖数据库是是否否

 

表(三)- 1

 

关于数据抽取方向问题思考,如果仓库系统对于数据及时性要求很高,要求源数据一准备好数据,立即将数据加载到数据仓库当中的情况,以及源数据受保护的情况,一般来说应该让数据源push数据至仓库。其它情况可以采用数据仓库拉数据。

 

  然后就是识别数据源以及决定采取什么抽取方式,在本实例中将采用如下图的抽取方式:

源数据数据仓库表抽取模式维度历史装载类型
customercustomer_dim整体、拉取address列上SCD2name列上SCD1
productproduct_dim整体、拉取SCD2
sales_orderorder_dimCDC(每天)、拉取唯一订单号
sales_order_factCDC(每天)、拉取每日销售订单
n/adate_dimn/a预装载 

 

 本实例中,order_dim以及sales_order_fact两个表将使用时间戳CDC方式,为方便起见,此实例不采用两次分别从order_dim以及sales_order_fact读取最大值作为上一次的抽取时间,采用新建一个时间表来记录。为此建立一个名为cdc_time的时间戳表,这个表里有两个字段,一个是last_load,一个是current_load。之所以需要两个字段,是因为在装载过程中,可能会有新的数据被插入或更新,为了避免脏读和死锁的情况,最好给时间戳设定一个上限条件,即current_load字段。本示例的时间粒度为每天,所以时间戳只要保留日期部分即可。这两个字段的初始值是“初始加载”执行的日期,本示例中为'2015-03-01'。当开始装载时,current_load设置为当前日期。开始实验前,先将cdc_time表建好,其脚本(脚本三-1)

 

 1 use dw;
2 if object_id(N'cdc_time','U') is not null drop table cdc_time ;
3
4 CREATE TABLE cdc_time
5 (
6 last_load datetime,
7 current_load datetime
8 );
9 INSERT INTO cdc_time VALUES ('2016-03-01', '2016-03-01') ;
10 go


(脚本三-1)

   使用(脚本三-2)清空客户以及产品表:


 1 --清空stg表以及需整体拉取的维度表(Customer_dim and product_dim表)
2 use dw;
3 truncate table dbo.customer_stg;
4 truncate table dbo.Product_stg;
5 /*主外键约束,先需要去除约束*/
6 select name , is_disabled from sys.foreign_keys order by name;
7 alter table dbo.Sales_order_fact NOCHECK constraint all;
8 DELETE FROM dbo.Customer_dim;
9 DELETE FROM dbo.Product_dim;
10 alter table dbo.Sales_order_fact CHECK constraint all;


 实现定期加载,使用脚本三-3:

create  procedure test as 
--定期加载数据

declare @eff_date datetime;
select @eff_date=current_load from dbo.cdc_time;

begin
UPDATE cdc_time SET current_load = getdate() ;
--客户以及产品表整体拉取(客户表address实现scd2,name实现scd1
insert into dbo.customer_stg
(customer_number
,customer_name
,customer_street_address
,customer_zip_code
,customer_city
,customer_state)
select Customer_number
,Customer_name
,Customer_street_address
,Customer_zip_code
,Customer_city
,Customer_state
from source.dbo.Customer;


--新增数据
insert into dbo.Customer_dim
(Customer_number
,Customer_name
,Customer_street_address
,Customer_zip_code
,Customer_city
,Customer_state
,effective_date
,expiry_date
,version)
select Customer_number
,Customer_name
,Customer_street_address
,Customer_zip_code
,Customer_city
,Customer_state
,
@eff_date
,
convert(datetime,'2099-12-31',120)
,
1 from dbo.customer_stg where customer_number not in (
SELECT y.customer_number
FROM customer_dim x, customer_stg y
WHERE x.customer_number = y.customer_number);
--更新数据处理
update customer_dim
set expiry_date=@eff_date
where exists (select 1 from customer_dim a,customer_stg b where a.customer_number=b.customer_number
and a.Customer_street_address<>b.Customer_street_address
AND expiry_date = convert(datetime,'2099-12-31',120)) ;

insert into customer_dim
(Customer_number
,Customer_name
,Customer_street_address
,Customer_zip_code
,Customer_city
,Customer_state
,effective_date
,expiry_date
,version)
select b.Customer_number
,b.Customer_name
,b.Customer_street_address
,b.Customer_zip_code
,b.Customer_city
,b.Customer_state
,
@eff_date
,
convert(datetime,'2099-12-31',120)
,a.version
+1
from customer_dim a ,customer_stg b
where a.customer_number=b.customer_number
and a.Customer_street_address<>b.Customer_street_address
AND EXISTS(
SELECT *
FROM customer_dim x
WHERE
b.customer_number
=x.customer_number
AND a.expiry_date = @eff_date)
AND NOT EXISTS (
SELECT *
FROM customer_dim y
WHERE
b.customer_number
= y.customer_number
AND y.expiry_date = convert(datetime,'2099-12-31',120)) ;


insert into dbo.product_stg
(Product_code
,product_name
,product_category)
select Product_code,Product_name,Product_category from source.dbo.Product;

--设置scd的截止时间以及生效时间

--新产品
insert into dbo.Product_dim
(Product_code
,product_name
,product_category
,effective_date
,expire_date
,version)
select Product_code,Product_name,Product_category
,
@eff_date
,
convert(datetime,'2099-12-31',120)
,
1 from dbo.product_stg where Product_code not in (select
Product_code
from dbo.Product_dim);

--产品表更新
update dbo.Product_dim
set expire_date=@eff_date
where exists(
select 1 from dbo.Product_dim a,dbo.product_stg b where
a.Product_code
=b.Product_code
and (a.product_name<>b.product_name
or a.product_category<>b.product_category)
and a.expire_date=convert(datetime,'2099-12-31',120));

insert into dbo.Product_dim
(Product_code
,product_name
,product_category
,effective_date
,expire_date
,version)
select b.product_code
, b.product_name
, b.product_category
,
@eff_date
,
'2099-12-31'
, a.version
+ 1
FROM
product_dim a
, product_stg b
WHERE
a.product_code
= b.product_code
AND ( a.product_name <> b.product_name
OR a.product_category <> b.product_category)
AND EXISTS(
SELECT *
FROM product_dim x
WHERE b.product_code = x.product_code
AND a.expire_date = @eff_date)
AND NOT EXISTS (
SELECT *
FROM product_dim y
WHERE b.product_code = y.product_code
AND y.expire_date = '2099-12-31') ;
--新数据


--新增的订单
--
装载订单维度,新增前一天的订单号
INSERT INTO order_dim (
order_number
, effective_date
, expiry_date)
SELECT
order_number
, order_date
,
'2200-01-01'
FROM source.sales_order, cdc_time
WHERE entry_date >= last_load AND entry_date < current_load ;

-- 装载事实表,新增前一天的订单
INSERT INTO sales_order_fact
SELECT
order_sk
, customer_sk
, product_sk
, date_sk
, order_amount
FROM
source.sales_order a
, order_dim b
, customer_dim c
, product_dim d
, date_dim e
, cdc_time f
WHERE
a.order_number
= b.order_number
AND a.customer_number = c.customer_number
AND a.order_date >= c.effective_date
AND a.order_date < c.expiry_date
AND a.product_code = d.product_code
AND a.order_date >= d.effective_date
AND a.order_date < d.expiry_date
AND a.order_date = e.date
AND a.entry_date >= f.last_load AND a.entry_date < f.current_load ;

-- 更新时间戳表的last_load字段
UPDATE cdc_time SET last_load = current_load ;

end;

复制代码

至此sql已实现完成,下面将介绍kettle实现;kettle实现之前删除刚刚sql实现跑进去的数据。脚本如下:

  

复制代码

 use dw;
truncate table dbo.customer_stg;
truncate table dbo.Product_stg;

alter table dbo.Sales_order_fact NOCHECK constraint all;
DELETE FROM dbo.Customer_dim;
DELETE FROM dbo.Product_dim;
delete from dbo.order_dim;
truncate table dbo.Sales_order_fact;
alter table dbo.Sales_order_fact CHECK constraint all;


 

 

 

          

 

 

 

 

推荐 0
本文由 doudou1 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册