zoukankan      html  css  js  c++  java
  • Kafka及集群部署

    Kafka介绍

        官网:http://kafka.apache.org

        Kafka是一款性能非常好的并且支持分布式的消息队列中间件。由于它的高吞吐特性,Kafka通常使用在大数据领域,如日志收集平台。其实Kafka是一个流处理平台,这个概念不太好理解,之所以叫做流,是因为它在工作中就像是一个可以支撑高吞吐量的管道,数据像水一样流进去,然后另外一端再去读取这些数据。我们就可以把Kafka看作是一种特殊的消息队列中间件。

        kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。 kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。

        Kafka与传统消息系统相比,有以下不同:

    1)它被设计为一个分布式系统,易于向外扩展;
    2)它同时为发布和订阅提供高吞吐量;
    3)它支持多订阅者,当失败时能自动平衡消费者;
    4)它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

    1、Kafka中有几个关键角色和概念

    1)Producer

        消息生产者,是消息的产生源头,负责生成消息并发送给Kafka。

        生产者创建消息。在其他发布与订阅系统中,生产者可能被称为发布者或写入者。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

    2)Consumer

        消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

        消费者读取消息。在其他发布与订阅系统中,消费者可能被称为订阅者或读者。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

        消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。

    3)Topic

        主题,由用户自定义,并配置在Kafka服务器,用于建立生产者和消费者之间的订阅关系,生产者将消息发送到指定的Topic,然后消费者再从该Topic下去取消息。

         一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。

    4)Partition

    消息分区,一个Topic下面会有多个Partition,每个Partition都是一个有序队列,Partition中的每条消息都会被分配一个有序的id。

         物理上的分区,topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

        Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上

    5)Broker

        这个其实就是Kafka服务器了,无论是单台Kafka还是集群,被统一叫做Broker。

        一个独立的 Kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

    • Kafka 集群包含一个或多个服务器,服务器节点称为broker。
    • broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
    • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
    • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

        broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控broker。在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制(见图 )。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。

    6)Group

         消费者分组,将同一类的消费者归类到一个组里。在Kafka中,多个消费者共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个组名。

    kafka通过zookeeper管理集群配置,选举leader

    2、kafka特点

    1)Kafka:内存、磁盘、数据库、支持大量堆积

        kafka的最小存储单元是分区,一个topic包含多个分区,kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。

        分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。

        根据配置文件中的目录清单,kafka会把新的分区分配给目录清单里分区数最少的目录。

        默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。

    2)Kafka:支持负载均衡
    a)一个broker通常就是一台服务器节点。对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。
        每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。
    b)kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。
    c)当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。
       kafka的负载均衡大部分是自动完成的,分区的创建也是kafka完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。
    d)发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。
    3)集群方式,天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave
        分区首领均匀地分布在不同的kafka服务器上,分区副本也均匀地分布在不同的kafka服务器上,所以每一台kafka服务器既含有分区首领,同时又含有分区副本,每一台kafka服务器是某一台kafka服务器的Slave,同时也是某一台kafka服务器的leader。
        kafka的集群依赖于zookeeper,zookeeper支持热扩展,所有的broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠zookeeper集群的mq相比,这是最大的优势。

    Kafka工作流程

    1)生产者定期向主题发送消息。

    2)Kafka broker将所有消息存储在为该特定主题配置的分区中。它确保消息在分区之间平等共享。如果生产者发送两个消息,并且有两个分区,则Kafka将在第一个分区中存储一个消息,在第二个分区中存储第二个消息。

    3)消费者订阅一个特定的主题。

    4)一旦消费者订阅了一个主题,Kafka将向消费者提供该主题的当前偏移量,并将偏移量保存在ZooKeeper中。

    5)消费者将定期请求Kafka新消息。

    6)一旦Kafka收到来自生产者的消息,它会将这些消息转发给消费者。

    7)消费者将收到消息并处理它。

    8)一旦消息被处理,消费者将向Kafka broker发送确认。

    9)一旦Kafka收到确认,它会将偏移量更改为新值,并在ZooKeeper中进行更新。由于ZooKeeper中保留了偏移量,因此即使在服务器出现故障时,消费者也可以正确读取下一条消息。

    kafka集群部署:

    (1)Kafka架构是由producer(消息生产者)、consumer(消息消费者)、borker(kafka集群的server,负责处理消息读、写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker)、topic(消息队列/分类相当于队列,里面有生产者和消费者模型)、zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成。

    (2)kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition内部消息强有序,相当于有序的队列,其中每个消息都有个序号offset,比如0到12,从前面读往后面写。一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition。这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,消息不经过内存缓冲,直接写入文件,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念。

    (3)producer自己决定往哪个partition里面去写,这里有一些的策略,譬如如果hash,不用多个partition之间去join数据了。consumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型(各个consumer消费不同的partition,因此一个消息在group内只消费一次),group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息在被每个group消费一次。

    kafka的集群安装配置:

       1)kafka集群的安装配置依赖zookeeper,搭建kafka集群之前,需先部署好一个可用的zookeeper集群

       2)需安装openjdk运行环境

       3)同步kafka拷贝到所有集群主机

       4)修改配置文件

       5)每台服务器的broker.id都不能相同

       6)zookeeper.connect集群地址,不用都列出,写一部分即可

     zookeeper集群也可通过kafka中自带的zookeeper来部署,修改自带zookeeper配置文件可与zookeeper的配置文件一致

    启动:

    [root@ecs-1d01 kafka_2.10-0.10.2.1]#nohup  /data/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh  /data/kafka_2.10-0.10.2.1/config/zookeeper.properties > /dev/null 2>&1  &

    部署(以下Centos6及7都适用)

    部署环境:
    操作系统                    IP                 kafka版本
    rhel6.5              192.168.1.234             2.1.1
    rhel6.5              192.168.1.206             2.1.1
    rhel6.5              192.168.1.45              2.1.1

    1、创建用户,全部下载kafka

    [root@kafka-0001]$useradd  wushaoyu
    
    [root@kafka-0001]$su -  wushaoyu
    
    [wushaoyu@kafka-0001]$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz

    2、创建消息目录,修改配置文件

    [wushaoyu@kafka-0001 ~]$ cd kafka_2.11-2.1.1
    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ mkdir logs
    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ cd config/
    [wushaoyu@kafka-0001 config]$ cat server.properties |egrep -v "^$|^#"
    broker.id=1                                         #broker的全局唯一编号不能重复,建议与zookeeper的myid对应
    listeners=PLAINTEXT://192.168.1.234:9092 #broker监听ip和端口
    num.network.threads
    =3 #borker进行网络处理的线程数 num.io.threads=8 #borker进行I/O处理的线程数 socket.send.buffer.bytes=102400 #发送缓冲区大小,即发送消息先发送到缓冲区,当缓冲区满了在一起发出去 socket.receive.buffer.bytes=102400 #接收缓冲区大小,接收消息先放到接收缓冲区,当达到这个数量时同步到磁盘 socket.request.max.bytes=104857600 #向kafka套接字请求的最大字节数量,防止服务器outofmemory,大小最好不要超过java的堆栈大小 log.dirs=/home/wushaoyu/kafka_2.11-2.1.1/logs #消息存放目录,不是日志目录 num.partitions=1 #每个topic的默认分区数 num.recovery.threads.per.data.dir=1 #处理消息目录的线程数,若设置了3个消息路径,改参数为2,那么一共需要6个线程 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #消息过期时间,默认为1周 log.segment.bytes=1073741824 #日志文件中每个segment的大小,默认为1G,topic的分区是以一堆segment文件存储的,超过此限制会建立一个新的日志文件。此参数若在创建topic时的指定,那么参数覆盖,以指定的为准 log.retention.check.interval.ms=300000 #如上设置了每个segment文件大小为1G,那么此时间间隔就是检查他的大小有没有达到1G,检查的时间间隔 zookeeper.connect=192.168.1.234:2181,192.168.1.206:2181,192.168.1.45:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0

    配置文件:

    log.dirs:Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

    num.recovery.threads.per.data.dir:

      对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:
      • 服务器正常启动,用于打开每个分区的日志片段;
      • 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
      • 服务器正常关闭,用于关闭日志片段

        默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的

    服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

    3、启动kafka

    [wushaoyu@kafka-0002 kafka_2.11-2.1.1]$ ./bin/kafka-server-start.sh ./config/server.properties 
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
    #
    # There is insufficient memory for the Java Runtime Environment to continue.      
    # Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
    # An error report file with more information is saved as:
    # /home/wushaoyu/kafka_2.11-2.1.1/hs_err_pid23272.log
    此处出现报错:无法分配足够的内存,因为部署环境为云主机,只有1G内存,所以可添加交换分区解决 查看内存大小 [wushaoyu@kafka
    -0002 kafka_2.11-2.1.1]$ free -m total used free shared buffers cached Mem: 995 920 75 0 80 641 -/+ buffers/cache: 198 797 Swap: 0 0 0 创建交换分区 [root@kafka-0001 ~]# dd if=/dev/zero of=/tmp/swap bs=1M count=8192 #创建文件,大小为8G 8192+0 records in 8192+0 records out 8589934592 bytes (8.6 GB) copied, 52.9978 s, 162 MB/s [root@kafka-0001 ~]# mkswap /tmp/swap #创建交换分区 mkswap: /tmp/swap: warning: don't erase bootbits sectors on whole disk. Use -f to force. Setting up swapspace version 1, size = 8388604 KiB no label, UUID=84ea82c7-35a3-46be-926a-73dfc7e18548 [root@kafka-0001 ~]# swapon /tmp/swap #启用交换分区 [root@kafka-0001 ~]# free -m total used free shared buffers cached Mem: 995 928 67 0 22 719 -/+ buffers/cache: 186 809 Swap: 8191 0 8191 再次启动 [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ nohup ./bin/kafka-server-start.sh ./config/server.properties >kafka.log 2>&1 & (使用nohup启动,指定日志存放目录) 或[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties #此命令默认会在当前目录下创建logs目录用以存放日志

     Zookeeper+Kafka集群验证及消息发布

    创建主题

    # 创建一个topic,3个分区,3个副本
    [root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.160:2181,192.168.1.200:2181,192.168.1.235:2181 -replication-factor 3 --partitions 3 --topic test1
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Created topic "test1"
    
    # 查看topic,确认topic创建成功
    [root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --list --zookeeper 192.168.1.160:2181
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    test1
    
    #查看topic,详细信息
    [root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.1.160:2181 --topic test1
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    可以描述为:Topic分区数/副本数/副本Leader/副本ISR等信息: 
    Topic:test1 PartitionCount:
    3 ReplicationFactor:3 Configs: Topic: test1 Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2 Topic: test1 Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3 Topic: test1 Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

    “leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
    “replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
    “isr”:同步备份”的节点列表,也就是活着的节点并且正在同步leader
    其中Replicas和Isr中的1,2,0就对应着3个broker他们的broker.id属性!

    发布信息

    模拟生产者,发布消息
    [root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.1.160:9092 --topic test1 OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N hello,world i love you 模拟消费者,接收消息 [root@ecs-a0d8-0002 kafka_2.10-0.10.2.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic test1 --from-beginning OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N k;lkl; love you hello,world hkhj i love you

      #在 producer 里输入消息,consumer 中就会显示出同样的内容,表示消费成功   

      # --from-beginning表示从开始接收,否则只从当前offset接收新产生的消息      

    至此,消息生产和消费没有问题,Kafka集群部署完成。


    Kafka常用命令

    1)查看topic

    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --list --zookeeper 192.168.1.234:2181
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    __consumer_offsets
    mymsg

    2)查看topic msmsg详情

    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.1.234:2181 --topic mymsg
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    Topic:mymsg    PartitionCount:1    ReplicationFactor:2    Configs:
        Topic: mymsg    Partition: 0    Leader: 1    Replicas: 1,3    Isr: 1,3

    3)删除topic

    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --delete --zookeeper 192.168.1.234:2181 --topic mymsg

    4)生产者参数查看

    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-console-producer.sh

    5)生成者参数查看

    [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh

    kafka集群管理工具kafka-manager

        为了简化开发者和服务工程师维护Kafka集群的工作,yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。kafka-manager 项目地址:https://github.com/yahoo/kafka-manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。这个管理工具也是一个非常好的可以快速浏览这个集群的工具,kafka-manager有如下功能:

    - 管理多个kafka集群
    - 便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
    - 选择你要运行的副本
    - 基于当前分区状况进行
    - 可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)
    - 删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
    - Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
    - 为已存在的topic增加分区
    - 为已存在的topic更新配置
    - 在多个topic上批量重分区
    - 在多个topic上批量重分区(可选partition broker位置)

    Githua下载地址:https://github.com/yahoo/kafka-manager/releases

    1、部署kafka-manager

    [root@ecs-1d01 data]# cd kafka-manager-1.3.3.23/
    
    [root@ecs-1d01 kafka-manager-1.3.3.23]# cd conf/
    
    [root@ecs-1d01 conf]# vim application.conf
    
    #kafka-manager.zkhosts="localhost:2181"
    
    kafka-manager.zkhosts="192.168.1.160:2181,192.168.1.235:200:2181,192.168.1.235:2181"   #把zookeeper都写进去

    2、启动kafka-manager

    [root@ecs-1d01 kafka-manager-1.3.3.23]# nohup ./bin/kafka-manager  -Dhttp.port=9107   > /dev/null 2>&1 &  //默认端口为9000,可自定义指定
    
    [root@ecs-1d01 kafka-manager-1.3.3.23]# ss -tnulp|grep 9107
    tcp LISTEN 0 50 :::9107 :::* users:(("java",pid=28816,fd=116))

    3、访问验证

    4、kafka-mamager测试

         如果没有在 Kafka 中配置过 JMX_PORT,不要选择第一个复选框。Enable JMX Polling如果选择了该复选框,Kafka-manager 可能会无法启动

      可根据实际情况,添加集群中的zookeeper真实数量

       其他broker的配置可以根据自己需要进行配置,默认情况下,点击【保存】时,会提示几个默认值为1的配置错误,需要配置为>=2的值。

       保存,创建完成

    5、查看TOPIC 信息

    6、查看broker信息

    7、管理kafka-manager

    1)新建主题
    点击【Topic】>【Create】可以方便的创建并配置主题。如下显示。

       由于集群中只有1个节点,所以副本数最多设置为1

    2)查看主题

    针对Topic->Create新建主题的配置,根据一张图事宜:

        在上图一个Kafka集群中,有两个服务器,每个服务器上都有2个分区。P0,P3可能属于同一个主题,也可能是两个不同的主题。
        如果设置的Partitons和Replication Factor都是2,这种情况下该主题的分步就和上图中Kafka集群显示的相同,此时P0,P3是同一个主题的两个分区。P1,P2也是同一个主题的两个分区,Server1和Server2其中一个会作为Leader进行读写操作,另一个通过复制进行同步。
        如果设置的Partitons和Replication Factor都是1,这时只会根据算法在某个Server上创建一个分区,可以是P0~4中的某一个(分区都是新建的,不是先存在4个然后从中取1个)。

  • 相关阅读:
    绘制饼状图
    柱状图使用实例
    柱状图颜色区分
    bar函数与barh函数
    绘制柱状图
    绘制不同样式不同颜色的线
    Windows10没有修改hosts文件权限
    Lucene入门学习
    Kafka学习笔记
    Elasticsearch学习笔记(强推)
  • 原文地址:https://www.cnblogs.com/wushaoyu/p/11216812.html
Copyright © 2011-2022 走看看