开篇介绍
在 ETL 项目中,很多时候情况下是需要通过编程的手段来解决一些问题的。比如说在控制流中对参数的验证,文件路径的验证,文件的移动拷贝归档等等,包括在数据流中需要通过编程处理一些比如特殊不规则文件的读取,复杂的分支流程处理,包括对数据库的访问等等。在控制流中我们可以使用到 Script Task,同样的在数据流中我们也有 Script Component 来实现自定义的脚本编程 Script。自 2008 开始,SSIS Script 组件不仅仅支持 VB 语法,也同样开始支持 C# 语法,即我们可以在 Script Component 中来使用 C#.NET。
Script Component 初步体验
Script Component 是一个比较特殊的控件,它即可以在数据流中担当 Source 的角色,也可以担当 Destination,更多的时候它只起到 Transformation 的作用。我们下面通过一个小例子,先简单认识一下 Script Component,之后再单独来介绍各个具体的功能。
这里是 5 行测试数据,这是数据流中 OLE DB Source 中关联到的数据(请自行添加数据流任务)。
自行创建包以及数据流,OLE DB Source 数据源,并拖放一个 Script Component 进来,默认选择 Transformation。
在 Script Component 和 Audit 之间添加一个 Data Viewer。
在 Input Columns 中只选择 ProductName,这里一旦选择,那么 ProductName 在数据流中的 Script Component 是直接可以访问到的。在下面可以选择 ReadOnly 只读或者 ReadWrite,即在 Script Component 中是 ProductName 是只读的还是可以读写赋值的。这里选择只读状态。
回到 Script 编辑脚本,第一次打开的时候会稍微慢一些,之后再使用打开会比较快一些。
与控制流的 Script Task 不同,在数据流中 Script Component 如果想访问 Windows Form 是需要自己动手引入这个命名空间的。
下面的代码中要着重看三个方法:
- PreExecute() - Script Component 所在当前数据流执行之前被调用的方法,完成一些初始化操作。包括可以在这里面完成一些数据库连接等等(.NET 编程)
- PostExecute() - Script Component 所在当前数据流执行之后被调用的方法,完成一些释放资源的操作。在这个例子中,比较简单不做复杂的处理。
- Input0_ProcessInputRow() - 上游数据源每输入一行方法执行一次,并且在这个方法中是可以读取到行的每一列的,只需要在上面的配置中像 ProductName 被选中即可。本案例中,我们的源数据有 5 行,因此此方法会被调用 5 次。
/// <summary>
/// This method is called once, before rows begin to be processed in the data flow.
///
/// You can remove this method if you don\'t need to do anything here.
/// </summary>
public override void PreExecute()
{
base.PreExecute();
/*
* Add your code here
*/
MessageBox.Show("Call PreExecute()");
}
/// <summary>
/// This method is called after all the rows have passed through this component.
///
/// You can delete this method if you don\'t need to do anything here.
/// </summary>
public override void PostExecute()
{
base.PostExecute();
/*
* Add your code here
*/
MessageBox.Show("Call Post()");
}
/// <summary>
/// This method is called once for every row that passes through the component from Input0.
///
/// Example of reading a value from a column in the the row:
/// string zipCode = Row.ZipCode
///
/// Example of writing a value to a column in the row:
/// Row.ZipCode = zipCode
/// </summary>
/// <param name="Row">The row that is currently passing through the component</param>
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
/*
* Add your code here
*/
MessageBox.Show(Row.ProductName);
}
执行包,数据流已经开始执行。
点击进去之后,发现 OLE_SRC_PRODUCT_SALES 并没有被执行,但是可以看到弹出对话框中来自于 PreExecute() 的代码被执行了。
等 PreExecute() 方法被执行完毕之后,OLE_SRC_PRODUCT_SALES 才开始执行。在执行 SC_COUNT_ROWS 这个 Script Component 过程中,调用了 Input0_ProcessInputRow() 这个方法,并且通过 Row.ProductName 直接访问到了 ProductName 这一列。
多说一句,Input0_ProcessInputRow(Input0Buffer Row) 参数 Row 的生命周期就是这个方法调用的时候创建,调用结束的时候结束。Row 是每一个进入 Script Component 组件行的实例。
待 SC_COUNT_ROWS 所有行进入处理并往下一起输出的时候,可以在 Data Viewer 中清晰的看到这 5 行数据,并且可以看到 Audit 控件也处于执行状态。
但是要注意到,PostExecute() 方法的调用确实整个数据流组件执行结束之后才被调用的,并且这个方法也只执行了一次!
因此,可以通过这个简单的例子了解到在默认 Transformation 形式下:
- 三个默认方法的执行先后顺序,何时开始执行,执行次数,以及各自的区别。
- 如何访问数据源中的列,Row 对象的生命周期。
- Input0_ProcessInputRow() 方法的执行与输入行的关系。
了解到上面几个特点之后,我们可以很容易的实现一个数据流输入条数的计数功能。(虽然我们有 Row Count,但是我们也可以通过 Script Component 实现,并且可以大胆的猜测 Row Count 的实现方式和这里的 Script Component 是一样的,只不过不需要初始化那么多东西)。
使用 Script Component 完成一个行计数器
创建一个 int 类型的变量,并配置在 Script Component 可读写的列表中。
主要代码如下:
PreExecute 方法主要完成计数变量的初始化。
ProcessInputRow 方法每进入一行,执行一次计数。
PostExecute 方法等整个数据流执行完毕的时候调用显示一下计数结果。
int count = 0;
/// <summary>
/// This method is called once, before rows begin to be processed in the data flow.
///
/// You can remove this method if you don\'t need to do anything here.
/// </summary>
public override void PreExecute()
{
base.PreExecute();
/*
* Add your code here
*/
count = 0;
}
/// <summary>
/// This method is called after all the rows have passed through this component.
///
/// You can delete this method if you don\'t need to do anything here.
/// </summary>
public override void PostExecute()
{
base.PostExecute();
/*
* Add your code here
*/
Variables.PVROWCOUNT = count;
MessageBox.Show("Total Count - "+Variables.PVROWCOUNT);
}
/// <summary>
/// This method is called once for every row that passes through the component from Input0.
///
/// Example of reading a value from a column in the the row:
/// string zipCode = Row.ZipCode
///
/// Example of writing a value to a column in the row:
/// Row.ZipCode = zipCode
/// </summary>
/// <param name="Row">The row that is currently passing through the component</param>
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
/*
* Add your code here
*/
count++;
}
结果显示如下: