zoukankan      html  css  js  c++  java
  • Kafka消费者

    消费者和消费者群组

    生产者往主题写入消息的速度超过了应用程序验证数据的速度。如果使用单个消费者处理消息,应用程序跟不上消息生成的速度。此时,有必要对消费者进行横向伸缩,我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。

    一个群组里的消费者订阅同一个主题,每个消费者接收主题的一部分分区的消息。

    不同群组之间消费者互不影响

    消费者编程

    在读取消息之前,需要先创建一个KafkaConsumer对象。

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("group.id", "CountryCounter");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
    订阅

    创建消费者后,下一步要订阅主题,subscribe()方法接收一个主题列表作为参数

    consumer.subscribe(Collections.singletonList("customerCountries"));
    

    分区分配策略

    Kafka有两种分配策略,一是RoundRobin,一是Range

    RoundRobin消息轮询(轮流询问)是消费者API的核心,通过一个简单的轮询向服务器请求数据。轮询会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。

    try {
        while (true) { 
            ConsumerRecords<String, String> records = consumer.poll(100); 
        for (ConsumerRecord<String, String> record : records) 
        {
        log.debug("topic = %s, partition = %s, offset = %d, customer = %s,
        country = %s
    ",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
        int updatedCount = 1;
        if (custCountryMap.countainsValue(record.value())) {
            updatedCount = custCountryMap.get(record.value()) + 1;
        }
        custCountryMap.put(record.value(), updatedCount)
        JSONObject json = new JSONObject(custCountryMap);
        System.out.println(json.toString(4)) 
        }
        }
    } finally {
        consumer.close(); 
    }
    

    Range根据当前的消费者组来划分,一个范围一个范围来分配。会产生消费者之间订阅数量差距过大的问题。

    提交和偏移量

    每次调用poll()方法,会返回生产者写入kafka但还没被读取的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。

    消费者默认将offset保存在kafka内置的topic中,该topic为__consumer_offset

  • 相关阅读:
    金丝雀发布、滚动发布、蓝绿发布到底有什么差别?关键点是什么?
    分析占用了大量 CPU 处理时间的是Java 进程中哪个线程
    阿里巴巴首部记录片问世:看哭所有创业者!
    【LBS】基于地理位置的搜索之微信 附近的人 简单实现
    【Linux】Linux 常用命令汇总
    【心灵鸡汤】为什么我会去云平台讲课
    【Mysql数据库访问利器】phpMyadmin
    【Java】一台服务器配置多个Tomcat
    tp5, laravel, yii2我该选择哪个
    网站升级2.0回滚机制
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12517906.html
Copyright © 2011-2022 走看看