zoukankan      html  css  js  c++  java
  • Kafka的集群部署与基本的命令行操作

      KafKa集群是依赖于zookeeper的,所以想要部署一个分布式的KafKa集群,首先需要搭建一个Zookeeper集群,至于如何搭建zookeeper集群,在我的另一篇博文里已经说过了,这里不再讲述。安装好zookeeper集群以后,部署KafKa集群就简单多了,只需要相应的的Jar包即可。Jar包可以在KafKa官网下载,网址如下:http://kafka.apache.org/downloads.html

     我用的版本是0.11.0的,下载的地址如下:https://t00y.com/file/28012292-455761212

     上传到集群中的某个服务器之后,解压缩:

    $ tar zxvf kafka_2.11 0.11.0.0.tgz -C /opt/module

     在Kafka的安装目录下,创建一个文件夹,用于存放Topic中的消息(Topic中的消息默认是会存放在Kafka本地7天的),:

    mkdir data

     进入到KafKa的conf文件夹,里面都是相关的配置文件,其中最重要的就是server.properties文件,里面包含了与KafKa集群相关的配置,现在就来修改一下相关的配置:

    # The id of the broker. This must be set to a unique integer for each broker.
    # 这是broker的全局唯一编号,不能重复,全局是指整个集群
    broker.id=0
    
    # Switch to enable topic deletion or not, default value is false
    # 将这个参数设置为true,否则将不能真正删除topic
    delete.topic.enable=true
    
    ############################# Socket Server Settings #############################
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=PLAINTEXT://192.168.182.101:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    ############################# Log Basics #############################
    
    # A comma seperated list of directories under which to store log files
    # topic消息的存放地址
    log.dirs=/opt/module/kafka/data
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    #log.flush.interval.ms=1000
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # 配置连接zookeeper集群的地址
    zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
                            ……

      配置环境变量,vi  /etc/profile

    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

      修改完,别忘记:source  /etc/profile

      最后分发一下kafka这个文件夹到集群的各个节点,然后一定要修改下各个节点的server.properties文件的broker.id这个属性值,不能全局重复!!! 环境变量也要配置!!!

      启动集群的方式,就是分别执行各个节点的KafKa下的bin文件夹下的一个kafka-server-start.sh文件,同时加上一些命令行参数,如下:

    $ bin/kafka-server-start.sh -daemon config/server.properties

      关闭集群,同样是执行另一个文件:

    $ bin/kafka-server-stop.sh stop

      上面的方式一个节点一个节点的启动整个集群,太麻烦了,可以将上述的启动和关闭集群的命令写成一个Shell脚本来实现,整个集群的启动和关闭。具体可以见我另一篇博文。

    Kafka命令行操作

      与topic相关的增删改查命令都包含在kafka-topics.sh文件中

    1)查看当前服务器中的所有topic

    $ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

    2)创建topic

    $ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first

    选项说明:

    --topic 定义topic名

    --replication-factor  定义副本数

    --partitions  定义分区数

      创建的topic暂存于指定的路径下,其就是在server.properties文件中的log.dirs属性指定的路径,topic存储时的命名方式为:topic名字+分区名

       创建topic时,其中分区的副本数应小于等于broker的数量,分区数可以大于broker的数量。

    3)删除topic

    $ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

    需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

    4)发送消息

    $ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

    输入命令之后,进入阻塞状态,可以输入要发送的消息:

    >hello world

    >atguigu  atguigu

    5)消费消息

    $ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first

    --from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

    消费者启动的时候,会生成一个专门用于记录offset信息的主题,该主题默认为50个分区,这些文件夹的名称为 "__consumer_offsets-n",n代表分区号。

    6)查看某个Topic的详情

    $ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

    7)修改分区数

    $bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
  • 相关阅读:
    Code::blocks 使用c++ long double类型出错
    数论四大定理
    线性基
    win7 下强制删除文件
    C++数组指针的引用
    学习方法
    MySQL的ON DUPLICATE KEY UPDATE用法
    MVCC
    RabbitMQ中的Connection 和 Channel
    myisam和innodb的比较
  • 原文地址:https://www.cnblogs.com/yxym2016/p/13410452.html
Copyright © 2011-2022 走看看