zoukankan      html  css  js  c++  java
  • kafka搭建笔记

    环境CentOS7.0,JDK1.8

    一、下载安装

    在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包

    下载 2.0.0 release,解压.
    > tar -xzf kafka_2.11-2.0.0.tgz
    > cd kafka_2.11-2.0.0

    在kafka解压目录下下有一个config的文件夹,里面放置的是配置文件

      consumer.properites 消费者配置,可以使用默认设置

      producer.properties 生产者配置,可以使用默认设置

      server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,常用的配置

    broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,这里采用默认配置即可
            
    listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:
    listeners=PLAINTEXT:// 192.168.10.1:9092。并确保服务器的9092端口能够访问
    
    zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,如果使用自带的zookeeper版本,使用默认配置即可zookeeper.connect=localhost:2181

    二、启动kafka服务

    启动kafka前需要先启动zookeeper,如果设备上没有安装,可以使用kafka安装包里自带的zookeeper,这是一个单节点便捷版zookeeper。

    启动zookeeper...
    > bin/zookeeper-server-start.sh config/zookeeper.properties
    [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    启动kafka服务
    > bin/kafka-server-start.sh config/server.properties
    [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ...

    三、创建一个topic

    Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息。

    物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。

    创建一个名为test的topic,只有一个分区,一份副本(即除了自己没有其他副本)

    创建topic
    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    查看运行的topic
    > bin/kafka-topics.sh --list --zookeeper localhost:2181
    test

    四、创建一个消息的生产者

    Producer,指消息的产生者,或者,消息的写端,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic

    kafka包里提供了控制台的消息生产方式

    创建对kafka服务(正在侦听9092端口,在server.properties里配置)的test主题消息生产者
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message

    五、创建一个消息的消费者

    Consumer,指消息的消费者,或者,消息的读端,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取(pull方式)消息,然后可以对这些消息进行处理。

    Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

    kafka包里提供了控制台的消息消费方式

    创建消息消费者
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message

    到这里,一个简单的kafka的服务搭建完成

    六、kafka集群

    在同一台设备搭建一个kafka集群,主要给kafka服务定义不同侦听端口就可以。

    先复制两份服务配置文件
    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    然后修改两个配置文件以下节点
    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2

    broker,指消息服务器,Producer 产生的消息都是写到这里,Consumer 读消息也是从这里读

    broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的

    确保zookeeper运行,并且之前启动的单节点kafka服务也确保运行着,这样,我们就有三个kafka服务。

    启动server-1和server-2
    > bin/kafka-server-start.sh config/server-1.properties 
    ...
    > bin/kafka-server-start.sh config/server-2.properties
    ...

    创建一个使用三份副本的topic

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    这个时候我们可以看看这几个broke都干啥,使用kafka-topics的--describe参数

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    • "leader" 简单的说就是主节点,读写都是它负责。
    • "replicas" 副本列表,表示副本分布在哪些代理上,且该列表第一个元素就是Leader副本所在代理
    • "isr" 副本子集,是指消息同步的集合,这个列表的副本都是存活的

    现在可以试试发送和接收消息:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2

    七、测试一下集群的容错性

    现在leader是broke 1,我们找到它,然后杀死进程

    > ps aux | grep server-1.properties
    7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    > kill -9 7564

    然后通过参数--describe看看变化

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

    leader已经变成broke 2,并且Isr里broke 1已经被排除,也就是数据不同步到broke 1

    之前写入的消息依然有效

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2

    然后,我们再重新启动server-1,并且查看状态

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0,1

    broke 1又回到同步列表里。

     八、常用命令

    Kafka bin/ 目录下的工具:	
    bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh
    bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh
    bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh
    bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh
    bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh
    bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh
    bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh
    
    常用的命令有以下几个:
    
    bin/kafka-server-start.sh -daemon config/server.properties
    bin/kafka-topics.sh --describe --zookeeper 192.168.10.31:2181 --topic topic1
    bin/kafka-topics.sh --list --zookeeper 192.168.10.31:2181
    bin/kafka-topics.sh --delete --zookeeper 192.168.10.31:2181 --topic topic1
    bin/kafka-topics.sh --create --zookeeper 192.168.10.31:2181 --replication-factor 3 --partitions 2 --topic topic1
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.31:9092 --topic topic1 --from-beginning
    bin/kafka-console-producer.sh --broker-list 192.168.10.31:9092 --topic topic1
    删除主题:
    在server.properties配置
    delete.topic.enable=true #默认是false
    
    删除主题数据:
    在server.properties中配置
    log.retention.hours=168  //保留时间,单位小时
    log.retention.check.interval.ms=300000 //保留时间检查间隔,单位毫秒

    九、其他

    如果想继续运行其他命令,不关闭当前窗口,可以加“&“,就可以后台运行了:

    bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 相关阅读:
    结对项目:四则运算
    Word Count(C语言)
    自我介绍+软工5问
    如何在博客园中使用markdown
    如何设计一门语言(九)——类型
    如何设计一门语言(八)——异步编程和CPS变换
    如何设计一门语言(七)——闭包、lambda和interface
    时隔多年我又再一次体验了一把跟大神聊天的感觉
    20199107 2019-2020-2 《网络攻防实践》第4周作业
    20199107 2019-2020-2 《网络攻防实践》第3周作业
  • 原文地址:https://www.cnblogs.com/asker009/p/9958240.html
Copyright © 2011-2022 走看看