【老贝伏枥】-2.kafka单机实践-实时WordCount

浏览: 2166

2.3、实现一个实时词频统计

  Kafka Streams是用于构建关键任务实时应用程序和微服务的客户端库,输入或输出数据存储在Kafka集群中。它结合了在客户端编写和部署标准Java和Scala应用程序的简单性,以及Kafka的服务器端集群技术的优点,使这些应用程序具有高度可伸缩性、弹性、容错、分布式和更多的优点。官方给的示例程序在这个库中运行流应用程序,源代码在GitHub:https://github.com/apache/kafka/blob/1.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java,安装文件/opt/kafka/libs/kafka-streams-1.1.0.jar自带了这个类:org.apache.kafka.streams.examples.wordcount.WordCountDemo,意图是将Producer获取到的文本信息,实时统计并输出。

1)创建一个Topic:streams-plaintext-input,官方所给的demo的数据来源指定的Topic就是它,当然可以用Intellij改demo里的代码,然后打包再运行。

bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-plaintext-input

2)创建一个Topic:streams-wordcount-output,用于输出词频计算的结果

 bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-wordcount-output \

    --config cleanup.policy=compact 

#这里对输出启用了压缩,单机版启用与否都不影响

执行命令bin/kafka-topics.sh --zookeeper localhost:2181 --list 即可查询到Kafka集群已有的Topic信息。

3)启动WordCount程序

程序将从streams-plaintext-input 输入中读取,对每个读取消息执行WordCount算法的计算,并将其当前结果持续地写入streams-wordcount-output。

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

执行后也会出现一堆Log信息,别管他,然后通过一个单独的控制台终端中读取输出信息,检查WordCount应用的输出结果。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

    --topic streams-wordcount-output \

    --from-beginning \

    --formatter kafka.tools.DefaultMessageFormatter \

    --property print.key=true \

    --property print.value=true \

    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

图片.png

4)启动Producer手工输入文本信息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

图片.png

手工输入了一行文本,接下来马上看Output的统计结果

图片.png

手工多输入一些文本,程序基本上妙极计算结果并输出,以下是我测试的结果:

图片.png

可以看到,Wordcount应用程序的输出实际上是一个连续的更新流,其中每个输出记录都是一个单词的更新计数。对于相同的多个记录,每个后续记录都是前一个记录的更新。

---------------------------------------------------------------------------------------------------------

基于这个demo,我把文字来源改为某个程序的log文件,只要log更新,wordcount马上计算结果。

A、添加一个connect-steam-source.properties文件

name=local-file-source

connector.class=FileStreamSource

tasks.max=1

file=/opt/kafka/logs/controller.log  #这里设置为监控 kafka的运行日志

topic=streams-plaintext-input #跟上面的配置i一样,把log的信息输出到 streams-plaintext-input。

这样WordCount程序就可以从streams-plaintext-input获取文字信息,并实时统计

运行命令:bin/connect-standalone.sh config/connect-standalone.properties config/connect-steam-source.properties &

再查看下streams-plaintext-output的结果

图片.png

这样算买马马虎虎的搞定了。由于connect-standalone.properties配置的是文字格式为JSON,而WordCount程序又没有解析JSON格式,故出现了上图那种情况。

推荐 1
本文由 贝克汉姆 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册