zoukankan      html  css  js  c++  java
  • kafka-Consumer API

    1.简单消费者(自动提交)

      1.1 编写消费者代码

    package com.wn.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Properties;
    
    /*简单消费者*/
    public class MyConsumer {
        public static void main(String[] args){
            //创建kafka消费者配置信息
            Properties properties = new Properties();
            //连接集群
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            //自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //自动提交的延迟
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            //key,value的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"aaa");
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅主题 一个消费者可以订阅多个主题
            consumer.subscribe(Arrays.asList("aaa","wnwn"));
            //获取消息
            while (true){
                //读取消息的超时时间
                ConsumerRecords<String, String> poll = consumer.poll(100);
                for(ConsumerRecord<String,String> record:poll){
                    System.out.println(record.partition());
                    System.out.println(record.key());
                    System.out.println(record.value());
                    System.out.println(record.offset());
                    System.out.println("------------------------------------------");
                }
            }
    
        }
    }

      1.2 启动zookeeper+kafka

      1.3 启动,执行方法

        1.3.1 启动消费者

        1.3.2 启动生产者

    bin/kafka-console-producer.sh --broker-list 192.168.138.59092,192.168.138.66:9092,192.168.138.77:9092 --topic aaa

        1.3.3 查看效果

          

          

    2.API消费者重置offset(自动提交)

      2.1 编写代码

    package com.wn.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /*消费者重置offset*/
    public class MyConsumerOffset {
        public static void main(String[] args){
            //创建kafka消费者配置信息
            Properties properties = new Properties();
            //连接集群
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            //自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //自动提交的延迟
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            //key,value的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组  如果想重置消费者就需要是新的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"bbb");
            //重置消费者的offset
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅主题 一个消费者可以订阅多个主题
            consumer.subscribe(Arrays.asList("aaa","wnwn"));
            //获取消息
            while (true){
                //读取消息的超时时间
                ConsumerRecords<String, String> poll = consumer.poll(100);
                for(ConsumerRecord<String,String> record:poll){
                    System.out.println(record.partition());
                    System.out.println(record.key());
                    System.out.println(record.value());
                    System.out.println(record.offset());
                    System.out.println("------------------------------------------");
                }
            }
    
        }
    }

        注意:如果想消费以前消费过的数据,需要将消费者组设置成新的;如果在不修改消费者组的情况下,只能消费没有消费过的数据(启动生产者重新生产数据);

          

      2.2 启动,测试

        

        可以消费以前的数据;

    3.API消费者手动提交offset

      虽然自动提交offset十分简便,但由于是基于时间提交的,开发人员难以把握offset提交的时机。因此kafka还提供了手动提交的API;

      手动提交offset的方式有两种,分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是:都会将本次poll的一批数据最高的偏移量提交;不同点是:commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而commitAsync则没有失败重试机制,故有可能提交失败;

      3.1 同步提交offset

        由于同步提交offset有重试机制,故更加可靠,一下是同步提交offset的案例;

    package com.wn.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    
    /*手动提交offset*/
    public class MyConsumerCommit {
        public static void main(String[] args){
            //创建kafka消费者配置信息
            Properties properties = new Properties();
            //连接集群
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            //自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            //自动提交的延迟
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            //key,value的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组  如果想重置消费者就需要是新的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ccc");
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅主题 一个消费者可以订阅多个主题
            consumer.subscribe(Arrays.asList("aaa","wnwn"));
            //获取消息
            while (true){
                //读取消息的超时时间
                ConsumerRecords<String, String> poll = consumer.poll(100);
                for(ConsumerRecord<String,String> record:poll){
                    System.out.println(record.partition());
                    System.out.println(record.key());
                    System.out.println(record.value());
                    System.out.println(record.offset());
                    System.out.println("------------------------------------------");
                }
                //同步提交,当前线程会阻塞直到offset提交成功
                consumer.commitSync();      //效率低
            }
    
        }
    }

      3.2 异步提交offset

        虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量受到很大的影响。因此更多的情况下,会选用异步提交offset的方式;

    package com.wn.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    
    /*手动提交offset*/
    public class MyConsumerCommit {
        public static void main(String[] args){
            //创建kafka消费者配置信息
            Properties properties = new Properties();
            //连接集群
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            //自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            //自动提交的延迟
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            //key,value的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组  如果想重置消费者就需要是新的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ccc");
            //创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //订阅主题 一个消费者可以订阅多个主题
            consumer.subscribe(Arrays.asList("aaa","wnwn"));
            //获取消息
            while (true){
                //读取消息的超时时间
                ConsumerRecords<String, String> poll = consumer.poll(100);
                for(ConsumerRecord<String,String> record:poll){
                    System.out.println(record.partition());
                    System.out.println(record.key());
                    System.out.println(record.value());
                    System.out.println(record.offset());
                    System.out.println("------------------------------------------");
                }//异步提交
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        if (e!=null){
                            System.out.println("commit failed for"+map);
                        }
                    }
                });
            }
    
        }
    }

      3.3 数据漏消费和重复消费分析

        无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能造成数据的重复消费;

    4.自定义存储offset

      kafka9.0版本之前,offset存储在zookeeper0.9版本及之后。默认将offset存储在kafka的一个内置的topic中。除此之外,kafka还可以选择自定义存储offset;

      offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace;

      当有新的消费者加入消费者组,已有的消费者推出消费者或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance;

      消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取自己被重新分配的分区,并且定位到每个分区最近提交的offset位置继续消费;

      要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为实例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现;

    package com.wn.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.*;
    
    public class MyConsumerCommitMy {
        private static Map<TopicPartition,Long> currentOffset=new HashMap<>();
        public static void main(String[] args){
            //创建kafka消费者配置信息
            Properties properties = new Properties();
            //连接集群
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
            //自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            //自动提交的延迟
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            //key,value的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组  如果想重置消费者就需要是新的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ccc");
            //创建消费者
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            //消费者订阅主题
            consumer.subscribe(Arrays.asList("aaa"), new ConsumerRebalanceListener() {
                //该方法会在rebalance之前调用
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    commitOffset(currentOffset);
                }
    
                //该方法会在rebalance之后调用
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    currentOffset.clear();
                    for (TopicPartition partition:collection){
                        //定位到最近提交的offset位置继续消费
                        consumer.seek(partition,getOffset(partition));
                    }
                }
            });
            //获取消息
            while (true){
                //读取消息的超时时间
                ConsumerRecords<String, String> poll = consumer.poll(100);
                for(ConsumerRecord<String,String> record:poll){
                    System.out.println(record.partition());
                    System.out.println(record.key());
                    System.out.println(record.value());
                    System.out.println(record.offset());
                    currentOffset.put(new TopicPartition(record.topic(),record.partition()),record.offset());
                    System.out.println("------------------------------------------");
                }
               //异步提交
                commitOffset(currentOffset);
            }
        }
        //获取某分区的最新offset
        private static long getOffset(TopicPartition partition){
            return 0;
        }
        //提交该消费者所有分区的offset
        public static void commitOffset(Map<TopicPartition,Long> currentOffset){
    
        }
    }
  • 相关阅读:
    C#博客随笔之四:使用C#模拟办公网登录HttpClient的使用
    C#博客随笔之三:Linq in C#
    C#博客随笔之二:wp开发之弹出对话框
    C#博客随笔之一:使用C#的第一个WP程序
    Fedora15命令速查手册
    乐观是一种智慧
    完全教程 Aircrackng破解WEP、WPAPSK加密利器
    FreeBSD常用命令大全
    Linux 网络管理员指南——前言
    API
  • 原文地址:https://www.cnblogs.com/wnwn/p/12459315.html
Copyright © 2011-2022 走看看