看了一段时间NiFi的文档,也测试了一些的内容,现给大家介绍下吧。
我目前使用了Hortonworks的HDF,只是为了部署方便, 这个示例使用单独安装的NiFi版本也一样的。
先拿小票的示例来演示一下基本操作,具体实现小票数据从Kakfa 同步到数据库(PostgreSQL)。
小票格式是个json串,包括表头信息和商品的明细信息,类似如下:
{ "trx_num" : "trx0001", "quantity" : 3,"value" : 6, "trx_dtl" : [{ "item" : "item01", "item_qty" : 1, "price" : 1},{ "item" : "item02", "item_qty" : 2, "price" : 2}]}
计划分3部分介绍
1.先实现表头的测试数据
2.拆分明细数据并insert到PostgreSQL的部分。
3.获取Kakfa数据
第一部分概览:
使用3个组件 :
1.GenerateFlowFile生成测试数据;
2.EvaluateJsonPath获取json属性;
3.UpdateAttribute测试用看结果数据;
各自的属性设置
GenerateFlowFile设置时注意下filesize, 还有schedule时间设置长一点,不然没法测试。
EvaluateJsonPath: 由于只是获取属性,注意设置destination=flowfileattribute, 属性使用$.属性名表示
连接process的注意事项:
Automatically terminate relationships 下要把没有连接的选上,不然会报错,无法运行
由于是演示EvaluateJsonPath的failure和unmatched就没作处理,直接结束掉
运行和检测结果
按一下空白处,然后点击run皆可以运行了^-^
NiFi可以跟踪到每一条,检测非常方便
可以看到属性已经被取出来了。