zoukankan      html  css  js  c++  java
  • Kafka 消费者API

    消费者api,自动提交offset

    public class MyConsumer {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            //连接的集群
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
            //开启自动提交(消费偏移量)
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //自动提交的延迟
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
            //KV的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc");
    
            //消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅主题
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            while (true){
                //获取数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                //解析数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
                }
            }
    
        }
    }

    手动提交offset,同步提交

    public class ConsumerOffsetSync {
        public static void main(String[] args) {
    
            Properties props = new Properties();
            //连接的集群
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
            //关闭自动提交(消费偏移量)
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            //KV的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1");
    
            //重置offset。
            //earliest:从头开始消费,触发的条件1,换组;条件2:保留的offset指向的数据已经不存在
            //latest:默认值,消费最新的数据。
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
            //消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅主题
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            while (true){
                //获取数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                //解析数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
                }
    
                //同步提交,当前线程会阻塞直到 offset 提交成功
                kafkaConsumer.commitSync();
            }
    
        }
    }

    手动提交offset,异步提交

    //异步提交
    kafkaConsumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for" +
                    offsets);
        }
    });
  • 相关阅读:
    Mysql TEXT类型长度
    php中的||和or的区别 优先级
    常用的排序算法的时间复杂度和空间复杂度
    ThinkPHP 多应用多模块建立方式
    phpcms v9 后台添加修改页面空白页问题解决方法
    linux中nginx重定向方法总结
    Nginx的主要配置参数说明
    Apache多网站虚拟目录域名
    xampp命令
    (转载)处理SQL解析失败导致share pool 的争用
  • 原文地址:https://www.cnblogs.com/noyouth/p/12866005.html
Copyright © 2011-2022 走看看