  • [Original] Kafka Consumer Multithreading Example, Part 2

      In the previous post, Kafka Consumer Multithreading Example, we discussed two ways to write a multithreaded KafkaConsumer: multiple KafkaConsumer instances each on its own thread, and a single KafkaConsumer shared by multiple worker threads. For the second approach I used auto-commit, which sidesteps the hassle of committing offsets from multiple threads. Many readers have since asked how to write it with manual commits. Because KafkaConsumer is not thread-safe, we cannot simply call consumer.commitSync from multiple threads to commit offsets. This post gives a concrete example that simulates multithreaded consumption with manual offset commits.
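
      To see why the naive approach fails, here is a minimal sketch of the anti-pattern (my addition, not part of the post's example; the class name UnsafeCommitDemo is hypothetical). Two threads share one KafkaConsumer, one polling while the other commits; KafkaConsumer detects the concurrent access and the losing thread fails with ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"):

    package huxi.test.consumer.multithreaded;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Arrays;
    import java.util.Properties;

    public class UnsafeCommitDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "unsafe-demo");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test-topic"));

            new Thread(new Runnable() {
                @Override
                public void run() {
                    consumer.commitSync();   // races with poll() on the main thread -> ConcurrentModificationException
                }
            }).start();

            while (true) {
                consumer.poll(1000L);        // only one thread may use the consumer at a time
            }
        }
    }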

      This example consists of three classes:

    • The ConsumerThreadHandler class: manages the multithreaded consumer; it creates the thread pool and assigns tasks to each thread. Offset commits are also performed in this class.
    • The ConsumerWorker class: essentially a Runnable that executes the actual consumption logic and reports offset information back to ConsumerThreadHandler.
    • The Main class: the test driver.

    Test Code

    The ConsumerWorker class

    package huxi.test.consumer.multithreaded;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.List;
    import java.util.Map;
    
    public class ConsumerWorker<K, V> implements Runnable {
    
        private final ConsumerRecords<K, V> records;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
    
        public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.records = records;
            this.offsets = offsets;
        }
    
        @Override
        public void run() {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
                for (ConsumerRecord<K, V> record : partitionRecords) {
                    // Message-processing logic goes here; this example just prints the message
                    System.out.println(String.format("topic=%s, partition=%d, offset=%d",
                            record.topic(), record.partition(), record.offset()));
                }
    
                // Report this partition's offset to the shared map
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                synchronized (offsets) {
                    if (!offsets.containsKey(partition)) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    } else {
                        long curr = offsets.get(partition).offset();
                        if (curr <= lastOffset + 1) {
                            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                        }
                    }
                }
            }
        }
    }
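
    The synchronized block above is the only coordination between workers: each worker reports lastOffset + 1 for the partitions it handled, and the shared map only ever moves forward per partition, so a late report of an older offset cannot roll a newer one back. A standalone sketch of that merge rule (my addition; OffsetMergeDemo is a hypothetical class, not part of the post's code):

    package huxi.test.consumer.multithreaded;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetMergeDemo {

        public static void main(String[] args) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            TopicPartition tp = new TopicPartition("test-topic", 0);

            report(offsets, tp, 100L);                     // worker A finished offset 100 -> map holds 101
            report(offsets, tp, 99L);                      // worker B reports an older offset -> ignored
            System.out.println(offsets.get(tp).offset());  // prints 101
        }

        // Same merge rule as ConsumerWorker.run(): keep the largest "next offset" seen
        private static void report(Map<TopicPartition, OffsetAndMetadata> offsets,
                                   TopicPartition partition, long lastOffset) {
            synchronized (offsets) {
                OffsetAndMetadata curr = offsets.get(partition);
                if (curr == null || curr.offset() <= lastOffset + 1) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                }
            }
        }
    }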

    The ConsumerThreadHandler class

    package huxi.test.consumer.multithreaded;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ConsumerThreadHandler<K, V> {
    
        private final KafkaConsumer<K, V> consumer;
        private ExecutorService executors;
        private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    
        public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
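                    // Partitions are about to be revoked: flush whatever offsets the workers have reported so far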
                    consumer.commitSync(offsets);
                }
    
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
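                    // Fresh assignment after a rebalance: discard offsets reported under the old assignment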
                    offsets.clear();
                }
            });
        }
    
        /**
         * Main consumption loop.
         * @param threadNumber  number of threads in the pool
         */
        public void consume(int threadNumber) {
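            // Fixed-size pool with a bounded queue; CallerRunsPolicy means that when
            // the queue is full the polling thread runs the batch itself, which
            // naturally throttles poll()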
            executors = new ThreadPoolExecutor(
                    threadNumber,
                    threadNumber,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            try {
                while (true) {
                    ConsumerRecords<K, V> records = consumer.poll(1000L);
                    if (!records.isEmpty()) {
                        executors.submit(new ConsumerWorker<>(records, offsets));
                    }
                    commitOffsets();
                }
            } catch (WakeupException e) {
                // swallow this exception
            } finally {
                commitOffsets();
                consumer.close();
            }
        }
    
        private void commitOffsets() {
            // Keep the synchronized block that locks offsets as short as possible
            Map<TopicPartition, OffsetAndMetadata> unmodifiedMap;
            synchronized (offsets) {
                if (offsets.isEmpty()) {
                    return;
                }
                unmodifiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
                offsets.clear();
            }
            consumer.commitSync(unmodifiedMap);
        }
    
        public void close() {
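            // wakeup() makes the blocked poll() on the consuming thread throw WakeupException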
            consumer.wakeup();
            executors.shutdown();
        }
    }

    The Main class

    package huxi.test.consumer.multithreaded;
    
    public class Main {
    
        public static void main(String[] args) {
            String brokerList = "localhost:9092";
            String topic = "test-topic";
            String groupID = "test-group";
            final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
            final int cpuCount = Runtime.getRuntime().availableProcessors();
    
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    handler.consume(cpuCount);
                }
            };
            new Thread(runnable).start();
    
            try {
            // Automatically stop the test program after 20 seconds
                Thread.sleep(20000L);
            } catch (InterruptedException e) {
                // swallow this exception
            }
            System.out.println("Starting to close the consumer...");
            handler.close();
        }
    }  

    Test Steps

    1. First create a test topic named test-topic with 10 partitions, and use the kafka-producer-perf-test.sh script to produce 500,000 messages.

    2. Run Main, with group.id set to test-group.

    3. Open a new terminal and repeatedly run the following command to monitor the consumer group's consumption progress:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

    Test Results

    A LAG column of all zeros means the consumer group's offset commits are working correctly. It is worth mentioning that you can control how often ConsumerThreadHandler commits offsets by tuning the timeout passed to consumer.poll.

    Thanks to a reminder from a QQ group member: this approach has a window in which data can be lost. Suppose thread T1 consumes message M1 at offset 100 of partition 0 at time t0, while thread T2 consumes message M2 at offset 101 of the same partition at time t1. Now suppose that at t3 T2 finishes first and reports offset 101 to the handler while T1 is still processing; at t4 the handler commits offset 101. T1 then hits an error and throws, so the message at offset 100 is never successfully consumed, but since the committed offset has already advanced to 101, that message is lost.
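
    One way to close this window (my suggestion; the original post stops at describing the problem) is to commit only when every batch handed to the thread pool has finished, for example by keeping the Future that executors.submit returns. A sketch of a reworked consume() for ConsumerThreadHandler, assuming java.util.ArrayList, java.util.List and java.util.concurrent.Future are also imported:

    public void consume(int threadNumber) {
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<Future<?>> inFlight = new ArrayList<>();
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                if (!records.isEmpty()) {
                    inFlight.add(executors.submit(new ConsumerWorker<>(records, offsets)));
                }
                // Commit only when no batch is still running, so a fast worker's
                // reported offset can no longer outrun a slower worker that is
                // processing earlier messages of the same partition.
                boolean allDone = true;
                for (Future<?> f : inFlight) {
                    if (!f.isDone()) {
                        allDone = false;
                        break;
                    }
                }
                if (allDone) {
                    // Caveat: isDone() is also true for a batch that failed with an
                    // exception; production code should check f.get() and retry or
                    // skip before committing.
                    inFlight.clear();
                    commitOffsets();
                }
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();
            consumer.close();
        }
    }

    This trades commit latency for safety: commits happen less often when batches are slow, but a committed offset now only covers messages whose batch has fully completed.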
