zoukankan      html  css  js  c++  java
  • Kakfa重连测试

    在Kafak已启动的情况下:

    发送端首次连接大概耗时400毫秒。后续消息发送都在1毫秒以下。

    接收端首次连接大概耗时400-7000毫秒。后续消息接收都在1毫秒以下。(具体时间与topic中存留的消息量有关)

    但在使用Kafka时,会遇到Kafka重启。或者启用应用时Kafak还没有启动的情况,针对于各种情况进行测试。

    测试消息发起端

    Properties props = new Properties();
    
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    producer.send(new ProducerRecord<String, String>(topic, message));

    1.发起端先启动,Kafak后启动

       创建producer不会进行连接,直接进入消息发送,耗时大概300-400毫秒。

       发送端连接不上,60秒后方法会返回,但不报错。

       可通过MAX_BLOCK_MS_CONFIG参数,调整超时时间,单位是毫秒。

    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

     如果需要发送失败的异常,则需要在发送方法后,增加get

    producer.send(new ProducerRecord<String, String>(topic, message)).get();

    2.发起端保持启动,Kafak启动,或重启

       发起端在Kafak启动完成后,会自动进行连接。无需人工干预。

    测试消息接收端

    Properties props = new Properties();
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    while(true){ ConsumerRecords<String, String> records = consumer.poll(1); 
      if (!records.isEmpty()) { 
        for (ConsumerRecord<String, String> record : records) { 
          System.out.println(record.value());
        } 
      } 
    }

    Kafak接收端会忽略一些早期的消息,有时候会出现前N条丢失的情况,如果需要保证之前的消息都接收,需要增加参数

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    1.接收端先启动,Kafak后启动

       接收端在Kafak启动完成后,会自动进行连接。无需人工干预。

    2.接收端保持启动,Kafak启动,或重启

        接收端在Kafak启动完成后,会自动进行连接。无需人工干预。

  • 相关阅读:
    Blend操作入门: 别站在门外偷看,快进来吧!
    Egyee,这个世界很美丽
    在Windows Azure中使用自己的域名
    Windows Azure服务购买,收费,使用注意事项及学习资料推荐
    Silverlight怎样加载xap中的各个程序集
    WCF RIA Service随想
    Silverlight布局,也许不止这么多(附照片墙示例及源码)
    An introduction to variable and feature selection
    Compressed Learning
    Robust PCA (摘抄)
  • 原文地址:https://www.cnblogs.com/maobuji/p/5671013.html
Copyright © 2011-2022 走看看