KAFKA
• KAFKA
◦ Distributed message queue
▪ KAFKA overview
▪ KAFKA architecture and core concepts
▪ KAFKA deployment and usage
▪ KAFKA fault-tolerance testing
▪ KAFKA API programming
▪ KAFKA in action
Distributed message queue
KAFKA overview
Official site: https://kafka.apache.org
Kafka is similar to a traditional messaging system: it is message middleware sitting between producers and consumers.
Analogy: mom is the producer, you are the consumer, and the steamed buns are the data stream.
KAFKA architecture and core concepts
Kafka architecture
• Producer: the producer, i.e., whoever makes the buns (mom)
• Consumer: the consumer, i.e., whoever eats the buns (you)
• Broker: the basket that holds the buns
• Topic: the named category that messages are published to and consumed from
KAFKA deployment and usage
ZooKeeper setup
• Download the release from the official site
• Unpack the archive
• Edit ~/.bash_profile
• Run source ~/.bash_profile to apply the changes
• Edit zoo.cfg
• Change dataDir from its default to a custom path
• Start the ZooKeeper server: sh zkServer.sh start
• Start the ZooKeeper client: sh zkCli.sh
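A minimal sketch of the two files edited above; the install path /app/zookeeper and the data directory are assumptions, substitute your own:

# ~/.bash_profile
export ZK_HOME=/app/zookeeper
export PATH=$ZK_HOME/bin:$PATH

# $ZK_HOME/conf/zoo.cfg
tickTime=2000
dataDir=/app/tmp/zookeeper    # custom path instead of the default /tmp/zookeeper
clientPort=2181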
Kafka deployment
• Download the release from the official site
• Unpack the archive
• Edit ~/.bash_profile
• Run source ~/.bash_profile to apply the changes
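As with ZooKeeper, a minimal ~/.bash_profile sketch so the scripts under bin/ are on the PATH (the install path is an assumption):

export KAFKA_HOME=/app/kafka
export PATH=$KAFKA_HOME/bin:$PATH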
Service architecture
Single-node, single-broker deployment and usage
• Configure server.properties (a sketch follows this list)
• broker.id=0
• listeners
• log.dirs
• zookeeper.connect
• Start Kafka: kafka-server-start.sh. Run with no arguments it prints the usage (the start command must be followed by a properties file): USAGE: kafka-server-start.sh [-daemon] server.properties [--override property=value]* So the actual command is: kafka-server-start.sh $KAFKA_HOME/config/server.properties
• Create a topic: kafka-topics.sh --create --zookeeper bigdata:2181 --replication-factor 1 --partitions 1 --topic hello-topic
• List topics: kafka-topics.sh --list --zookeeper bigdata:2181
• Produce messages: kafka-console-producer.sh --broker-list bigdata:9092 --topic hello-topic
• Consume messages: kafka-console-consumer.sh --zookeeper bigdata:2181 --topic hello-topic --from-beginning
• Describe all topics: kafka-topics.sh --describe --zookeeper bigdata:2181
• Describe one topic: kafka-topics.sh --describe --zookeeper bigdata:2181 --topic hello-topic
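A minimal server.properties sketch covering the four settings from the list above; the hostname and log path are examples:

# $KAFKA_HOME/config/server.properties
broker.id=0
listeners=PLAINTEXT://bigdata:9092
log.dirs=/app/tmp/kafka-logs
zookeeper.connect=bigdata:2181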
Single-node, multi-broker deployment and usage
• Copy server.properties to server-1.properties and server-2.properties
• Configure server-1.properties / server-2.properties (a sketch follows this list)
• broker.id=1/2
• listeners 9093/9094
• log.dirs
• Start both brokers: kafka-server-start.sh -daemon ../config/server-1.properties & kafka-server-start.sh -daemon ../config/server-2.properties &
• Create a topic with two replicas: kafka-topics.sh --create --zookeeper bigdata:2181 --replication-factor 2 --partitions 1 --topic my-replic
• List topics: kafka-topics.sh --list --zookeeper bigdata:2181
• Produce messages: kafka-console-producer.sh --broker-list bigdata:9093,bigdata:9094 --topic my-replic
• Consume messages: kafka-console-consumer.sh --zookeeper bigdata:2181 --topic my-replic
• Describe all topics: kafka-topics.sh --describe --zookeeper bigdata:2181
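A sketch of the per-broker differences; everything not shown stays as in server.properties, and the ports and paths are examples:

# config/server-1.properties
broker.id=1
listeners=PLAINTEXT://bigdata:9093
log.dirs=/app/tmp/kafka-logs-1

# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://bigdata:9094
log.dirs=/app/tmp/kafka-logs-2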
Multi-node, multi-broker deployment and usage: left as a hands-on exercise
KAFKA fault-tolerance testing
With multiple replicas, message delivery keeps working whether the leader or a follower broker dies. When the leader dies, one of the surviving replicas is automatically elected as the new leader.
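One way to exercise this on the single-node, multi-broker setup above (a sketch; pids and the elected leader will vary):

kafka-topics.sh --describe --zookeeper bigdata:2181 --topic my-replic   # note the Leader, Replicas and Isr columns
jps -m                                                                  # find the pid of the broker currently listed as Leader
kill -9 <leader-pid>                                                    # simulate a leader crash
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic my-replic   # a surviving replica has taken over as Leader

Messages produced during and after the failover should still reach the consumer.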
KAFKA API programming
Development environment
• IntelliJ IDEA 2017
• Kafka 1.1.0
• Scala 2.11
Configuration file
public class KafakProperties {
    public static final String ZK = "bigdata:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "bigdata:9092,bigdata:9093,bigdata:9094";
    public static final String GROUP_ID = "test";
}
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/18 8:43 PM
 * Desc: Kafka producer
 */
public class KafkaProducers extends Thread {
    private String topic;
    private KafkaProducer<String, String> producer;

    public KafkaProducers(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        // Broker address list for the cluster. Configure at least two in case one is
        // down; the full list is not required, since the client discovers the rest of
        // the cluster and the partition leaders automatically.
        properties.put("bootstrap.servers", KafakProperties.BROKER_LIST);
        properties.put("acks", "1");
        properties.put("retries", 1); // retries on failure; a value > 0 may produce duplicate records
        properties.put("buffer.memory", 33554432); // total memory the producer may use for buffering
        properties.put("linger.ms", 1); // wait up to 1 ms for more records before sending a not-yet-full batch
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

    @Override
    public void run() {
        int messageNo = 0;
        while (true) {
            String message = "sendMessage" + messageNo;
            producer.send(new ProducerRecord<>(topic, message));
            System.out.println("Sent:" + message);
            messageNo++;
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
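send() above is fire-and-forget: it returns immediately and any broker-side failure goes unnoticed. If per-record confirmation matters, send() also takes a callback; a minimal sketch that could replace the send() call above (Java 8 lambda):

producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed even after retries
    } else {
        System.out.println("acked: " + metadata.topic() + "-"
                + metadata.partition() + "@" + metadata.offset());
    }
});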
Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/19 8:57 PM
 * Desc: Kafka consumer
 */
public class KafkaConsumers extends Thread {
    private String topic;
    private KafkaConsumer<String, String> kafkaConsumer;

    public KafkaConsumers(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KafakProperties.BROKER_LIST);
        // consumers with different group IDs can subscribe at the same time,
        // and each group receives the full message stream
        properties.put("group.id", KafakProperties.GROUP_ID);
        properties.put("enable.auto.commit", "false"); // auto-commit disabled; we commit manually below
        properties.put("auto.commit.interval.ms", "1000"); // commit interval (only used when auto-commit is enabled)
        properties.put("session.timeout.ms", "30000"); // consumer liveness timeout
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic)); // subscribe to the topic
    }

    @Override
    public void run() {
        try {
            while (true) { // poll loop
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE); // block until records arrive
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    kafkaConsumer.commitSync(); // synchronous commit of consumed offsets
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}
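The parameterless commitSync() commits the latest offsets for every partition returned by the last poll(). To commit at a finer granularity, right after processing each partition, commitSync() also accepts an explicit offset map; a sketch that could replace the commitSync() call above (needs java.util.Collections and org.apache.kafka.clients.consumer.OffsetAndMetadata imports):

long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
kafkaConsumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(lastOffset + 1))); // committed offset = next offset to read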
Test class
/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/19 9:37 PM
 * Desc: Kafka Java test
 */
public class KafkaClientApp {
    public static void main(String[] args) {
        new KafkaProducers(KafakProperties.TOPIC).start();

        new KafkaConsumers(KafakProperties.TOPIC).start();
    }
}
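With the brokers running and hello_topic in place, the console should interleave the two threads' output roughly every two seconds: a Sent:sendMessageN line from the producer, followed by the matching <offset>: sendMessageN line from the consumer.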
KAFKA in action
Architecture: integrate Flume with Kafka for real-time data collection. Flume tails log files on the web server and delivers them to Kafka.

Flume configuration files
Configuration 1:
# exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -f /app/word_count.txt
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = bigdata
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
Start the Flume agent: flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
Configuration 2:
# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = bigdata
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 20
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1
avro-memory-kafka.sinks.kafka-sink.kafka.producer.linger.ms = 1
avro-memory-kafka.sinks.kafka-sink.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start the Flume agent: flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
Startup order
• Start avro-memory-kafka first, so its avro source is listening before the other agent's avro sink tries to connect
• Then start exec-memory-avro
• Finally start a Kafka console consumer (command as in the Kafka section above; a verification sketch follows)
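To verify the pipeline end to end, start the consumer, then append a line to the tailed file and watch it come through (host, topic and file path as configured above):

kafka-console-consumer.sh --zookeeper bigdata:2181 --topic hello_topic --from-beginning
echo "hello kafka via flume" >> /app/word_count.txt    # this line should appear in the consumer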
Run result: each line appended to /app/word_count.txt shows up in the Kafka console consumer's output.