  • Kafka client operations: the Consumer API

    I: Consumer API

    1. Auto-commit consumer

      This approach is not recommended in production.

        /**
         * Simple Kafka message consumption with auto-commit.
         * Once messages have been consumed (and the offset auto-committed), they will not be consumed again.
         */
        public static void helloConsumer() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "true");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    

      

    2. Manually committing offsets

      If processing fails and the offset is not committed, the same records can be consumed again later (the consumer re-reads from the last committed offset after a restart or rebalance); a variant that commits only on success is sketched after the example below.

        /**
         * Manual offset commit.
         */
        public static void commitedOffset() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                // Manually commit; ideally commit only after the records above were processed successfully, otherwise the uncommitted records will be consumed again later
                consumer.commitAsync();
            }
        }
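
      The example above still calls commitAsync() unconditionally after the loop. A minimal sketch of the "commit only on success" idea, assuming a hypothetical processRecord() helper that stands in for the real business logic:

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            try {
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record); // hypothetical business logic; throws on failure
                }
                // Commit synchronously only after every record in this batch was processed successfully
                consumer.commitSync();
            } catch (Exception e) {
                // No commit: after a restart or rebalance the consumer re-reads from the last committed offset,
                // so these records will be processed again
                System.err.println("Processing failed, offsets not committed: " + e.getMessage());
            }
        }

      commitSync() blocks until the broker acknowledges the commit, which makes the failure handling easier to reason about than commitAsync().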
    

      

    3. ConsumerGroup

      Messages from a single partition are consumed by only one Consumer within a ConsumerGroup.

      A Consumer reads messages from a partition in order; by default it starts from the beginning.

      A single ConsumerGroup as a whole consumes the messages of all partitions (a small sketch of these rules follows below).
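
      A minimal sketch of these rules, reusing the broker address, group.id and TOPIC_NAME from the examples above: two consumers join the same group, and after the rebalance each partition belongs to exactly one of them.

        public static void consumerGroupDemo() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> c1 = new KafkaConsumer<>(properties);
            KafkaConsumer<String, String> c2 = new KafkaConsumer<>(properties);
            c1.subscribe(Arrays.asList(TOPIC_NAME));
            c2.subscribe(Arrays.asList(TOPIC_NAME));

            // The group rebalance happens inside poll(); afterwards every partition of the topic
            // is owned by exactly one of c1/c2, so each message is delivered to only one of them.
            for (int i = 0; i < 5; i++) {
                c1.poll(Duration.ofMillis(1000));
                c2.poll(Duration.ofMillis(1000));
                System.out.println("c1 owns: " + c1.assignment());
                System.out.println("c2 owns: " + c2.assignment());
            }
            c1.close();
            c2.close();
        }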

    4. Features

      

    5. Processing per partition

        /**
         * Process records per partition.
         */
        public static void commitedOffsetWithPartition() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Operate at the partition level: handle each partition separately
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : pRecords) {
    
                        System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    }
                    long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                    // Manually commit the offset for this partition (note the +1: commit the next offset to read)
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        }
    

      

    6. Consuming only specific partitions

        /**
         * Consume only specific partitions of a topic (assign instead of subscribe).
         */
        public static void commitedOffsetWithTopicPartition() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    
    
            // Assign the partition directly (only p1 here; p0 is declared but not assigned)
            consumer.assign(Arrays.asList(p1));
            while (true) {
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Operate at the partition level: handle each partition separately
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : pRecords) {
    
                        System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    }
                    long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                    // Manually commit the offset for this partition (note the +1: commit the next offset to read)
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        }
    

      

    II: Multithreaded use of the Consumer API

     1. First approach

      Each worker thread creates its own KafkaConsumer instance, because KafkaConsumer is not thread-safe; this is the classic pattern shown in the code below.

      

    2. Code

    package com.jun.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class ConsumerThreadSample {
        private final static String TOPIC_NAME="caojun-topic";
    
        /*
            Classic pattern: each thread creates its own KafkaConsumer to guarantee thread safety.
         */
        public static void main(String[] args) throws InterruptedException {
            KafkaConsumerRunner r1 = new KafkaConsumerRunner();
            Thread t1 = new Thread(r1);
    
            t1.start();
    
            Thread.sleep(15000);
    
            r1.shutdown();
        }
    
        public static class KafkaConsumerRunner implements Runnable{
            private final AtomicBoolean closed = new AtomicBoolean(false);
            private final KafkaConsumer<String, String> consumer;
    
            public KafkaConsumerRunner() {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.19.129:9092");
                props.put("group.id", "test");
                props.put("enable.auto.commit", "false");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
                consumer = new KafkaConsumer<>(props);
    
                TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
                TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    
                consumer.assign(Arrays.asList(p0,p1));
            }
    
    
            public void run() {
                try {
                    while(!closed.get()) {
                        // Pull and process messages
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    
                        for (TopicPartition partition : records.partitions()) {
                            List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                            // Process the messages of each partition
                            for (ConsumerRecord<String, String> record : pRecord) {
                                System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                                        record.partition(),record.offset(), record.key(), record.value());
                            }
    
                            // Report the new offset for this partition back to Kafka
                            long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                            // Note: commit lastOffset + 1, i.e. the next offset to read
                            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                        }
    
                    }
                }catch(WakeupException e) {
                    if(!closed.get()) {
                        throw e;
                    }
                }finally {
                    consumer.close();
                }
            }
    
            public void shutdown() {
                closed.set(true);
                consumer.wakeup();
            }
        }
    
    }
    

      

    3. Second approach

      With this approach the offsets cannot be committed based on processing results, because records are handed off to a thread pool while the consumer relies on auto-commit; it is only intended for consuming data quickly.

      

    4. Code

    package com.jun.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ConsumerRecordThreadSample {
        private final static String TOPIC_NAME = "caojun-topic";
    
        public static void main(String[] args) throws InterruptedException {
            String brokerList = "192.168.19.129:9092";
            String groupId = "test";
            int workerNum = 5;
    
            ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
            consumers.execute(workerNum);
    
            Thread.sleep(1000000);
    
            consumers.shutdown();
    
        }
    
        // Consumer side: polls records and dispatches them to a worker thread pool
        public static class ConsumerExecutor {
            private final KafkaConsumer<String, String> consumer;
            private ExecutorService executors;
    
            public ConsumerExecutor(String brokerList, String groupId, String topic) {
                Properties props = new Properties();
                props.put("bootstrap.servers", brokerList);
                props.put("group.id", groupId);
                props.put("enable.auto.commit", "true");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topic));
            }
    
            public void execute(int workerNum) {
                executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    for (final ConsumerRecord record : records) {
                        executors.submit(new ConsumerRecordWorker(record));
                    }
                }
            }
    
            public void shutdown() {
                if (consumer != null) {
                    consumer.close();
                }
                if (executors != null) {
                    executors.shutdown();
                }
                try {
                    if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout.... Ignore for this case");
                    }
                } catch (InterruptedException ignored) {
                    System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                    Thread.currentThread().interrupt();
                }
            }
    
    
        }
    
        // Worker that processes a single record
        public static class ConsumerRecordWorker implements Runnable {
    
            private ConsumerRecord<String, String> record;
    
            public ConsumerRecordWorker(ConsumerRecord record) {
                this.record = record;
            }
    
            @Override
            public void run() {
                // For example, this is where the record would be written to a database
                System.out.println("Thread - " + Thread.currentThread().getName());
                System.err.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
    
        }
    }

    III: Other features

    1. Manually controlling the offset

        /**
         * Manually specify the starting offset and manually commit offsets.
         *
         * Why control the starting offset manually?
         *  1. You decide where consumption starts.
         *  2. If a program error occurs, the affected records can be consumed again.
         *
         * Steps
         *   1. The first time, consume from offset 0 (the usual case).
         *   2. After a batch is processed (say the last record had offset 100), set the offset to 101 and store it in Redis.
         *   3. Before each poll, read the latest offset from Redis.
         *   4. Start consuming from that position.
         *
         * Recommendation
         *   1. Use Redis (or similar external storage) to persist the offset.
         */
    
        public static void controllerOffset() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);

            // Assign the partition directly
            consumer.assign(Arrays.asList(p0));
            while (true) {
                // Seek to the starting offset (hard-coded to 5 here; in practice it would be read from Redis before each poll)
                consumer.seek(p0, 5);
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Operate at the partition level: handle each partition separately
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : pRecords) {
    
                        System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    }
                    long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                    // Manually commit the offset for this partition (note the +1: commit the next offset to read)
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        }
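
      The loop above always seeks back to offset 5, so it keeps re-reading the same records. A sketch of the Redis-backed flow described in the comment, where loadOffsetFromRedis and saveOffsetToRedis are hypothetical helpers standing in for a real Redis client:

        // Hypothetical helpers: replace the bodies with real Redis GET/SET calls keyed by topic-partition
        private static long loadOffsetFromRedis(TopicPartition tp) { return 0L; }
        private static void saveOffsetToRedis(TopicPartition tp, long nextOffset) { }

        public static void controllerOffsetWithExternalStore(KafkaConsumer<String, String> consumer, TopicPartition p0) {
            consumer.assign(Arrays.asList(p0));
            while (true) {
                // Before each poll, read the latest offset from the external store and seek to it
                consumer.seek(p0, loadOffsetFromRedis(p0));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                List<ConsumerRecord<String, String>> pRecords = records.records(p0);
                if (pRecords.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                // Store the next offset to read (last consumed offset + 1) and also commit it to Kafka
                long nextOffset = pRecords.get(pRecords.size() - 1).offset() + 1;
                saveOffsetToRedis(p0, nextOffset);
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(p0, new OffsetAndMetadata(nextOffset));
                consumer.commitSync(offset);
            }
        }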
    

      

    2. Rate limiting

        /**
         * Rate limiting (flow control) using pause/resume.
         */
        public static void controllerLimit() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
            properties.setProperty("group.id", "test");
            properties.setProperty("enable.auto.commit", "false");
            properties.setProperty("auto.commit.interval.ms", "1000");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    
            long totalNum = 100;
    
            // Assign both partitions directly
            consumer.assign(Arrays.asList(p0, p1));
            while (true) {
                // Poll with a timeout of 1000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Operate at the partition level: handle each partition separately
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                    long num = 0;
                    for (ConsumerRecord<String, String> record : pRecords) {
                        System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                         /*
                            1. After receiving a record, take a token from the token bucket.
                            2. If a token is obtained, continue with the business processing.
                            3. If no token is available, pause() and wait for tokens.
                            4. Once the bucket has enough tokens again, resume() the consumer.
                         */
                        num++;
                        if(record.partition() == 0){
                            if(num >= totalNum){
                                consumer.pause(Arrays.asList(p0));
                            }
                        }
    
                        if(record.partition() == 1){
                            if(num == 40){
                                consumer.resume(Arrays.asList(p0));
                            }
                        }
    
                    }
                    long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                    // Manually commit the offset for this partition (note the +1: commit the next offset to read)
                    Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                    offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    consumer.commitSync(offset);
                }
            }
        }
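
      The counter-based checks above pause p0 after a fixed number of records per poll. A sketch of the token-bucket flow described in the comment, assuming Guava's RateLimiter (com.google.common.util.concurrent.RateLimiter, an extra dependency not used in the original code) as the bucket:

        /**
         * Token-bucket rate limiting sketch: pause a partition while no tokens are available, resume it afterwards.
         */
        public static void controllerLimitWithTokenBucket(KafkaConsumer<String, String> consumer) {
            RateLimiter limiter = RateLimiter.create(100);   // roughly 100 records per second
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        // 1. Take a token from the bucket before processing each record
                        if (!limiter.tryAcquire()) {
                            // 3. No token available: pause the partition so later polls skip it
                            consumer.pause(Arrays.asList(partition));
                            // Block until a token becomes available again
                            limiter.acquire();
                            // 4. Enough tokens again: resume the partition
                            consumer.resume(Arrays.asList(partition));
                        }
                        // 2. Token obtained: continue with the business processing
                        System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            }
        }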
    

      
