Kafka + ZooKeeper + Flume: Stream Processing Basics


KAFKA

    •    Kafka

    ◦    Distributed Message Queue

    ▪    Kafka Overview

    ▪    Kafka Architecture and Core Concepts

    ▪    Kafka Deployment and Usage

    ▪    Kafka Fault-Tolerance Testing

    ▪    Kafka API Programming

    ▪    Kafka in Practice

Distributed Message Queue

Kafka Overview

Official site: the Apache Kafka website (kafka.apache.org)

Kafka works like a messaging system: it is message middleware sitting between producers and consumers.

A simple analogy:
Mom: the producer
You: the consumer
Steamed buns: the data stream

Kafka Architecture and Core Concepts

Kafka architecture

    •    Producer: the producer, which creates the messages (Mom making the buns)

    •    Consumer: the consumer, which reads the messages (you, eating the buns)

    •    Broker: the basket that holds the buns; each Kafka server instance is a broker

    •    Topic: the subject, i.e. the named category that messages are published to and consumed from

Kafka Deployment and Usage

ZooKeeper setup

    •    Download the release archive from the official website

    •    Unpack the archive

    •    Configure the ~/.bash_profile file

    •    Run source ~/.bash_profile so the changes take effect

    •    Configure the zoo.cfg file

    •    Change dataDir from its default to a custom path (see the zoo.cfg sketch after this list)

    •    Start the ZooKeeper server: sh zkServer.sh start

    •    Start the ZooKeeper client: sh zkCli.sh
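A minimal zoo.cfg sketch for a standalone ZooKeeper, for reference; the data directory path below is only illustrative and should be replaced with your own:

# zoo.cfg (standalone, illustrative paths)
tickTime=2000
# custom data directory instead of the default under /tmp
dataDir=/app/zookeeper/data
# port that clients (and Kafka) connect to
clientPort=2181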

Kafka deployment

    •    Download the release archive from the official website

    •    Unpack the archive

    •    Configure the ~/.bash_profile file (a sample sketch follows this list)

    •    Run source ~/.bash_profile so the changes take effect
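For reference, a sketch of the ~/.bash_profile entries this walkthrough assumes; the install paths are illustrative and should match wherever you unpacked the archives:

# ~/.bash_profile (illustrative paths)
export ZK_HOME=/app/zookeeper
export KAFKA_HOME=/app/kafka
export PATH=$ZK_HOME/bin:$KAFKA_HOME/bin:$PATH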

Service architecture

Single-node, single-broker deployment and usage

    •    Configure server.properties (a sample sketch follows this list)

    •    broker.id=0

    •    listeners

    •    log.dir

    •    zookeeper.connect

    •    Start Kafka 
kafka-server-start.sh 
Running the command with no arguments prints the usage message, which shows that the correct startup command needs the properties file as an argument: USAGE: kafka-server-start.sh [-daemon] server.properties [--override property=value]* 
kafka-server-start.sh $KAFKA_HOME/config/server.properties

    •    Create a topic 
kafka-topics.sh --create --zookeeper bigdata:2181 --replication-factor 1 --partitions 1 --topic hello-topic

    •    List topics 
kafka-topics.sh --list --zookeeper bigdata:2181

    •    Produce messages 
kafka-console-producer.sh --broker-list bigdata:9092 --topic hello-topic

    •    Consume messages 
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic hello-topic --from-beginning

    •    Describe all topics 
kafka-topics.sh --describe --zookeeper bigdata:2181

    •    Describe a specific topic 
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic hello-topic
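For reference, a sketch of the four server.properties settings listed above for the single-broker case; the hostname matches the bigdata host used in the commands, and the log directory is illustrative (log.dirs is the standard spelling of the log.dir setting named above):

# server.properties (single broker, illustrative values)
broker.id=0
listeners=PLAINTEXT://bigdata:9092
log.dirs=/app/kafka-logs
zookeeper.connect=bigdata:2181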

Single-node, multi-broker deployment and usage

    •    Copy server.properties to server-1.properties and server-2.properties

    •    Configure server-1.properties / server-2.properties

    •    broker.id=1 / broker.id=2

    •    listeners: ports 9093 / 9094

    •    log.dir: a separate log directory per broker

    •    Start the brokers 
kafka-server-start.sh -daemon ../config/server-1.properties & 
kafka-server-start.sh -daemon ../config/server-2.properties &

    •    Create a topic 
kafka-topics.sh --create --zookeeper bigdata:2181 --replication-factor 2 --partitions 1 --topic my-replic

    •    List topics 
kafka-topics.sh --list --zookeeper bigdata:2181

    •    Produce messages 
kafka-console-producer.sh --broker-list bigdata:9093,bigdata:9094 --topic my-replic

    •    Consume messages 
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic my-replic

    •    Describe all topics (a sketch of the output follows this list) 
kafka-topics.sh --describe --zookeeper bigdata:2181
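The --describe output is how you see which broker leads each partition. Roughly what it looks like for the two-replica topic created above (exact formatting and broker IDs vary with the Kafka version and your setup):

Topic: my-replic    PartitionCount: 1    ReplicationFactor: 2    Configs:
    Topic: my-replic    Partition: 0    Leader: 1    Replicas: 1,2    Isr: 1,2

Leader is the broker currently serving reads and writes for the partition, Replicas is the full replica assignment, and Isr is the set of replicas currently in sync with the leader.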

Multi-node, multi-broker deployment and usage 
Try this yourself as hands-on practice.

Kafka Fault-Tolerance Testing

With multiple replicas, message delivery continues normally whether the leader or a follower broker goes down. When the leader dies, a new leader is elected by default from the replicas that are still alive.
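One way to exercise this against the single-node multi-broker setup above, sketched with illustrative commands (use jps or ps to find the actual broker process IDs):

# locate and kill the broker that is currently the leader for the partition
jps -m | grep server-1.properties
kill -9 <pid-of-the-leader-broker>

# describe the topic again: a surviving in-sync replica should now be the leader
kafka-topics.sh --describe --zookeeper bigdata:2181 --topic my-replic

# producing and consuming should keep working
kafka-console-producer.sh --broker-list bigdata:9093,bigdata:9094 --topic my-replic
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic my-replic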

Kafka API Programming

Development environment

    •    IntelliJ IDEA 2017

    •    Kafka 1.1.0

    •    Scala 2.11

Configuration class

public class KafakProperties {
    public static final String ZK = "bigdata:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "bigdata:9092,bigdata:9093,bigdata:9094";
    public static final String GROUP_ID = "test";
}

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/18 8:43 PM
 * Desc: Kafka producer
 */
public class KafkaProducers extends Thread {
    private String topic;
    private KafkaProducer<String, String> producer;

    public KafkaProducers(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        // Broker addresses of the Kafka cluster. Listing two or more guards against one being
        // unavailable; the full list is not required, the client discovers the rest of the cluster.
        properties.put("bootstrap.servers", KafakProperties.BROKER_LIST);
        properties.put("acks", "1");
        properties.put("retries", 1);               // retry on failure; retries may produce duplicate records
        properties.put("buffer.memory", 33554432);  // total memory the producer may use for buffering
        properties.put("linger.ms", 1);             // wait up to 1 ms for more records before sending a batch
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

    @Override
    public void run() {
        int messageNo = 0;
        while (true) {
            String message = "sendMessage" + messageNo;

            producer.send(new ProducerRecord<>(topic, message));
            System.out.println("Sent:" + message);
            messageNo++;
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
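Note that producer.send() in the code above is asynchronous fire-and-forget: the call returns before the broker has acknowledged anything. If delivery feedback is wanted, send() also accepts a Callback; a minimal sketch of that variant (not part of the original code):

producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
    if (exception != null) {
        // the record was not acknowledged by the broker
        exception.printStackTrace();
    } else {
        // metadata carries the partition and offset assigned to the record
        System.out.println("Acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());
    }
});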

Consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/19 8:57 PM
 * Desc: Kafka consumer
 */
public class KafkaConsumers extends Thread {
    private String topic;
    private KafkaConsumer<String, String> kafkaConsumer;

    public KafkaConsumers(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KafakProperties.BROKER_LIST);
        // consumers with different group IDs each receive the full message stream
        properties.put("group.id", KafakProperties.GROUP_ID);
        properties.put("enable.auto.commit", "false");      // disable auto commit; offsets are committed manually below
        properties.put("auto.commit.interval.ms", "1000");  // auto-commit interval (only used when auto commit is enabled)
        properties.put("session.timeout.ms", "30000");      // consumer liveness timeout
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));       // subscribe to the topic
    }

    @Override
    public void run() {
        try {
            while (true) {                                    // poll loop
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE); // max wait time
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    kafkaConsumer.commitSync();               // synchronous offset commit
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}
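The bare kafkaConsumer.commitSync() above commits the latest polled offsets for every partition at once. If the commit should track exactly the records just processed for the current partition, the overload that takes an explicit offset map can be used instead; a sketch replacing the commitSync() call inside the partition loop (requires java.util.Collections and org.apache.kafka.clients.consumer.OffsetAndMetadata imports):

// commit only this partition, up to and including the record just printed
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
kafkaConsumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(lastOffset + 1))); // committed offset = next record to read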

Test class

/**
 * Auth: mbw
 * ProjectName: test
 * CreateTime: 2018/4/19 9:37 PM
 * Desc: kafka java test
 */
public class KafkaClientApp {
    public static void main(String[] args) {
        new KafkaProducers(KafakProperties.TOPIC).start();

        new KafkaConsumers(KafakProperties.TOPIC).start();
    }
}

Kafka in Practice

Architecture for this exercise: integrate Flume with Kafka for real-time data collection, using Flume to collect log files from a web server and deliver them into Kafka.

Flume configuration files

Configuration 1:

# exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -f /app/word_count.txt
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = bigdata
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

Command to start this Flume agent 
flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

Configuration 2:

# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = bigdata
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = bigdata:9092,bigdata:9093,bigdata:9094
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 20
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1
avro-memory-kafka.sinks.kafka-sink.kafka.producer.linger.ms = 1
avro-memory-kafka.sinks.kafka-sink.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

Command to start this Flume agent 
flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

Startup order

    •    Start the avro-memory-kafka agent first

    •    Then start the exec-memory-avro agent

    •    Then start a Kafka console consumer (the command is listed in the Kafka section above; a verification sketch follows this list)
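A sketch of the end-to-end check, assuming the pipeline above is running: append a line to the file tailed by the exec source and watch it arrive on a console consumer subscribed to the topic configured in the Kafka sink (hello_topic):

# generate a new log line for the exec source to pick up
echo "hello flume kafka" >> /app/word_count.txt

# console consumer on the sink topic
kafka-console-consumer.sh --zookeeper bigdata:2181 --topic hello_topic --from-beginning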

Run result:
