NiFi 入门:Kafka 数据同步到关系数据库(PostgreSQL)--Part3

浏览: 3347

Part1Part2把如何使用EvaluateJsonPath获取属性值,SplitJson 拆分行数据说清楚了,还差Kafka源的部分.

Kafka源的设置:

NiFi中对应Kafka有不同版本, 分队对应0.9,0.10,0.11和1.0的不同Kafka版本,低版本的process可以处理高版本的Kafka


HDP用的Kafka0.10的版本,我使用了ConsumeKafka_0_10,注意groupid不用和其他用的process重复,不然会被消费掉.


进一步思考:

1. 判断记录是否符合格式

增加了属性判断:使用RouteOnAttribute判断trx_num属性是否非空

${trx_num:isEmpty():not()}

防止完成后的概览图:

2.考虑使用record模式

attribute模式给了用户很多自由度,但是也导致了,属性要多次配置,record模式比较好的解决了这个模式,通过统一定义schema,

从Kafka端就可以很好的解构

3. flowfile的合并

现在putsql是一行一行的作insert, 记录数不多还好,多的话对性能影响较大.并且如果是往其他目标insert(如hdfs\mogndo)等, 需要配合mergercontent组件完成.

推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册