  • Kafka server setup and testing

    Kafka server setup

      Kafka manages its own cluster metadata, together with producer and consumer information, through ZooKeeper, so ZooKeeper is an inseparable part of Kafka, and setting up a ZooKeeper ensemble is an essential part of building a Kafka cluster environment. There is plenty of material online on setting up a ZooKeeper ensemble, so it is not described in detail here; you can also refer to my own article on the three ZooKeeper installation modes. The Kafka setup described in this article works on both Ubuntu and macOS (personally tested).

      Software environment for the cluster:

    • JDK: java version 1.8.0_121
    • zookeeper:zookeeper-3.4.9
    • kafka: kafka-2.10-0.10.2.0

      First, download the archive kafka-2.10-0.10.2.0.tgz from the official website:

    http://kafka.apache.org/downloads

      Then extract the archive into the directory where you want to install it:

    tar -zxvf kafka-2.10-0.10.2.0.tgz
    • Single-node mode

      Kafka's single-node mode means deploying a single Kafka instance (broker) on one server. The config directory of the extracted Kafka package already contains a ready-made broker configuration file, server.properties, which we will partially modify to reach the state we want. Go into the config directory and open server.properties with vim:

    vim config/server.properties

      This article mainly modifies the following parameters:

    • broker.id: the unique identifier of each broker in the Kafka cluster, expressed as an int;
    • delete.topic.enable: whether a topic delete operation takes effect immediately; if set to true it does. The option is commented out by default;
    • host.name: the broker's IP address, i.e. the IP address of this server;
    • port: the port this broker uses to talk to clients, to other brokers and to ZooKeeper;
    • log.dirs: where the broker stores its log (data) files;
    • zookeeper.connect: the ZooKeeper address where the broker registers itself, so that the whole cluster knows it exists;
    • zookeeper.connection.timeout.ms: the ZooKeeper connection timeout; if the broker cannot connect to ZooKeeper within this time, it will not join the Kafka cluster.
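To make the key/value format concrete, here is a minimal sketch (not part of Kafka) of how such a .properties fragment can be read; the fragment and its values are illustrative only:

```python
# Minimal sketch: read a Java-style .properties fragment into a dict,
# skipping blank lines and '#' comments. Values below are examples only.
fragment = """
# The id of the broker.
broker.id=0
host.name=127.0.0.1
port=9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
"""

def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank line or comment
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

props = parse_properties(fragment)
print(props["broker.id"], props["port"])  # 0 9092
```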

      All other settings are left at their defaults for now. The modified configuration file looks like this (comments trimmed):

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    # Switch to enable topic deletion or not, default value is false
    #delete.topic.enable=true
    
    # Hostname and port the broker will advertise to producers and consumers. If not set
    
    host.name=127.0.0.1
    port=9092
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    
    
    ############################# Log Retention Policy #############################
    
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

      That completes the single-node Kafka setup. Let's take it for a spin.

      First start the ZooKeeper ensemble; as the configuration above shows, this article uses only one ZooKeeper server. Go into ZooKeeper's bin directory and run the start command:

    ./zkServer.sh start

      Then start the Kafka server; the start command takes a broker configuration file as an argument:

    cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
    ./kafka-server-start.sh ../config/server.properties 

      When the terminal prints something like “[Kafka Server 0], started (kafka.server.KafkaServer)”, the Kafka server has started successfully.

      Next, create a topic named “my-first-topic” with both partitions and replication-factor set to 1; for what these two parameters mean, see the basic Kafka concepts, which are not repeated here. Run the following command in the bin directory:

    ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-first-topic --partitions 1 --replication-factor 1

      The terminal now prints: Created topic "my-first-topic". which tells us the topic was created successfully. We can now inspect the topic with the describe option:

    ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-first-topic

      A rough explanation of the printed fields:

    • Topic: the topic name;
    • PartitionCount: the number of partitions in the topic;
    • ReplicationFactor: the number of replicas of each partition (including the leader itself).

      The remaining fields can be explained together: my-first-topic has only one partition, Partition 0, so its single replica lives on broker 0 (Replicas: 0), the leader of partition 0 is therefore broker 0 (Leader: 0), and broker 0 is currently the only broker holding an up-to-date copy of partition 0 (Isr: 0).
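Kafka spreads a topic's replicas over the brokers in a round-robin fashion; the sketch below is a simplified illustration of that idea (real Kafka randomizes the starting broker and shifts follower placement), and it reproduces the one-partition, one-replica layout of my-first-topic:

```python
# Simplified round-robin replica assignment (illustrative, not Kafka's
# exact algorithm): partition p starts at broker (start + p) and the
# remaining replicas follow consecutively. replicas[0] is the preferred leader.
def assign_replicas(n_partitions, replication_factor, brokers, start=0):
    assignment = {}
    for p in range(n_partitions):
        first = (start + p) % len(brokers)
        assignment[p] = [brokers[(first + r) % len(brokers)]
                         for r in range(replication_factor)]
    return assignment

# One partition, one replica, a single broker 0 -- like my-first-topic:
print(assign_replicas(1, 1, [0]))        # {0: [0]}
# Three partitions, two replicas over brokers 0, 1, 2:
print(assign_replicas(3, 2, [0, 1, 2]))  # {0: [0, 1], 1: [1, 2], 2: [2, 0]}
```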

      Now send messages to the newly created topic with Kafka's built-in console producer script:

     ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-first-topic
    This is test message : 1
    This is test message : 2
    This is test message : 3

      Then use Kafka's built-in console consumer script to fetch the three messages just sent to my-first-topic on broker 0:

    ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-first-topic --from-beginning

      Running the command prints the following in the terminal:

    This is test message : 1
    This is test message : 2
    This is test message : 3

      These are exactly the three messages we just sent, so everything worked ^_^
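Why did --from-beginning return all three messages? Each partition is an append-only log in which every message gets a sequential offset, and --from-beginning simply means "start reading at offset 0". A toy model of that behaviour (purely illustrative, not Kafka's implementation):

```python
# Toy model of a partition: an append-only log with sequential offsets.
class PartitionLog:
    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1   # the offset assigned to msg

    def read_from(self, offset=0):
        return self.messages[offset:]   # consume starting at a given offset

log = PartitionLog()
for i in (1, 2, 3):
    log.append("This is test message : %d" % i)

print(log.read_from(0))  # all three messages, like --from-beginning
print(log.read_from(2))  # only the last message
```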

      To stop the server:

    ./kafka-server-stop.sh 
    • Kafka pseudo-cluster mode

      In real production, Kafka is rarely deployed on a single server; clusters are built from multiple machines, which is what brings out Kafka's scalability, high throughput and safety. Since this article is only for learning, and I have just one PC and no cluster environment, we can build a Kafka pseudo-cluster to simulate a real cluster.

      As mentioned earlier, the Kafka start command takes a broker configuration file as an argument, and a broker represents one server in the Kafka cluster, so to simulate a cluster of N servers we need to write N broker configuration files. Because multiple brokers are simulated on one machine, every broker shares the same host.name, but the ports must all differ; otherwise the brokers started later will fail to start because their port is already occupied.
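The port conflict is ordinary TCP behaviour, not anything Kafka-specific: only one process can listen on a given host:port pair at a time, which a few lines of Python can demonstrate:

```python
# Demonstrate why two brokers on one host cannot share a port: the second
# bind of the same host:port fails with "address already in use".
import socket

s1 = socket.socket()
s1.bind(("127.0.0.1", 0))            # let the OS pick a free port
port = s1.getsockname()[1]
s1.listen(1)                          # the first "broker" now owns the port

s2 = socket.socket()
try:
    s2.bind(("127.0.0.1", port))      # the second "broker", same port
    conflict = False
except OSError:                       # EADDRINUSE
    conflict = True
finally:
    s2.close()
    s1.close()

print(conflict)  # True
```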

      Below are the configuration files for the other two brokers, server-1.properties and server-2.properties.

      server-1.properties:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
    
    # Switch to enable topic deletion or not, default value is false
    #delete.topic.enable=true
    
    
    
    # Hostname and port the broker will advertise to producers and consumers. If not set
    
    host.name=127.0.0.1
    port=9093
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-1
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    
    
    ############################# Log Retention Policy #############################
    
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

      server-2.properties:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=2
    
    # Switch to enable topic deletion or not, default value is false
    #delete.topic.enable=true
    
    
    
    # Hostname and port the broker will advertise to producers and consumers. If not set
    
    host.name=127.0.0.1
    port=9094
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-2
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    
    
    ############################# Log Retention Policy #############################
    
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

      Note the places where the three configuration files differ:

    1.  broker.id
    2.  port
    3.  log.dirs
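Since only these three keys differ, the per-broker files can be derived mechanically from the shared settings. A small sketch (the base port and log-directory layout mirror the example values used in this article; the log_root path is hypothetical):

```python
# Sketch: derive the per-broker overrides for a pseudo-cluster. Only
# broker.id, port and log.dirs differ; everything else is shared.
SHARED = {
    "host.name": "127.0.0.1",
    "zookeeper.connect": "localhost:2181",
    "zookeeper.connection.timeout.ms": "6000",
}

def broker_config(broker_id, base_port=9092, log_root="/tmp/kafka/Data"):
    cfg = dict(SHARED)
    cfg["broker.id"] = str(broker_id)
    cfg["port"] = str(base_port + broker_id)        # 9092, 9093, 9094, ...
    cfg["log.dirs"] = "%s/log-%d" % (log_root, broker_id)
    return cfg

for i in range(3):                                   # brokers 0, 1, 2
    lines = ["%s=%s" % kv for kv in sorted(broker_config(i).items())]
    print("\n".join(lines), end="\n\n")
```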

      Now start the three brokers; make sure the ZooKeeper ensemble is running successfully before starting any broker. The start commands are:

    ./kafka-server-start.sh ../config/server.properties
    ./kafka-server-start.sh ../config/server-1.properties
    ./kafka-server-start.sh ../config/server-2.properties

      The startup logs of the three brokers contain the entries 127.0.0.1,9092, 127.0.0.1,9093 and 127.0.0.1,9094, which shows that our configuration files took effect ^_^ and that all three brokers started successfully.

      Now create a new topic in the pseudo-cluster, my-second-topic, with partitions set to 3 and replication-factor set to 2:

    ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-second-topic --partitions 3 --replication-factor 2

      After the command runs, the terminal shows “Created topic "my-second-topic".”, so the topic was created.

      Again, use the describe command to view the topic we created:

    ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-second-topic

      The terminal prints the topic's partition table. The meaning of each field was explained above and the output is easy to read, so it is not described again here.

      Now send new messages to my-second-topic:

    ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
    my new test message 1
    my new test message 2
    my new test message 3

      Then fetch the three new messages on the consumer side:

    ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning

      After the command runs, the terminal prints the three messages we just sent:

    my new test message 1
    my new test message 2
    my new test message 3

      At this point the cluster we built can send and receive messages.

      Kafka uses replication to keep the cluster safe: even if only one broker survives, the whole Kafka cluster can keep running (provided the topic you need has a replica on the surviving broker). So let's test how much punishment the cluster can take.
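The failover mechanics we are about to observe can be sketched in a few lines: when a broker dies, the controller re-elects, for each affected partition, a leader from the replicas that are still alive. This is a simplified model (real Kafka elects from the ISR and handles the no-survivor case according to its unclean-leader-election setting), and the replica lists below are hypothetical:

```python
# Simplified failover model: pick, for each partition, the first replica
# that is still alive as the new leader; None means the partition is offline.
def elect_leaders(assignment, alive):
    leaders = {}
    for partition, replicas in assignment.items():
        candidates = [b for b in replicas if b in alive]
        leaders[partition] = candidates[0] if candidates else None
    return leaders

# Hypothetical replica lists for a 3-partition, 2-replica topic:
assignment = {0: [2, 0], 1: [0, 1], 2: [1, 2]}

print(elect_leaders(assignment, alive={0, 1, 2}))  # preferred leaders: {0: 2, 1: 0, 2: 1}
print(elect_leaders(assignment, alive={1, 2}))     # broker 0 killed: {0: 2, 1: 1, 2: 1}
```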

      We already inspected my-second-topic with describe above; run the same command once more so we have a baseline to compare against (the command is not repeated here). The output again lists the leader, replicas and ISR of each partition.

      Since we just sent the messages through broker 0, the partition that received them must be partition 2, whose leader is broker 0. Now let's find and kill the process of the broker 0 server:

    ps aux | grep server.properties

    wangchaohui       3310   1.1  6.9  6279396 1161100 s000  S+    4:24PM   1:03.88 /usr/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC

      Then kill the process by its PID:

    kill -9 3310

      Once that is done, the terminals of broker 1 and broker 2 print messages like the following:

    java.io.IOException: Connection to 0 was disconnected before the response was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:114)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
        at scala.Option.foreach(Option.scala:236)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:142)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    [2017-03-20 17:09:40,278] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3e2b9697 (kafka.server.ReplicaFetcherThread)
    java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    [2017-03-20 17:09:42,288] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7ee0ff45 (kafka.server.ReplicaFetcherThread)
    java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    [2017-03-20 17:09:44,011] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2017-03-20 17:09:44,012] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2017-03-20 17:09:44,012] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2017-03-20 17:09:44,292] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3f48dc67 (kafka.server.ReplicaFetcherThread)
    java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

      The WARN messages show that the connection to broker 0 failed. Now run the topic describe command again and check the state of my-second-topic:

      Comparing with the earlier topic state, partition 1 and partition 2 were both affected: the in-sync brokers left for them are only broker 2 and broker 1 respectively, and the leader of partition 2 changed from 0 to 1.

      Now let's send messages toward broker 0 in the same way and see whether the client is affected. Building on the earlier messages, send messages 4, 5 and 6:

    ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
    my new test message 1
    my new test message 2
    my new test message 3
    my new test message 4
    my new test message 5
    my new test message 6
    

      The consumer can still receive the messages successfully:

    wangchahuideMBP:bin wangchaohui$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
    my new test message 1
    my new test message 2
    my new test message 3
    [2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    my new test message 4
    my new test message 5
    my new test message 6

      Next, kill broker 1 as well and see whether messages can still be sent:

    ps aux | grep server-1.properties
    ......
    kill -9 3539

      Check the state of my-second-topic again:

      This time the leader of partition 0 changed from 1 to 2. What I don't understand is why the leader of partition 2 is still broker 1, and why the in-sync broker is still shown as 1...... (a likely explanation: both replicas of partition 2 lived on brokers 0 and 1, which are now both down, so no live replica can be elected and Kafka keeps reporting the last known leader and ISR until one of them returns).

      Continue by sending messages 7, 8 and 9:

    ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
    my new test message 1
    my new test message 2
    my new test message 3
    my new test message 4
    my new test message 5
    my new test message 6
    my new test message 7
    my new test message 8
    my new test message 9

      The consumer still receives the messages:

    ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
    my new test message 1
    my new test message 2
    my new test message 3
    [2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    my new test message 4
    my new test message 5
    my new test message 6
    [2017-03-20 17:26:48,011] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    [2017-03-20 17:26:53,016] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    my new test message 7
    my new test message 8
    my new test message 9

      This experiment demonstrates the reliability of a Kafka cluster: indeed, with only one broker left alive, clients can keep sending and receiving messages without changing a single setting.


  • Original article: https://www.cnblogs.com/jxwch/p/6586500.html