zoukankan      html  css  js  c++  java
  • Kafka 消费者API

    消费者api,自动提交offset

    public class MyConsumer {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            //连接的集群
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
            //开启自动提交(消费偏移量)
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
            //自动提交的延迟
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
            //KV的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc");
    
            //消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅主题
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            while (true){
                //获取数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                //解析数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
                }
            }
    
        }
    }

    手动提交offset,同步提交

    public class ConsumerOffsetSync {
        public static void main(String[] args) {
    
            Properties props = new Properties();
            //连接的集群
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
            //关闭自动提交(消费偏移量)
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            //KV的反序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1");
    
            //重置offset。
            //earliest:从头开始消费,触发的条件1,换组;条件2:保留的offset指向的数据已经不存在
            //latest:默认值,消费最新的数据。
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
            //消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅主题
            kafkaConsumer.subscribe(Collections.singletonList("first"));
    
            while (true){
                //获取数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                //解析数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
                }
    
                //同步提交,当前线程会阻塞直到 offset 提交成功
                kafkaConsumer.commitSync();
            }
    
        }
    }

    手动提交offset,异步提交

    //异步提交
    kafkaConsumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for" +
                    offsets);
        }
    });
  • 相关阅读:
    bootstrap基本用法
    Maven学习笔记(一)
    Tomcat的安装以及基本配置
    jQuery实现用户头像裁剪插件cropbox.js
    position的用法与心得
    ES6新特性学习(一)
    jQuery mobile 滑动打开面板
    vue-day05----自定义指令(directive)、render和template的区别、mixin混入、Vue.use()、Vue.extend()、Vue.filter()、vue中的数据流向
    我的一个React路由嵌套(多级路由),路由传参之旅
    vue04----watch、slot、响应式原理、set、vue脚手架(vue-cli)、单页面应用和多页面应用、单页面开发首屏加载过慢,白屏如何缓解
  • 原文地址:https://www.cnblogs.com/noyouth/p/12866005.html
Copyright © 2011-2022 走看看