2.2、Connector组件
要实现文件监控功能,首先必须了解Connector组件的功能特点。Connector方便的从控制台写入数据并将其写入控制台,它可以导入来自其他来源的数据,或导出数据到其他外部系统。Kafka Connect是一个可扩展的工具,通过自定义的逻辑和外部系统交互,并将外部数据导入和到出到目的地。下面来看看Kafka Connector示例,如何从文件导入数据到Topic,并从Topic导出数据到目的文件。但是,一般实施项目时,都比较喜欢用Apache Flume来监控日志文件。既然connector存在了肯定有存在的价值。下图显示了Connector在kafka体系的位置:
1)、打开安装目录下的config/connect-file-source.properties,这个文件定义了数据来源的参数信息。你也可以在命令后面带这些参数,如 --topic connect-test等。如果参数太多,建议用properties文件配置,这样比较省事。
name=local-file-source
connector.class=FileStreamSource #一个文件流的方法类
tasks.max=1 #最大任务数量,集群环境就按实际Node数配置
file=test.txt #监控这个文件,默认路径是/opt/kafka/test.txt,表示要实时获取这个文件里的内容
topic=connect-test #topic 只要用到kafka,必须用它作为桥梁
2)、打开安装目录下的config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink #Sink 水槽,字面意思很清晰了,回收数据的地方。hadoop/zookeeper/pig这些名字,都是本着简单实用易懂的原则。
tasks.max=1
file=test.sink.txt #获取到的实时文件,传输到此文件。完整路径是/opt/kafka/test.sink.txt。
#这是官方带的demo未免太Low了,下回我自己改一下,把它写到数据库里去。
topics=connect-test
3)、打开装目录下的config/connect-standalone.properties,意思是运行一个单机程序。以下是这个单机程序所需要的参数信息。
bootstrap.servers=localhost:9092 #broker
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
...
#中间这段意思是配置消息的格式为JSON,什么是JSON?自个问度娘去吧。offset.storage.file.filename=/opt/kafka/tmp/connect.offsets #offset存储的文件路径,建议像这样改到安装目录下,记住必须创建/opt/kafka/tmp文件夹。
...
4)运行命令
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties &
执行后会显示一大堆Log信息。马上目录会出现2个配置文件指定的文本文件,如果没有,就手动创建它们。
test.txt test.sink.txt
5)监控test.txt的文件内容,实际应用的时候,文件的内容是程序写入的,比如APP运行日志信息。这里我们手工写入,当然你也可以弄一个shell脚本来实现文字写入功能。
往文件里追加一行信息,connector会实时监控破获最新的一行文字,并通过topic传输到 test.sink.txt文件里
6)实时接受和存储最新信息到test.sink.txt文件
此时,源文件的内容只要一增加,会在1秒内传输到该文件存储。与此同时,还可以用Consumer显示该Topic的实时消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning &
程序内部的信息实际上是把文字转换成JSON格式传输的,因为JSON格式的好处是方便代码处理,保证了文字信息的完整性。咋一看,官方自带的sample还是挺easy的,没费多大劲就实现了一个简洁易懂的实时监控程序。据LinkedIn的demo所述,三台普通的X86搭建的集群,可以实现100 000条/秒的传输效果,可见kafka集群的强大功能。
下一个例子是实现实时数据流程序wordcount。
----------------------------我也是有底线的-------------------------------