zoukankan      html  css  js  c++  java
  • KAFKA consumer常用api

    Kafka中如何创建消费者Consumer已经在前面给大家详细的讲解过,那么如何使用JAVA来消费topic中的数据呢呢,今天就说说。
    还是先创建一个topic,拥有一个副本和一个分区

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    示例:

    一 自动提交offset

        public class MyConsumer{
            public static void main(String[] args) {
                Properties prop = new Properties();
                prop.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
    //或者
                prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.5.150:9092");
    
                prop.put("group.id", "testGroup1"); //必须要组名
    //或者
                prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup1");
    
                prop.put("enable.auto.commit", "true");//默认值true 指定为自动提交offset
    //或者
                prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    
    
                prop.put("auto.commit.interval.ms", "1000");//默认值5000
    //或者
                prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
                prop.put(auto.offset.reset, "earliest"); //相当于从开始读 --from-beginning
    //或者
                prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    //反序列化
                prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    //或者
                prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    
                prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    //或者
                prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    
    //创建consumer对象
                KafkaConsumer<String, String> consumer = new KafkaConsumer(prop);
    //消费者订阅的topic, 可同时订阅多个,指定topic名字
                consumer.subscribe(Arrays.asList("mytest"));//subscribe不能指定partition
    
    
    //从topic消费数据
                consumer.assign(Arrays.asList(new TopicPartition("mytest",0))); //分区号是0分区,就是第一个分区 assign可以指定partiton
    
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000); //按时间段读取数据,读取超时时间为1000ms
                    for (ConsumerRecord<String, String> record : records){
                        System.out.printf(
                                record.partition(),record.offset(), record.key(), record.value());
                    }
                }
            }
        }

    解析:

    bootstrap.servers 只是代表kafka的连接入口,只需要指定集群中的某一broker,全部指定也没关系;
    一旦consumer和kakfa集群建立连接,consumer会以心跳的方式来高速集群自己还活着,如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence。

    二 手动提交offset

    如果consumer在获得数据后需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认。举个例子:
    consumer获得数据后,需要将数据持久化到DB中。自动确认offset的情况下,如果数据从kafka集群读出,就确认,但是持久化过程失败,就会导致数据丢失。我们就需要控制offset的确认。

    Properties props = new Properties();
    props.put("bootstrap.servers", "node01:9092");
    props.put("group.id", "testGroup1"); //必须要组名
    
    //指定为手动提交offset
    props.put("enable.auto.commit", "false");//默认值true
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
        //创建consumer对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer(prop);
    //消费者订阅的topic, 可同时订阅多个,指定topic名字
    consumer.subscribe(Arrays.asList("mytest"));
    
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
    //数据达到批量要求,就写入DB,同步确认offset
            if (buffer.size() >= minBatchSize) {
    //提交offset,只要是未提交,表示消息没有被消费,下次重启的时候会继续消费
                insertIntoDb(buffer);
                consumer.commitAsync();
                buffer.clear();
            }
        }

    三 指定从topic的特定分区的特定某个offset开始消费

    //创建consumer对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
    consumer.subscribe(Arrays.asList("mytest")); //指定topic名字
    
    //从topic消费数据
    consumer.assign(Arrays.asList(new TopicPartition("mytest",0))); //分区号是0分区,就是第一个分区
    //消费2号分区的数据
    consumer.seek(testpartition2,offset:50)//指定从topic的2号分区的某个offset开始消费
    //从头开始消费
                consumer.seekToBeginning(Arrays.asList(testpartition2)
    
                while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf(
                        "partition=%d,offset=%d,key=%s,value=%s%n",
                        record.partition(),record.offset(),record.key(), record.value())
        }
    }

    说明:确认的offset为已接受数据最大offset+1。

    分区订阅
    可以向特定的分区订阅消息。但是会失去partion的负载分担。有几种场景可能会这么用:

    只需要获取本机磁盘的分区数据;
    程序自己或者外部程序能够自己实现负载和错误处理。例如YARN/Mesos的介入,当consumer挂掉后,再启动一个consumer。
    String topic = "mytest"
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    1
    2
    3
    说明:

    此种情况用了consumer Group,也不会做负载均衡。
    topic的订阅和分区订阅不可以在同一consumer中混用。
    外部存储offset
    消费者可以自定义kafka的offset存储位置。该设计的主要目的是让消费者将数据和offset进行原子性的存储。这样可以避免上面提到的重复消费问题。举例说明:
    订阅特定分区。存储所获得的记录时,将每条记录的offset一起存储。保证数据和offset的存储是原子性的。当异步存储被异常打断时,凡已经存储的数据,都有相应的offset记录。这种方式可以保证不会有数据丢失,也不会重复的从服务端读取。
    参数配置:

    去使offset自动确认:enable.auto.commit=false;
    从ConsumerRecord中获取offset,保存下来;
    Consumer重启时,调用seek(TopicPartition, long)重置在服务端的消费记录。
    如果消费分区也是自定义的,这种方式用起来会很爽。如果分区是自动分配的,当分区发生reblance的时候,就要考虑清楚了。如果因为升级等原因,分区漂移到一个不会更新offset的consumer上,那就不好处理了。
    该情况下:

    原Consumer需要监听分区撤销事件,并在撤销时确认好offset。接口:ConsumerRebalanceListener.onPartitionsRevoked(Collection);
    新Consumer监听分区分配事件,获取当前分区消费的offset。接口:ConsumerRebalanceListener.onPartitionsAssigned(Collection)
    Consumer监听到 ConsumerRebalance事件,还没有处理或者持久化的缓存数据flush掉。
    控制消费位置
    大多数情况下,服务端的Consumer的消费位置都是由客户端间歇性的确认。Kafka允许Consumer自己设置消费起点,达到的效果:
    1.可以消费已经消费过的数据;
    2. 可以跳跃性的消费数据;
    看下这样做的一些场景:

    对Consumer来说,数据具备时效性,只需要获取最近一段时间内的数据,就可以进行跳跃性的获取数据;
    上面自己存offset的场景,重启后就需要从指定的位置开始消费。
    控制消费流Consumption Flow Control
    如果一个consumer同时消费多个分区,默认情况下,这多个分区的优先级是一样的,同时消费。Kafka提供机制,可以让暂停某些分区的消费,先获取其他分区的内容。场景举例:

    流式计算,consumer同时消费两个Topic,然后对两个Topic的数据做Join操作。但是这两个Topic里面的数据产生速率差距较大。Consumer就需要控制下获取逻辑,先获取慢的Topic,慢的读到数据后再去读快的。
    同样多个Topic同时消费,但是Consumer启动是,本地已经存有了大量某些Topic数据。此时就可以优先去消费下其他的Topic。
    调控的手段:让某个分区消费先暂停,时机到了再恢复,然后接着poll。接口:pause(TopicPartition…),resume(TopicPartition…)

    总结

    1 subscribe 只能指定topic名字,入参是list

    2 assign可以指定topic和partition

    3 seek指定topic和partition,还有offset

  • 相关阅读:
    1635:【例 5】Strange Way to Express Integers
    1633:【例 3】Sumdiv
    1632:【 例 2】[NOIP2012]同余方程
    1631:【例 1】青蛙的约会
    1629:聪明的燕姿
    1628:X-factor Chain
    1627:【例 3】最大公约数
    1626:【例 2】Hankson 的趣味题
    file_put_contens小trick
    billu b0x2靶机渗透
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15693285.html
Copyright © 2011-2022 走看看