NiFi 入门:Kafka 数据同步到关系数据库(PostgreSQL)--part1

浏览: 2625

看了一段时间NiFi的文档,也测试了一些的内容,现给大家介绍下吧。

我目前使用了Hortonworks的HDF,只是为了部署方便, 这个示例使用单独安装的NiFi版本也一样的。

先拿小票的示例来演示一下基本操作,具体实现小票数据从Kakfa 同步到数据库(PostgreSQL)。

小票格式是个json串,包括表头信息和商品的明细信息,类似如下:

{ "trx_num" : "trx0001", "quantity" : 3,"value" : 6, "trx_dtl" : [{ "item" : "item01", "item_qty" : 1, "price" : 1},{ "item" : "item02", "item_qty" : 2, "price" : 2}]}

计划分3部分介绍

1.先实现表头的测试数据

2.拆分明细数据并insert到PostgreSQL的部分。

3.获取Kakfa数据

第一部分概览:

使用3个组件 :

1.GenerateFlowFile生成测试数据;

2.EvaluateJsonPath获取json属性;

3.UpdateAttribute测试用看结果数据;


各自的属性设置

GenerateFlowFile设置时注意下filesize, 还有schedule时间设置长一点,不然没法测试。

EvaluateJsonPath:  由于只是获取属性,注意设置destination=flowfileattribute, 属性使用$.属性名表示



连接process的注意事项:

Automatically terminate relationships 下要把没有连接的选上,不然会报错,无法运行

由于是演示EvaluateJsonPath的failure和unmatched就没作处理,直接结束掉

运行和检测结果

按一下空白处,然后点击run皆可以运行了^-^


NiFi可以跟踪到每一条,检测非常方便


可以看到属性已经被取出来了。


推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册