微软BI 之SSIS 系列 - Merge 合并操作与 Sort 排序

浏览: 3774

开篇介绍

在数据仓库项目中,我们的数据源可能来源不一,比如关于客户的信息可能就有好几个数据源,旧系统导出的文本文件,新系统的数据库表的信息等等。在正式统一处理这些信息之前,有的时候是需要做一些合并操作的,需要先将两部分的数据合并起来统一的存储到一个临时表中,然后再来做去重,筛选等处理。在 SQL Server 中我们可以非常容易的通过表的查询通过 UNION、UNION ALL 等关键字合并数据,在 SSIS 中我们也有这样的几种控件也可以起到这样的作用,比如说 Merge,Merge Join,Union All。

 

本篇讲解的是 Merge 控件的使用,并且在使用的过程中也会介绍到 Sort 排序控件的操作。

测试的数据源

-- Merge demo table
IF OBJECT_ID(\'T036_CUSTOMER\',\'U\') IS NOT NULL
DROP TABLE T036_CUSTOMER
GO

CREATE TABLE T036_CUSTOMER
(
CustomerID
INT PRIMARY KEY,
CustomerCompany
NVARCHAR(255),
CustomerName
NVARCHAR(20),
CustomerAddress
NVARCHAR(255)
)

INSERT INTO T036_CUSTOMER VALUES
(
1,\'HFBZG\',\'Allen,Michael\',\'Obere Str. 0123\'),
(
2,\'MLTDN\',\'Hassall, Mark\',\'Avda. de la Constitución 5678\'),
(
3,\'KBUDE\',\'Peoples, John\',\'Mataderos 1000\')

SELECT * FROM T036_CUSTOMER

测试数据

第二个数据源是文本文件,注意到第 5 行是重复的数据。

ID,Company,CustomerName,Title,Address

1,\'NRZBB\',\'Allen,Michael\',\'Sales Representative\',\'Obere Str. 0123\'

2,\'MLTDN\',\'Hassall, Mark\',\'Owner\',\'Avda. de la Constitución 5678\'

3,\'KBUDE\',\'Peoples, John\',\'Owner\',\'Mataderos  7890\'

4,\'HFBZG\',\'Arndt, Torsten\',\'Sales Representative\',\'7890 Hanover Sq.\'

5,\'HGVLZ\',\'Higginbotham, Tom\',\'Order Administrator\',\'Berguvsvägen  5678\'

5,\'HGVLZ\',\'Higginbotham, Tom\',\'Order Administrator\',\'Berguvsvägen  5678\'

新建一个包和数据流,并添加一个 OLE DB Source 把表 T036_CUSTOMER 作为源表配置起来。

添加一个平面文件源,并配置其平面文件连接管理器,文本字符有‘包括。

拖放 Merge 控件,并关联上 OLE DB Source 与 Merge 控件。

关联上文件源与 Merge 控件之后编辑 Merge,会出现这种错误:

The input "Merge Input 1" and the input "Merge Input 2" have sort-key positions assigned to their columns incorrectly.

这是 Merge 对于合并两端数据的第一个要求,即合并两端的数据必须要求排序。那么还有一个问题,排序列我们选择什么?在这种情况下,排序列可以根据实际情况来选择,比如有主键特征的列,这里可以选择 ID 列。

对于 OLE_SRC_CUSTOMER 来说,这种排序是直接放在查询中来完成的,并且也应该在这里来完成,修改 OLE DB Source 改成排序的查询语句。

但是对于文件源来说,由于不能像 SQL 那样直接查询排序,因此需要借助排序控件 Sort Task 来完成。

编辑 Sort 控件,选择需要排序的列和需要传递往下输出的列,默认情况下全部选中。最下面的一个选项也非常重要,在合并数据的时候是否需要删除重复的行,先不选择。

确定并编辑 Merge,发现一个新的错误信息:

The input "Merge Input 1" does not have sort-key positions assigned correctly to its columns.

这是因为尽管在 OLE_SRC_CUSTOMER 中数据已经排序了,但是对于 Merge 来说它在读取来自于它上游的 OLE_SRC_CUSTOMER 中的数据的时候,OLE_SRC_CUSTOMER 并没有明显的通知 Merge 它到底是如何来排序的,因此需要右键 OLE_SRC_CUSTOMER 来修改其高级选项 Advanced Editor。

在 Input and Output Properties 中,选择 OLE DB Source Output,并修改 IsSorted 属性为 True 来表明这个控件的 Output 输出流 是排过序的。

同时还需要指定如何排序,按照哪些列来排的序。因为在示例 Merge 中我们选择的是 CustomerID, 因此这里将它的 0 修改为1。 0 表示是不排序的,1 表示是第 一个排序位,这里应该按照 ORDER BY 后面的列顺序来设置,第二个排序列就设置为 2, 依此类推。

修改完毕之后再次编辑 Merge 会看到这样的一个错误:

The metadata for "Merge.Inputs[Merge Input 2].Columns[ID]" does not match the metadata for the asscociated output column。

这种错误发生的原因就是因为 Merge 两端的数据类型不一致,因此我们需要修改文件源中的数据类型,使得 Merge 两端的数据类型保持一致。

修改文件源平面文件连接管理器中的列属性,将列的数据类型按照左表类型设置,保持左右两边数据类型一致,比如INT, NVARCHAR 类型等。

配置完成之后,保存并执行包,可以看到合并之后的测试数据有 9 行,即使有重复的也被保留往下输出了。

再次修改 Sort 控件,来看看这个排序组件对重复数据处理的问题,删除重复项。

保存并执行可以看到来自文件源中的重复数据在经过 Sort 排序的时候被合并删除掉了。

那么这里的问题是,这里的 Sort 排序删除重复数据是只基于排序列呢?还是比较整个数据行,为弄清楚这个区别,我们将文件源的最后两列做一些修改。

 ID 1,2 的 ID 不同,即排序列不同,但是后面的内容都相同。ID 为5的 ID 相同,但是后面内容不相同。

执行包并查看测试数据,发现在文件源中经过 SORT 排序之后只有重复的 ID (排序列)就已经删除掉了。

整个合并之后的效果。

总结

从上面三个错误步骤的处理,我们可以总结 Merge 控件的操作特点:

  • 只能有两个输入数据源。
  • 两个输入数据源可以是文件,也可以是表,视图或者查询返回的数据集。
  • 合并时要求左右两端的输入项必须排序,无论是显式通过 Sort 组件排序还是在 OLE DB Source 高级设置中配置,都需要指定排序列。
  • 合并前要求左右两端的对应列的数据类型必须一致,在长度上可以以左侧为主,但是类型要完全一致。
  • SSIS 中的 Merge 控件,在效果上上等同于 SQL Server 中的 UNION ALL 合并但保留重复数据。
  • 输出结果也是排序的。

对于 Sort 组件,简单的了解了一下它的去重操作,这里提一下这个组件是一个 Asynchronous Transformation 异步转换和 Blocked Transformation 阻塞转换组件。 与我们之前提过的 Aggregate 组件属于同一类组件。

它们处理数据的过程是先从上游数据源中抽取所有数据,再开始处理排序,全部排序完成之后再产生输出。这样的过程极大的消耗了内存并且使得整个处理的过程变得缓慢。

所以在 Merge 操作合并数据的时候,如果数据源是基于数据表查询的,这种排序我们应该在查询阶段就完成。除非像我们这个例子中,由于文本文件数据源没有办法通过 SQL 排序查询来完成,因此才会使用到 Sort 控件来解决排序的问题。

推荐 0
本文由 BIWORK 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

1 个评论

我想用Aggregate 组件属于同一类组件算一下我抽取到的数据里的更新时间最大的更新时间是哪个?我用多播来把抽取上来的数据一个往事实表目标走,一个输出一个MAX(更新时间)这样子的话,这种阻塞组件性能上不受影响的吧,因为输出只是一个值,然后我这个值存入到变量里,供我的日志表使用之类的。

要回复文章请先登录注册