zoukankan      html  css  js  c++  java
  • Kafka版本升级 ( 0.10.0 -> 0.10.2 )

    升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会“打扰”到producer和consumer的正常运转。为此,笔者在本机搭了一个测试环境进行实际的版本升级实验。在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本

    两个broker启动时分别读取server.properties和server2.properties。

    一、启动测试环境
    打开两个终端,分别执行startBroker1.sh和startBroker2.sh。startBroker*.sh内容很简单就是:

    CURRENT_PATH=<your_path>/kafka_2.11-0.10.0.0
    cd $CURRENT_PATH
    JMX_PORT=9997 bin/kafka-server-start.sh ../configs/server.properties

    二、创建测试topic
    创建一个双分区,replication-factor=2的topic:test,然后使用kafka-topics工具describe一下:

    okay,目前一切正常。

    三、启动producer

    很简单的producer程序,每1秒发送一条消息,然后打印成功提交的消息数和提交失败的消息数。特别注意提交失败的消息数,后续我们依赖此值来确保升级流程不会影响到producer。 主要程序代码如下:

     1 Properties props = new Properties();
     2         props.put("bootstrap.servers", "localhost:9092,localhost:9093");
     3         props.put("acks", "all");
     4         props.put("retries", Integer.MAX_VALUE);
     5         props.put("batch.size", 16384);
     6         props.put("linger.ms", 1);
     7         props.put("buffer.memory", 33554432);
     8         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     9         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    10 
    11         Producer<String, String> producer = new KafkaProducer<>(props);
    12         final AtomicInteger success = new AtomicInteger(0);
    13         final AtomicInteger failed = new AtomicInteger(0);
    14         try {
    15             while (true) {
    16                 producer.send(new ProducerRecord<String, String>("test", "a message"), new Callback() {
    17                     @Override
    18                     public void onCompletion(RecordMetadata metadata, Exception exception) {
    19                         if (exception != null) {
    20                             System.out.println("Current failed count: " + failed.incrementAndGet());
    21                         } else
    22                             System.out.println("Current success count: " + success.incrementAndGet()
    23                                     + ", failed: " + failed.get());
    24                     }
    25                 });
    26                 Thread.sleep(2000);
    27 
    28             }
    29         } finally {
    30             producer.close();
    31

    四、启动consumer

    为简单起见,我使用了console-consumer,如下所示。另外, 是Kafka 0.10.0.0版本,所以一定要加上`--new-consumer`才能使用新版本consumer!
    bin/kafka-console-consumer.sh --topic test --from-beginning --new-consumer --bootstrap-server localhost:9092,localhost:9093

    此时,你应该可以看到producer和consumer都可以正常地工作。

    ----------------------------- 升级的流程正式开始 -----------------------------

    切记: 每做完一步都要观察producer和consumer是否出现严重错误!

    五、 更新broker间通讯版本号和消息格式版本
    向所有broker的server.properties文件中增加下面两行:

    inter.broker.protocol.version=0.10.0
    log.message.format.version=0.10.0

    六、依次更新代码,重启所有broker
    注意一定要依次重启,即先重启broker1,然后再重启broker2


    七、再次更新broker间通讯版本和消息格式版本

    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2

    注意,这次要更新成你要升级到的目标版本。比如我们要升级到0.10.2,那么就更新为0.10.2

    八、再次依次重启broker
    依然要依次重启

    好了,当前集群版本已经升级完毕了。值得一提的是,在整个升级过程中producer应该是可以正常工作的,但consumer可能会出现位移提交失败的警告,因此有可能会造成重复消费,而broker端可能会出现“org.apache.kafka.common.errors.NotLeaderForPartitionException”异常,因为__consumers_offsets各分区的leader有可能会发生瞬时的变化,因此通常也是不必在意的。

  • 相关阅读:
    在App_Data中创建数据库获取连接串简便方法!
    ObjectDataSource配合存储过程(采用数据集)的使用(删除可以解决,但是编辑出错好像它的方法也无法解决
    金鹰dreamweaver视频教程下载地址
    ASP.NET里创建Microsoft Word文档
    net3:Calendar控件的使用
    vs2005做的留言本——天轰川下载
    Wiley出版 SQL Server 2005宝典
    ADO:防止更新的数据含有单引号而出错
    用 Bitcron 搭博客:你只管写作,它负责呈现
    如何去掉Myeclipse对JS等文件的验证
  • 原文地址:https://www.cnblogs.com/huxi2b/p/6525465.html
Copyright © 2011-2022 走看看