zoukankan      html  css  js  c++  java
  • Apache Kafka(九)- Kafka Consumer 消费行为

    1. Poll Messages

    Kafka Consumer 中消费messages时,使用的是poll模型,也就是主动去Kafka端取数据。其他消息管道也有的是push模型,也就是服务端向consumer推送数据,consumer仅需等待即可。

    Kafka Consumerpoll模型使得consumer可以控制从log的指定offset去消费数据、消费数据的速度、以及replay events的能力。

    Kafka Consumer poll模型工作如下图:

     

     

    • ·       Consumer 调用.poll(Duration timeout) 方法,向broker请求数据
    • ·       若是broker端有数据则立即返回;否则在timeout时间后返回empty

    我们可以通过参数控制 Kafka Consumer 行为,主要有:

    • ·       Fetch.min.bytes(默认值是1

    o   控制在每个请求中,至少拉取多少数据

    o   增加此参数可以提高吞吐并降低请求的数目,但是代价是增加延时

     

    • ·       Max.poll.records(默认是500

    o   控制在每个请求中,接收多少条records

    o   如果消息普遍都比较小而consumer端又有较大的内存,则可以考虑增大此参数

    o   最好是监控在每个请求中poll了多少条消息

    • ·       Max.partitions.fetch.bytes(默认为1MB

    o   Broker中每个partition可返回的最多字节

    o   如果目标端有100多个partitions,则需要较多内存

    • ·       Fetch.max.bytes(默认50MB

    o   对每个fetch 请求,可以返回的最大数据量(一个fetch请求可以覆盖多个partitions

    o   Consumer并行执行多个fetch操作

    默认情况下,一般不建议手动调整以上参数,除非我们的consumer已经达到了默认配置下的最高的吞吐,且需要达到更高的吞吐。

    2. Consumer Offset Commit 策略

    在一个consumer 应用中,有两种常见的committing offsets的策略,分别为:

    • ·       (较为简单)enable.auto.commit = true:自动commit offsets,但必须使用同步的方式处理数据
    • ·       (进阶)enable.auto.commit = false:手动commit offsets

    在设置enable.auto.commit = true时,考虑以下代码:

    while(true) {
         List<Records> batch = consumer.poll(Duration.ofMillis(100));
         doSomethingSynchronous(batch);
     }

    一个Consumer 每隔100ms poll一次消息,然后以同步地方式处理这个batch的数据。此时offsets 会定期自动被commit,此定期时间由 auto.commit.interval.ms 决定,默认为 5000,也就是在每次调用 .poll() 方法 5 秒后,会自动commit offsets

    但是如果在处理数据时用的是异步的方式,则会导致“at-most-once”的行为。因为offsets可能会在数据被处理前就被commit

    所以对于新手来说,使用 enable.auto.commit = true 可能是有风险的,所以不建议一开始就使用这种方式

    若设置 enable.auto.commit = false,考虑以下代码:

    while(true) {
         List<Records> batch = consumer.poll(Duration.ofMillis(100));
         if isReady(batch){
             doSomethingSynchronous(batch);
             consumer.commitSync();
         }
     }
    

      

    此例子明确指示了在同步地处理了数据后,再主动commit offsets。这样我们可以控制在什么条件下,去commit offsets。一个比较典型的场景为:将接收的数据读入缓存,然后flush 缓存到一个数据库中,最后再commit offsets

    3. 手动Commit Offset 示例

    首先我们关闭自动commit offsets

    // disable auto commit of offsets
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    指定每个请求最多接收10records,便于测试:
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

    添加以下代码逻辑:

    public static void main(String[] args) throws IOException {
         Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
         RestHighLevelClient client = createClient();
     
         // create Kafka consumer
         KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
     
         // poll for new data
         while(true){
             ConsumerRecords<String, String> records =
                     consumer.poll(Duration.ofMinutes(100));
     
             logger.info("received " + records.count() + "records");
             for(ConsumerRecord record : records) {
     
                 // construct a kafka generic ID
                 String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
     
                 // where we insert data into ElasticSearch
                 IndexRequest indexRequest = new IndexRequest(
                         "kafkademo"
                 ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
     
                 IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                 String id = indexResponse.getId();
     
                 logger.info(id);
     
                 try {
                     Thread.sleep(10); // introduce a small delay
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
     
             logger.info("Committing offsets...");
             consumer.commitSync();                      // commit offsets manually
             logger.info("Offsets have been committed");
     
             }
         }

    这里我们在处理每次获取的10records后(也就是for 循环完整执行一次),手动执行一次offsets commit。打印日志记录为:

     

    手动停止consumer 程序后,可以看到最后的committed offsets165

      

    使用consumer-group cli 也可以验证当前committed offsets165

      

    4. Performance Improvement using Batching

    在这个例子中,consumer 限制每次poll 10条数据,然后每条依次处理(插入elastic search)。此方法效率较低,我们可以通过使用 batching 的方式增加吞吐。这里实现的方式是使用 elastic search API 提供的BulkRequest,基于之前的代码,修改如下:

    public static void main(String[] args) throws IOException {
         Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
         RestHighLevelClient client = createClient();
     
         // create Kafka consumer
         KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
     
         // poll for new data
         while(true){
             ConsumerRecords<String, String> records =
                     consumer.poll(Duration.ofMinutes(100));
     
             // bulk request
             BulkRequest bulkRequest = new BulkRequest();
     
             logger.info("received " + records.count() + "records");
             for(ConsumerRecord record : records) {
     
                 // construct a kafka generic ID
                 String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
     
                 // where we insert data into ElasticSearch
                 IndexRequest indexRequest = new IndexRequest(
                         "kafkademo"
                 ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
     
                 IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
     
                 // add to our bulk request (takes no time)
                 bulkRequest.add(indexRequest);
     
     
                 //String id = indexResponse.getId();
                 //logger.info(id);
     
                 try {
                     Thread.sleep(10); // introduce a small delay
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
     
             // bulk response
             BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
     
             logger.info("Committing offsets...");
             consumer.commitSync();                      // commit offsets manually
             logger.info("Offsets have been committed");
     
             }
         }
    

       

    可以看到,consumerpoll到记录后,并不会一条条的向elastic search 发送,而是将它们放入一个BulkRequest,并在for循环结束后发送。在发送完毕后,再手动commit offsets

    执行结果为:

  • 相关阅读:
    ngx_os_init解析
    cacheline相关优化手段
    std::thread_local
    std::initializer_list<T>
    MySQL group by 注意事项
    MySQL IFNULL函数
    python nginx不同参数压测脚本
    完全卸载oracle11g步骤
    Maven常用命令(转载)
    项目SVN的IP地址发生变化时修改SVN为新的IP地址
  • 原文地址:https://www.cnblogs.com/zackstang/p/11515203.html
Copyright © 2011-2022 走看看