之前的这篇blog NiFi 入门:Kakfa 数据同步到关系数据库(PostgreSQL) 只说了写到数据库. 写到HDFS或Mongo都可能有这个需要.
先说一下把数据存储到, HDFS,按日期的文件名存储, 数据源还是Kafka。
总的流程如下图:
使用PutHDFS process 大数据存储到HDFS
PutHDFS设置Hadoop Configure +Resouroes参数就可以使用了.
不过在Kafka这个流的环境下,有几个属性设置要注意下:
Conflict Resolution Strategy: 由于一天的数据放在一起,要设置append参数 可以追加到文件末尾. 在使用putfile的process时候一直希望有这个功能, 看了说明说在puthdfs里实现了,不实现了orz.
Replication: 如果连接HDP,而HDP默认Replication可能需要设置成2.
使用MergeConetent合并数据
在PutHDFS的上游,也就是PutHDFS的数据源的时候,要注意数据流的间隔. 不然append太频繁,一个速度慢, 上游的queue会留存太多数据, 写太频繁也会报错.
Minimum Number of Entries要设置的比较大,解决写的频率问题和性能问题.
Max bin age:解决万一没有数据,时间有个上限.
太频繁的错误类似如下,如果满意碰到一定要使用这个process:
IOException thrown from PutHDFS[id=01651013-4a64-174d-adf2-bb337873105e]:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE /nifidata/cmadata for DFSClient_NONMAPREDUCE_-1639314865_130 on 192.168.0.5 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1660895298_138 on 192.168.0.5
使用UpdateAttribute 设置 filename
如何保证一天的数据在一个文件里, 使用UpdateAttribute 设置 filename的属性即可.
类似这样: filename${now():format('yyyymmdd')}${hostname(false)}.txt
按hostname分别存,这样可以确保多节点情况下不会冲突。