zoukankan      html  css  js  c++  java
  • Kafka部署篇

    安装

    下载与安装

    kafka下载地址:https://kafka.apache.org/downloads
    需要说明的是,kafka的安装依赖于zk,zk的部署可直接参考《Zookeeper介绍与基本部署》。当然,kafka默认也内置了zk的启动脚本,在kafka安装路径的bin目录下,名称为zookeeper-server-start.sh,如果不想独立安装zk,可直接使用该脚本。

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
    tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
    cd /usr/local
    ln -s kafka_2.12-2.2.0 kafka
    

    配置

    kafka主配置文件为/usr/local/kafka/config/server.properties,配置示例如下:

    broker.id=0
    listeners=PLAINTEXT://10.1.60.29:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/logs
    num.partitions=3
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.1.60.29:2181,10.1.61.195:2181,10.1.61.27:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.create.topics.enable=true
    delete.topics.enable=true
    

    配置说明:

    • broker.id:每个broker在集群中的唯一标识,正整数。当该服务器的ip地址发生变更,但broker.id未变,则不会影响consumers的消费情况
    • listeners:kafka的监听地址与端口,在实际测试中如果写0.0.0.0会报错。
    • num.network.threads:kafka用于处理网络请求的线程数
    • num.io.threads:kafka用于处理磁盘io的线程数
    • socket.send.buffer.bytes:发送数据的缓冲区
    • socket.receive.buffer.bytes:接收数据的缓冲区
    • socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
    • log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
    • num.partitions:设置新创建的topic的默认分区数
    • number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
    • log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
    • log.retention.check.interval.ms:用于检测数据过期的周期
    • log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
    • zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
    • zookeeper.connection.timeout:kafka连接zk的超时时间
    • group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
    • auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
    • delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true

    需要说明的是,多个kafka节点依赖zk实现集群,所以各节点并不需要作特殊配置,只需要broker.id不同,并接入到同一个zk集群即可。

    启停操作

    #启动
    /usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties 
    
    #检查java进程
    
    # jps
    1394 QuorumPeerMain
    13586 Logstash
    27591 Kafka
    27693 Jps
    
    #停止
    /usr/local/kafka/bin/kafka-server-start.sh
    
    

    验证

    可以通过zookeeper查看kafka的元数据信息:

    #通过zk客户端连接zookeeper
    
    ../zookeeper/bin/zkCli.sh 
    
    #查看根下多了很多目录
    [zk: localhost:2181(CONNECTED) 1] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    
    #查看/brokers/ids,可以看到有三个broker已经加入
    [zk: localhost:2181(CONNECTED) 8] ls /brokers/ids
    [0, 1, 2]
    
    #查看/brokers/topics,目前为空,说明还没有创建任何的topic
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
    []
    

    基本操作

    创建topic

    上面完成了kafka的部署,通过验证部署我们发现当前没有topic,所以创建一个topic如下:

    # ./bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 2 --partitions 3 --topic myfirsttopic 
    Created topic myfirsttopic.
    

    参数说明:

    • --create:创建一个topic
    • --zookeeper: 指定zookeeper集群的主机列表,多个server可使用逗号分隔,这里因为kafka和zk是在同一个server,所以直接连接了本机的2181端口
    • --replication-factor:指定创建这个topic的副本数
    • --partitions:指定该topic的分区数
    • --topic:指定topic的名称

    列出现有的topic

    上面通过操作zk就可以看到topic相关信息,接下来我们直接通过kafka命令行来进行相关操作:

    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list                                                               
    myfirsttopic
    

    查看topic的详细信息

    #查看myfirsttopic的详细信息
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
    Topic:myfirsttopic      PartitionCount:3        ReplicationFactor:2     Configs:
            Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
            Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
    

    参数说明:

    • --describe:查看topic的详细信息

    输出说明:

    • leader:当前负责读写的leader broker
    • replicas:当前分区所有的副本对应的broker列表
    • isr:处于活动状态的broker

    增加topic的partition数量

    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic            
    Topic:myfirsttopic      PartitionCount:6        ReplicationFactor:2     Configs:
            Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
            Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
            Topic: myfirsttopic     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 0,2
            Topic: myfirsttopic     Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: myfirsttopic     Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1
    

    修改一个topic的副本数

    #创建一个topic名为mysecondtopic,指定分区为2,副本为1
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
    Created topic mysecondtopic.
    
    #查看新创建的topic详细信息
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
    Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:1     Configs:
            Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
            Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1     Isr: 1
    
    #将broker.id为0上的partition的副本由原来的[0]扩充为[0,2],将broker.id为1上的partition的副本由原来的[1]扩充为[1,2]。
    #需要先创建一个json文件如下:
    # cat partitions-to-move.json
    {
        "partitions":
        [
            {
                "topic":"mysecondtopic",
                "partition": 0,
                "replicas": [0,2]
            },
            {
                "topic": "mysecondtopic",
                "partition": 1,
                "replicas": [1,2]
            }
        ],
        "version": 1
    }
    
    #执行副本修改
    # ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
    
    #再次查看topic状态,发现副本数由按照预期发生变更
    # ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
    Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:2     Configs:
            Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
            Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1
    
    

    删除一个topic

    #执行删除操作
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
    Topic myfirsttopic is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    
    #查看topic,可以看到myfirsttopic已被删除
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    __consumer_offsets
    mysecondtopic
    

    通过producer生产消息

    # ./bin/kafka-console-producer.sh  --broker-list 10.1.60.29:9092 --topic mysecondtopic                                                
    >hello kafka!
    >hello world!
    >just a test!
    >
    >hi world!
    >hahahaha!
    >
    

    通过consumer消费消息

    # ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning       
    hello kafka!
    just a test!
    hi world!
    hello world!
    
    hahahaha!
    
  • 相关阅读:
    Redmine-java-api使用
    Junit4 基于 custom Rule retry
    Extentreports在安卓中的应用
    uiautomator 2.0 自定义testrunner使用
    uiautomator 启动原理
    uiautomator 自定义注解的应用
    uiautomator 原理 (UiAutomation、UiAutomatorBridge、QueryController)
    uiautomator 自定义testrunner使用和启动原理
    HeadFirstPython-文件与异常
    HeadFirstPython-初识python
  • 原文地址:https://www.cnblogs.com/breezey/p/10850855.html
Copyright © 2011-2022 走看看