Kafka Consumer Multi-Threaded Consumption

    The previous post, 《Kafka Consumer多线程实例续篇》, fixed how offsets are committed across threads, but data loss was still possible. The root cause: multiple threads could receive records from the same partition, and the order in which they finished processing could differ from the order of the messages within the partition, corrupting the offset commits. This time I use KafkaConsumer's pause and resume methods to prevent that scenario (a minimal sketch of the pattern follows below). I will also write a test class to verify that, for the same number of messages, single-threaded consumption is far slower than multi-threaded consumption.
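
    The sketch below shows the pause/resume loop in miniature. The class and helper names are placeholders, not part of the five files introduced next: pause stops fetching from a partition without giving it up, poll must keep running to maintain group membership, and resume re-enables fetching once a partition's batch has been processed and committed.

    package huxihx.mtc;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Set;

    /**
     * A minimal sketch of the pause/resume pattern; the helper methods are
     * placeholders for the machinery developed in the rest of this post.
     */
    public class PauseResumeSketch {

        static void pollLoop(Consumer<String, String> consumer) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // Stop fetching these partitions; poll() still runs to keep group membership.
                consumer.pause(records.partitions());
                submitToWorkers(records);                        // non-blocking hand-off to a pool
                Set<TopicPartition> done = finishedPartitions(); // partitions whose batch is drained
                commitOffsetsFor(done);
                consumer.resume(done);                           // all consumer calls stay on this thread
            }
        }

        static void submitToWorkers(ConsumerRecords<String, String> records) { /* placeholder */ }
        static Set<TopicPartition> finishedPartitions() { return java.util.Collections.emptySet(); }
        static void commitOffsetsFor(Set<TopicPartition> partitions) { /* placeholder */ }
    }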

    Overview

    This time I wrote five Java files:

    • OrdinaryConsumer.java: a plain single-threaded consumer, used later as the baseline in the performance comparison.
    • ConsumerWorker.java: the message-processing class; each instance is a task submitted to a thread pool to do the actual message processing.
    • MultiThreadedConsumer.java: the main controller of the multi-threaded consumer; it dispatches records to ConsumerWorker instances and manages offset commits.
    • MultiThreadedRebalanceListener.java: the rebalance listener serving the multi-threaded consumer.
    • Test.java: compares single-threaded and multi-threaded performance.

    The OrdinaryConsumer class

    The single-threaded consumer is the simplest of the five, so here is its code first:

    package huxihx.mtc;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * Single-threaded consumer
     */
    public class OrdinaryConsumer {
    
        private final Consumer<String, String> consumer;
        private final int expectedCount; // number of messages to consume in the test
    
        public OrdinaryConsumer(String brokerId, String topic, String groupID, int expectedCount) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic));
            this.expectedCount = expectedCount;
        }
    
        public void run() {
            try {
                int alreadyConsumed = 0;
                while (alreadyConsumed < expectedCount) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    alreadyConsumed += records.count();
                    records.forEach(this::handleRecord);
                }
            } finally {
                consumer.close();
            }
        }
    
        private void handleRecord(ConsumerRecord<String, String> record) {
            try {
                // simulate up to 10 ms of processing per message
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            System.out.println(Thread.currentThread().getName() + " finished processing message. Record offset = " + record.offset());
        }
    } 

    The code is straightforward. The only thing worth noting is that the consumer simulates up to 10 ms of processing time per message; the multi-threaded consumer below uses the same standard.

    ConsumerWorker.java

    Next comes the message-processing class, ConsumerWorker. Compared with the previous post, the biggest difference is that each worker now handles records from a single partition only, instead of records mixed from several partitions. The benefit: once a partition's records have been handed to a worker, the partition can be paused until that worker has processed them all, which would be hard to arrange if records from multiple partitions were processed together. The ConsumerWorker code is as follows:

    package huxihx.mtc;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConsumerWorker<K, V> {
    
        private final List<ConsumerRecord<K, V>> recordsOfSamePartition;
        private volatile boolean started = false;
        private volatile boolean stopped = false;
        private final ReentrantLock lock = new ReentrantLock();
    
        private final long INVALID_COMMITTED_OFFSET = -1L;
        private final AtomicLong latestProcessedOffset = new AtomicLong(INVALID_COMMITTED_OFFSET);
        private final CompletableFuture<Long> future = new CompletableFuture<>();
    
        public ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {
            this.recordsOfSamePartition = recordsOfSamePartition;
        }
    
        public boolean run() {
            lock.lock();
            try {
                if (stopped)
                    return false;
                started = true;
            } finally {
                lock.unlock();
            }
            for (ConsumerRecord<K, V> record : recordsOfSamePartition) {
                if (stopped)
                    break;
                handleRecord(record);
                if (latestProcessedOffset.get() < record.offset() + 1)
                    latestProcessedOffset.set(record.offset() + 1);
            }
            return future.complete(latestProcessedOffset.get());
        }
    
        public long getLatestProcessedOffset() {
            return latestProcessedOffset.get();
        }
    
        private void handleRecord(ConsumerRecord<K, V> record) {
            try {
                // simulate up to 10 ms of processing per message
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            System.out.println(Thread.currentThread().getName() + " finished processing message. Record offset = " + record.offset());
        }
    
        public void close() {
            lock.lock();
            try {
                this.stopped = true;
                if (!started) {
                    future.complete(latestProcessedOffset.get());
                }
            } finally {
                lock.unlock();
            }
        }
    
        public boolean isFinished() {
            return future.isDone();
        }
    
        public long waitForCompletion(long timeout, TimeUnit timeUnit) {
            try {
                return future.get(timeout, timeUnit);
            } catch (Exception e) {
                if (e instanceof InterruptedException)
                    Thread.currentThread().interrupt();
                return INVALID_COMMITTED_OFFSET;
            }
        }
    }
    

    A few points worth noting:

    • latestProcessedOffset: holds the latest offset this worker has processed so far.
    • future: a CompletableFuture carrying the offset the worker hands back for committing.
    • The signal that a worker has finished, successfully or not, is whether this future has been completed with the latestProcessedOffset value.
    • handleRecord is the same as in the single-threaded consumer: it simulates up to 10 ms of processing per message.
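
    To make the hand-off concrete, here is a minimal usage sketch (ConsumerWorkerDemo is an illustrative class, not one of the article's five files): it fabricates two records for one partition, runs a worker on a single-thread pool, and reads back the offset to commit.

    package huxihx.mtc;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ConsumerWorkerDemo {
        public static void main(String[] args) {
            // Two fabricated records standing in for one partition's slice of a poll() result.
            List<ConsumerRecord<String, String>> batch = Arrays.asList(
                    new ConsumerRecord<>("test", 0, 0L, "k0", "v0"),
                    new ConsumerRecord<>("test", 0, 1L, "k1", "v1"));
            ConsumerWorker<String, String> worker = new ConsumerWorker<>(batch);

            ExecutorService pool = Executors.newSingleThreadExecutor();
            CompletableFuture.supplyAsync(worker::run, pool);

            // Blocks up to 5 seconds; prints 2 (last offset + 1), the position to commit.
            System.out.println("Offset to commit: " + worker.waitForCompletion(5, TimeUnit.SECONDS));
            pool.shutdown();
        }
    }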

    MultiThreadedConsumer.java

    With ConsumerWorker in place, the next step is the main controller class of the multi-threaded consumer. After creating the consumer, it loops over the following steps: 1. poll records from the subscribed partitions; 2. group the records by partition and dispatch each group to its own thread; 3. pause further consumption of those partitions while the workers process the records; 4. commit those partitions' offsets; 5. resume those partitions.

    The complete code of the MultiThreadedConsumer class:

    package huxihx.mtc;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class MultiThreadedConsumer {
    
        private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers = new HashMap<>();
        private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        private long lastCommitTime = System.currentTimeMillis();
        private final Consumer<String, String> consumer;
        private final int DEFAULT_COMMIT_INTERVAL = 3000;
        private final Map<TopicPartition, Long> currentConsumedOffsets = new HashMap<>();
        private final long expectedCount;
    
        private final static Executor executor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 10, r -> {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                });
    
        public MultiThreadedConsumer(String brokerId, String topic, String groupID, long expectedCount) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic), new MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));
            this.expectedCount = expectedCount;
        }
    
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    distributeRecords(records);
                    checkOutstandingWorkers();
                    commitOffsets();
                    // each partition's latest offset equals the number of records consumed from it
                    // (offsets start at 0), so the sum is the total consumed so far
                    if (currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {
                        break;
                    }
                }
            } finally {
                consumer.close();
            }
        }
    
        /**
         * Record each worker's progress, stage offsets for commit, and resume
         * partitions whose workers have finished.
         */
        private void checkOutstandingWorkers() {
            Set<TopicPartition> completedPartitions = new HashSet<>();
            outstandingWorkers.forEach((tp, worker) -> {
                if (worker.isFinished()) {
                    completedPartitions.add(tp);
                }
                long offset = worker.getLatestProcessedOffset();
                if (offset > 0L) { // skip the -1 sentinel used before any record is processed
                    currentConsumedOffsets.put(tp, offset);
                    offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
                }
            });
            completedPartitions.forEach(outstandingWorkers::remove);
            consumer.resume(completedPartitions);
        }
    
        /**
         * Commit staged offsets once the commit interval has elapsed.
         */
        private void commitOffsets() {
            try {
                long currentTime = System.currentTimeMillis();
                if (currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                    // reset the clock only when a commit happens; resetting it on every call
                    // would keep the elapsed time below the interval and starve commits
                    lastCommitTime = currentTime;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Dispatch each partition's records to a dedicated worker thread and pause
         * consumption of those partitions.
         * @param records the records returned by the latest poll
         */
        private void distributeRecords(ConsumerRecords<String, String> records) {
            if (records.isEmpty())
                return;
            Set<TopicPartition> pausedPartitions = new HashSet<>();
            records.partitions().forEach(tp -> {
                List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);
                pausedPartitions.add(tp);
                final ConsumerWorker<String, String> worker = new ConsumerWorker<>(partitionedRecords);
                CompletableFuture.supplyAsync(worker::run, executor);
                outstandingWorkers.put(tp, worker);
            });
            consumer.pause(pausedPartitions);
        }
    }  

    Points worth noting in this class:

    • executor: I create a thread pool with 10 times as many threads as CPU cores. The actual size should be driven by your own workload: if message handling is I/O-bound (for example, writing to an external system), a larger pool usually makes sense, although I would personally not go beyond the total number of partitions assigned to the consumer (see the sizing sketch after this list).
    • Be sure to set the auto-commit parameter (enable.auto.commit) to false: manual offset commits are a key part of the multi-threaded consumer's design.
    • The rebalance listener is set to MultiThreadedRebalanceListener; how it responds to partition revocation and assignment is discussed shortly.
    • The run method follows the flow described above: fetch -> dispatch -> check progress -> commit offsets.
    • expectedCount: the total number of messages to consume, used later in the performance comparison.
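
    As a rough illustration of that sizing advice, here is a hypothetical helper, not part of the article's code, that caps the pool at the partition count:

    // Illustrative only: at most one worker per partition can be busy at a time,
    // so threads beyond the assigned partition count would sit idle.
    static int workerPoolSize(int assignedPartitions) {
        int ioBoundCeiling = Runtime.getRuntime().availableProcessors() * 10;
        return Math.max(1, Math.min(assignedPartitions, ioBoundCeiling));
    }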

    MultiThreadedRebalanceListener.java

    A multi-threaded consumer must handle rebalances carefully. The main thread's poll call runs in parallel with the workers' message processing, so when a rebalance happens, some partitions may be reassigned to another consumer while workers here are still processing records from them. That opens a window in which two consumers process messages from the same partitions, which breaks the consumer-group design. To prevent it, we must ensure that processing of the partitions about to be revoked completes before they can be reassigned.

    In short, before its partitions are revoked, the multi-threaded consumer must:

    1. Stop the corresponding worker threads
    2. Commit their offsets

    Once partitions are assigned (or re-assigned), things become simple: we call resume to make them consumable again. If a partition was already consumable, resume has no effect, so it is a harmless call either way. The complete MultiThreadedRebalanceListener code:

    package huxihx.mtc;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    public class MultiThreadedRebalanceListener implements ConsumerRebalanceListener {
    
        private final Consumer<String, String> consumer;
        private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
    
        public MultiThreadedRebalanceListener(Consumer<String, String> consumer,
                                              Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,
                                              Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.consumer = consumer;
            this.outstandingWorkers = outstandingWorkers;
            this.offsets = offsets;
        }
    
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers = new HashMap<>();
            for (TopicPartition tp : partitions) {
                ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);
                if (worker != null) {
                    worker.close();
                    stoppedWorkers.put(tp, worker);
                }
            }
    
            stoppedWorkers.forEach((tp, worker) -> {
                long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);
                if (offset > 0L) {
                    offsets.put(tp, new OffsetAndMetadata(offset));
                }
            });
    
            Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();
            partitions.forEach(tp -> {
                OffsetAndMetadata offset = offsets.remove(tp);
                if (offset != null) {
                    revokedOffsets.put(tp, offset);
                }
            });
    
            try {
                consumer.commitSync(revokedOffsets);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.resume(partitions);
        }
    }
    

    Points worth noting in this class:

    • Any rebalance listener must implement the ConsumerRebalanceListener interface.
    • The class holds three fields: the Consumer instance, the worker threads to stop, and the offsets to commit.
    • The main logic lives in onPartitionsRevoked: step one stops the worker threads (waiting briefly for each to finish), step two commits their offsets manually.

    Test.java

    With the four classes above in place, we can now write a test class comparing the performance of the single-threaded and multi-threaded consumers. First create a topic with 50 partitions and a single replica, then use the kafka-producer-perf-test tool to produce 50,000 messages, 1,000 per partition (a Java sketch of an equivalent setup follows the test code below). Then run the following code to time each consumer:

    package huxihx.mtc;
    
    public class Test {
        public static void main(String[] args) throws InterruptedException {
            int expectedCount = 50 * 900; // consume 900 of the 1,000 messages per partition: 45,000 in total
            String brokerId = "localhost:9092";
            String groupId = "test-group";
            String topic = "test";
    
            OrdinaryConsumer consumer = new OrdinaryConsumer(brokerId, topic, groupId + "-single", expectedCount);
            long start = System.currentTimeMillis();
            consumer.run();
            System.out.println("Single-threaded consumer costs " + (System.currentTimeMillis() - start));
    
            Thread.sleep(1L);
    
            MultiThreadedConsumer multiThreadedConsumer =
                    new MultiThreadedConsumer(brokerId, topic, groupId + "-multi", expectedCount);
            start = System.currentTimeMillis();
            multiThreadedConsumer.run();
            System.out.println("Multi-threaded consumer costs " + (System.currentTimeMillis() - start));
        }
    }
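
    For reference, here is a hedged Java sketch of an equivalent test-data setup using the AdminClient and producer APIs (TestDataSetup is an illustrative name; the article itself used the kafka-producer-perf-test CLI):

    package huxihx.mtc;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Collections;
    import java.util.Properties;

    public class TestDataSetup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // 50 partitions, replication factor 1, matching the article's setup.
            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(Collections.singleton(new NewTopic("test", 50, (short) 1))).all().get();
            }

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 1,000 messages per partition, 50,000 in total.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int p = 0; p < 50; p++)
                    for (int i = 0; i < 1000; i++)
                        producer.send(new ProducerRecord<>("test", p, null, "value-" + i));
            }
        }
    }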
    

    The results: consuming 45,000 messages took the single-threaded consumer 232 seconds, versus 6.2 seconds for the multi-threaded consumer:

    Single-threaded consumer costs 232336

    Multi-threaded consumer costs 6246

    The multi-threaded consumer is therefore roughly 37 times faster than the single-threaded one here. The actual speedup depends on the environment, but the conclusion holds: with many CPU cores and I/O-bound message handling, a multi-threaded consumer will clearly outperform a single-threaded one.
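
    These numbers line up with a back-of-envelope estimate: Thread.sleep(ThreadLocalRandom.current().nextInt(10)) averages about 4.5 ms per message, so one thread needs roughly 45,000 × 4.5 ms ≈ 202 s (measured: 232 s, the difference being poll and commit overhead), while 50 workers running in parallel each handle 900 messages in about 900 × 4.5 ms ≈ 4 s (measured: 6.2 s).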
