zoukankan      html  css  js  c++  java
  • kafka集群与zookeeper集群 配置过程

    Kafka的集群配置一般有三种方法,即

        (1)Single node – single broker集群;

        (2)Single node – multiple broker集群;
        (3)Multiple node – multiple broker集群。

        前两种方法官网上有配置过程((1)(2)配置方法官网教程),下面会简单介绍前两种方法,主要介绍最后一种方法。

    准备工作:

        1.Kafka的压缩包,这里选用的是kafka_2.10-0.8.2.2.tgz

        2.三台CentOS 6.4 64位虚拟机。分别是192.168.121.34(主机名为master)、192.168.121.35(主机名为datanode1)、192.168.121.36(主       机名为datanode2)。

    一、Single node – single broker集群配置(单节点单boker集群配置)

      注:图片来源自网络

    1.解压Kafka的压缩包

        [root@master kafkainstall]# tar -xzf kafka_2.10-0.8.2.0.tgz

        [root@master kafkainstall]# cd kafka_2.10-0.8.2.2

    这里我新建了一个kafkainstall文件夹来存放加压后的文件,然后进入解压后的kafka_2.10-0.8.2.2文件夹。

    2.启动zookeeper服务

        由于Kafka的压缩包里已经有了zookeeper,而且提供了启动kafka的脚本(在kafka_2.10-0.8.2.2/bin目录下)和zookeeper的配置文件      (在kafka_2.10-0.8.2.2/config目录下):

        [root@master kafka_2.10-0.8.2.2]# bin/zookeeper-server-start.sh config/zookeeper.properties &

        zookeeper的配置文件zookeeper.properties里面的关键属性:

        # the directory where the snapshot is stored.
          dataDir=/tmp/zookeeper
        # the port at which the clients will connect
          clientPort=2181

         默认情况下,zookeeper的snapshot 文件会存储在/tmp/zookeeper下,zookeeper服务器会监听 2181端口。

    3.启动Kafka broker服务

        由于kafka已经提供了启动kafka的脚本(在kafka_2.10-0.8.2.2/bin目录下),这里直接启动即可:

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &

        Kafka broker的配置文件的关键属性:

        # The id of the broker. This must be set to a unique integer for each broker.
          broker.id=0

        # The port the socket server listens on
          port=9092

        # A comma seperated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs

        # Zookeeper connection string (see zookeeper docs for details).
        # This is a comma separated host:port pairs, each corresponding to a zk
        # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
        # You can also append an optional chroot string to the urls to specify the
        # root directory for all kafka znodes.
          zookeeper.connect=localhost:2181

    4.创建只有一个Partition的topic

        [root@master kafka_2.10-0.8.2.2]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest-topic

        这里创建了一个mytest-topic的topic。

    5.启动一个生产者进程来发送消息

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest-topic

        其中,(1)参数broker-list定义了生产者要推送消息的broker地址,以<IP地址:端口>形式 ,由上面的broker的配置文件可知                                      为localhost:9092;

                  (2)参数topic指定生产者发送给哪个topic。   

        生产者配置文件关键属性:

         # list of brokers used for bootstrapping knowledge about the rest of the cluster
         # format: host1:port1,host2:port2 ...
    metadata.broker.list=localhost:9092

        # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
    producer.type=sync

         # message encoder
    serializer.class=kafka.serializer.DefaultEncoder

        接着你就可以输入你想要发送给消费者的消息了。(也可以先启动消费者进程,这样生产者发送的消息可以立刻显示)

    6.启动一个消费者进程来消费消息

        需要另外打开一个终端:    

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest-topic --from-beginning

       其中,(1)参数zookeeper指定了连接zookeeper的地址,以<IP地址:端口>形式;

                 (2)topic参数指定了从哪个topic来pull消息。

         当你执行这个命令之后,你便可以看到控制台上打印出的生产者生产的消息:

         消费者配置文件consumer.properties关键属性:

         # Zookeeper connection string
         # comma separated host:port pairs, each corresponding to a zk
         # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
            zookeeper.connect=localhost:2181
        # timeout in ms for connecting to zookeeper
          zookeeper.connection.timeout.ms=60000
        #consumer group id
          group.id=test-consumer-group

    二、Single node – multiple broker集群(单节点多boker集群配置)

       注:图片来源自网络

    1.启动zookeeper服务

        启动方法跟上面一样

    2.启动Kafka broker服务

        如果需要在单个节点(即一台机子)上面启动多个broker(这里我们启动三个broker),需要准备多个server.properties文件即可,我们需要复制kafka_2.10-0.8.2.2/config/server.properties文件。

        如下:

        [root@master config]# cp server.properties server-1.properties

        [root@master config]# cp server.properties server-2.properties

        然后修改server-1.properties和server-2.properties。

        server-1:

    1. broker.id=1

            2.port=9093

     3.log.dirs=/tmp/kafka-logs-1

        server-2:

    1. broker.id=2

            2.port=9094

     3.log.dirs=/tmp/kafka-logs-2

         然后我们再用这两个配置文件分别启动一个broker:

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-1.properties &

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-2.properties &

         然后启动:

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &

    3.创建只有1个Partition和3个备份的的topic

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    4.启动一个Producer发送消息

         如果用一个Producer发送给多个broker(这里是3个),唯一需要改变的就是在broker-list属性中指定要连接的broker:

        [root@master kafka_2.10-0.8.2.2]#bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,

    localhost:9094 --topic my-replicated-topic

    5.启动一个消费者来消费消息

        [root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicmy-replicated-topic --from-beginning

         如果要让不同的Producer发送给不同的broker,我们也仅仅需要为每个Producer配置响应的broker-list属性即可。

    三、Multiple node – multiple broker集群(多节点多boker集群配置)

     注:图片来源自网络

     注:上图中每个Node里有两个broker,我这里为了简单写,在每个节点里有一个broker(通过上面的单节点多broker的介绍,可以很容易        扩展)

    1.首先需要配置一个zookeeper集群

        上面一和二中提到的都是在192.168.121.34(主机名为master)上进行的,现在要扩展为多节点多broker集群,就要在另外2台机子上也要安装Kafka,方法同一中的步骤1。

    2.zookeeper集群配置

        zookeeper-0(即上面192.168.121.34(主机名为master)中的zookeeper):

    配置修改为:

         # the directory where the snapshot is stored.

    dataDir=/tmp/zookeeper
         # the port at which the clients will connect
    clientPort=2181
        #the blow five lines are added by @author.
    initLimit=5
    syncLimit=2
    server.0=192.168.121.34:2888:3888
    server.1=192.168.121.35:2889:3889
    server.2=192.168.121.36:2890:3890

        然后在dataDir目录/data/zookeeper/下写一个myid文件,命令如下:

    echo0 >myid

    注意:这个id是zookeeper的主机标识,每个主机id不同第二台是1192.168.121.35(主机名为datanode1),第三台是2192.168.121.36(主机名为datanode2)。也就是说3个zookeeper配置文件除了myid不同,其他都一样。

        最后依次启动3台机子上的zookeeper服务。

    3.配置broker 集群

         broker的配置配置文件(server.properties):按照单节点多实例配置方法在一个节点上启动1个实例,不同的地方是zookeeper的连接串      需要把所有节点的zookeeper都连接起来。

         (1)192.168.121.34(主机名为master)中的kafka_2.10-0.8.2.2/bin/目录下的server.properties文件修改:

        # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=192.168.121.34

        # A comma seperated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs-0

        # Zookeeper connection string (see zookeeper docs for details).
        # This is a comma separated host:port pairs, each corresponding to a zk
        # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
        # You can also append an optional chroot string to the urls to specify the
        # root directory for all kafka znodes.
    zookeeper.connect=192.168.121.34:2181,192.168.121.35:2181,192.168.121.36:2181

        # Timeout in ms for connecting to zookeeper
           zookeeper.connection.timeout.ms=60000

        注意:把host.name的注释去掉,并更改为本机的IP地址。zookeeper.connection.timeout.ms的默认为6000,但是最好改大点,不然容易超时,但也不能太大,太大影响效率。

        (2)192.168.121.35(主机名为datanode1)中的kafka_2.10-0.8.2.2/bin/目录下的server.properties文件修改:  

        # Hostname the broker will bind to. If not set, the server will bind to all interfaces
           host.name=192.168.121.35

       # A comma seperated list of directories under which to store log files
          log.dirs=/tmp/kafka-logs-1

        其它与上面(1)中相同。

        (3)192.168.121.36(主机名为datanode2)中的kafka_2.10-0.8.2.2/bin/目录下的server.properties文件修改:

        # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=192.168.121.36

        # A comma seperated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs-2

        其它与上面(1)中相同。

    4.生产者配置文件修改

    # list of brokers used for bootstrapping knowledge about the rest of the cluster
    # format: host1:port1,host2:port2 ...
    metadata.broker.list=192.168.121.34:9092,192.168.121.35:9092,192.168.121.36:9092
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    #partitioner.class=
    # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
    producer.type=async

    5.消费者配置文件修改

         # Zookeeper connection string
         # comma separated host:port pairs, each corresponding to a zk
         # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
      zookeeper.connect=191.168.121.34:2181,191.168.121.35:2181,191.168.121.36:2181
         # timeout in ms for connecting to zookeeper
      zookeeper.connection.timeout.ms=60000

    6.生产者发送消息

        (1)首先创建一个test-replicated-topic(在192.168.121.34(主机名为master)中)

         [root@master kafka_2.10-0.8.2.2]#bin/kafka-topics.sh --create --zookeeper192.168.121.34:2181 --replication-factor 3 --partitions 1 --topictest-replicated-topic

        然后查看已有的topic:

              

       可以看到test-replicated-topic已经创建成功,然后我们再看每个broker在做什么:

            

        其中leader是负责对给定的partition执行所有的读和写的节点,此时的leader是0号节点(即0号broker)。更多解释请看官网

        (2)生产者发送消息(192.168.121.34(主机名为master)节点上)

             

    7.消费者消费消息(分别在三台机子上消费上面发送的消息)

       (1)master上:

           

        (2)datanode1上:

             

       (3)datanode2上:

           

        可以看到,三个节点上的消费者都能正常的接收到其中一个节点上发送的消息。这说明kafka集群基本上已经超过部署。

        PS:实际操作过程中3个节点上的zookeeper的监听端口我也没有统一用2181,但是可以用统一的端口,并没有影响。

  • 相关阅读:
    滴滴快车奖励政策,高峰奖励,翻倍奖励,按成交率,指派单数分级(4月19日)
    2016年小升初海淀区全部初中排名分析
    LVM Linear vs Striped Logical Volumes
    Spring Data Redis实现消息队列——发布/订阅模式
    Redis Pubsub命令用法
    mysql 截取身份证出生日期
    MA均线组合
    Eclipse代码格式化规范
    JSON Web Token实际应用
    JSON Web Token单点登录设计
  • 原文地址:https://www.cnblogs.com/zikai/p/9627544.html
Copyright © 2011-2022 走看看