zoukankan      html  css  js  c++  java
  • Kakfa重连测试

    在Kafak已启动的情况下:

    发送端首次连接大概耗时400毫秒。后续消息发送都在1毫秒以下。

    接收端首次连接大概耗时400-7000毫秒。后续消息接收都在1毫秒以下。(具体时间与topic中存留的消息量有关)

    但在使用Kafka时,会遇到Kafka重启。或者启用应用时Kafak还没有启动的情况,针对于各种情况进行测试。

    测试消息发起端

    Properties props = new Properties();
    
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    producer.send(new ProducerRecord<String, String>(topic, message));

    1.发起端先启动,Kafak后启动

       创建producer不会进行连接,直接进入消息发送,耗时大概300-400毫秒。

       发送端连接不上,60秒后方法会返回,但不报错。

       可通过MAX_BLOCK_MS_CONFIG参数,调整超时时间,单位是毫秒。

    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

     如果需要发送失败的异常,则需要在发送方法后,增加get

    producer.send(new ProducerRecord<String, String>(topic, message)).get();

    2.发起端保持启动,Kafak启动,或重启

       发起端在Kafak启动完成后,会自动进行连接。无需人工干预。

    测试消息接收端

    Properties props = new Properties();
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    while(true){ ConsumerRecords<String, String> records = consumer.poll(1); 
      if (!records.isEmpty()) { 
        for (ConsumerRecord<String, String> record : records) { 
          System.out.println(record.value());
        } 
      } 
    }

    Kafak接收端会忽略一些早期的消息,有时候会出现前N条丢失的情况,如果需要保证之前的消息都接收,需要增加参数

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    1.接收端先启动,Kafak后启动

       接收端在Kafak启动完成后,会自动进行连接。无需人工干预。

    2.接收端保持启动,Kafak启动,或重启

        接收端在Kafak启动完成后,会自动进行连接。无需人工干预。

  • 相关阅读:
    应用图标大小
    AndroidStudio使用笔记
    shell 三剑客之 sed 命令详解
    shell 三剑客之 sed pattern 详解
    shell 文本处理三剑客之 grep 和 egrep
    Shell 编程中的常用工具
    shell 函数的高级用法
    shell 数学运算
    shell 变量的高级用法
    nginx 之 https 证书配置
  • 原文地址:https://www.cnblogs.com/maobuji/p/5671013.html
Copyright © 2011-2022 走看看