zoukankan      html  css  js  c++  java
  • Kafka版本升级 ( 0.10.0 -> 0.10.2 )

    升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会“打扰”到producer和consumer的正常运转。为此,笔者在本机搭了一个测试环境进行实际的版本升级实验。在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本

    两个broker启动时分别读取server.properties和server2.properties。

    一、启动测试环境
    打开两个终端,分别执行startBroker1.sh和startBroker2.sh。startBroker*.sh内容很简单就是:

    CURRENT_PATH=<your_path>/kafka_2.11-0.10.0.0
    cd $CURRENT_PATH
    JMX_PORT=9997 bin/kafka-server-start.sh ../configs/server.properties

    二、创建测试topic
    创建一个双分区,replication-factor=2的topic:test,然后使用kafka-topics工具describe一下:

    okay,目前一切正常。

    三、启动producer

    很简单的producer程序,每1秒发送一条消息,然后打印成功提交的消息数和提交失败的消息数。特别注意提交失败的消息数,后续我们依赖此值来确保升级流程不会影响到producer。 主要程序代码如下:

     1 Properties props = new Properties();
     2         props.put("bootstrap.servers", "localhost:9092,localhost:9093");
     3         props.put("acks", "all");
     4         props.put("retries", Integer.MAX_VALUE);
     5         props.put("batch.size", 16384);
     6         props.put("linger.ms", 1);
     7         props.put("buffer.memory", 33554432);
     8         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     9         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    10 
    11         Producer<String, String> producer = new KafkaProducer<>(props);
    12         final AtomicInteger success = new AtomicInteger(0);
    13         final AtomicInteger failed = new AtomicInteger(0);
    14         try {
    15             while (true) {
    16                 producer.send(new ProducerRecord<String, String>("test", "a message"), new Callback() {
    17                     @Override
    18                     public void onCompletion(RecordMetadata metadata, Exception exception) {
    19                         if (exception != null) {
    20                             System.out.println("Current failed count: " + failed.incrementAndGet());
    21                         } else
    22                             System.out.println("Current success count: " + success.incrementAndGet()
    23                                     + ", failed: " + failed.get());
    24                     }
    25                 });
    26                 Thread.sleep(2000);
    27 
    28             }
    29         } finally {
    30             producer.close();
    31

    四、启动consumer

    为简单起见,我使用了console-consumer,如下所示。另外, 是Kafka 0.10.0.0版本,所以一定要加上`--new-consumer`才能使用新版本consumer!
    bin/kafka-console-consumer.sh --topic test --from-beginning --new-consumer --bootstrap-server localhost:9092,localhost:9093

    此时,你应该可以看到producer和consumer都可以正常地工作。

    ----------------------------- 升级的流程正式开始 -----------------------------

    切记: 每做完一步都要观察producer和consumer是否出现严重错误!

    五、 更新broker间通讯版本号和消息格式版本
    向所有broker的server.properties文件中增加下面两行:

    inter.broker.protocol.version=0.10.0
    log.message.format.version=0.10.0

    六、依次更新代码,重启所有broker
    注意一定要依次重启,即先重启broker1,然后再重启broker2


    七、再次更新broker间通讯版本和消息格式版本

    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2

    注意,这次要更新成你要升级到的目标版本。比如我们要升级到0.10.2,那么就更新为0.10.2

    八、再次依次重启broker
    依然要依次重启

    好了,当前集群版本已经升级完毕了。值得一提的是,在整个升级过程中producer应该是可以正常工作的,但consumer可能会出现位移提交失败的警告,因此有可能会造成重复消费,而broker端可能会出现“org.apache.kafka.common.errors.NotLeaderForPartitionException”异常,因为__consumers_offsets各分区的leader有可能会发生瞬时的变化,因此通常也是不必在意的。

  • 相关阅读:
    windows中dos命令指南
    HDU 2084 数塔 (dp)
    HDU 1176 免费馅饼 (dp)
    HDU 1004 Let the Balloon Rise (map)
    变态杀人狂 (数学)
    HDU 2717 Catch That Cow (深搜)
    HDU 1234 开门人和关门人 (模拟)
    HDU 1070 Milk (模拟)
    HDU 1175 连连看 (深搜+剪枝)
    HDU 1159 Common Subsequence (dp)
  • 原文地址:https://www.cnblogs.com/huxi2b/p/6525465.html
Copyright © 2011-2022 走看看