【老贝伏枥】-2.kafka单机实践-文件监控

浏览: 1691

2.2、Connector组件

  要实现文件监控功能,首先必须了解Connector组件的功能特点。Connector方便的从控制台写入数据并将其写入控制台,它可以导入来自其他来源的数据,或导出数据到其他外部系统。Kafka Connect是一个可扩展的工具,通过自定义的逻辑和外部系统交互,并将外部数据导入和到出到目的地。下面来看看Kafka Connector示例,如何从文件导入数据到Topic,并从Topic导出数据到目的文件。但是,一般实施项目时,都比较喜欢用Apache Flume来监控日志文件。既然connector存在了肯定有存在的价值。下图显示了Connector在kafka体系的位置:


Connector在kafka体系的位置

1)、打开安装目录下的config/connect-file-source.properties,这个文件定义了数据来源的参数信息。你也可以在命令后面带这些参数,如 --topic connect-test等。如果参数太多,建议用properties文件配置,这样比较省事。

name=local-file-source

connector.class=FileStreamSource #一个文件流的方法类

tasks.max=1 #最大任务数量,集群环境就按实际Node数配置

file=test.txt #监控这个文件,默认路径是/opt/kafka/test.txt,表示要实时获取这个文件里的内容

topic=connect-test  #topic 只要用到kafka,必须用它作为桥梁

2)、打开安装目录下的config/connect-file-sink.properties

name=local-file-sink

connector.class=FileStreamSink #Sink 水槽,字面意思很清晰了,回收数据的地方。hadoop/zookeeper/pig这些名字,都是本着简单实用易懂的原则。

tasks.max=1

file=test.sink.txt #获取到的实时文件,传输到此文件。完整路径是/opt/kafka/test.sink.txt。

#这是官方带的demo未免太Low了,下回我自己改一下,把它写到数据库里去。

topics=connect-test

3)、打开装目录下的config/connect-standalone.properties,意思是运行一个单机程序。以下是这个单机程序所需要的参数信息。

bootstrap.servers=localhost:9092 #broker

key.converter.schemas.enable=true

value.converter.schemas.enable=true

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

...
#中间这段意思是配置消息的格式为JSON,什么是JSON?自个问度娘去吧。

offset.storage.file.filename=/opt/kafka/tmp/connect.offsets #offset存储的文件路径,建议像这样改到安装目录下,记住必须创建/opt/kafka/tmp文件夹。

...

4)运行命令

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties &

执行后会显示一大堆Log信息。马上目录会出现2个配置文件指定的文本文件,如果没有,就手动创建它们。

test.txt  test.sink.txt

5)监控test.txt的文件内容,实际应用的时候,文件的内容是程序写入的,比如APP运行日志信息。这里我们手工写入,当然你也可以弄一个shell脚本来实现文字写入功能。

图片.png

往文件里追加一行信息,connector会实时监控破获最新的一行文字,并通过topic传输到 test.sink.txt文件里

6)实时接受和存储最新信息到test.sink.txt文件

图片.png

此时,源文件的内容只要一增加,会在1秒内传输到该文件存储。与此同时,还可以用Consumer显示该Topic的实时消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning &

图片.png

程序内部的信息实际上是把文字转换成JSON格式传输的,因为JSON格式的好处是方便代码处理,保证了文字信息的完整性。咋一看,官方自带的sample还是挺easy的,没费多大劲就实现了一个简洁易懂的实时监控程序。据LinkedIn的demo所述,三台普通的X86搭建的集群,可以实现100 000条/秒的传输效果,可见kafka集群的强大功能。

下一个例子是实现实时数据流程序wordcount。



----------------------------我也是有底线的-------------------------------

推荐 1
本文由 贝克汉姆 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册