zoukankan      html  css  js  c++  java
  • KafkaConsumer assign VS subscribe

    背景

    在kafka中,正常情况下,同一个group.id下的不同消费者不会消费同样的partition,也即某个partition在任何时刻都只能被具有相同group.id的consumer中的一个消费。 也正是这个机制才能保证kafka的重要特性:

    • 1、可以通过增加partitions和consumer来提升吞吐量;
    • 2、保证同一份消息不会被消费多次。

    在KafkaConsumer类中(官方API),消费者可以通过assign和subscribe两种方式指定要消费的topic-partition。具体的源码可以参考下文,

    这两个接口貌似是完成相同的功能,但是还有细微的差别,初次使用的同学可能感到困惑,下面就详细介绍下两者的区别。

    对比结果

    • KafkaConsumer.subscribe() : 为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。

    • KafkaConsumer.assign() : 为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与指定的group无效(this method does not use the consumer's group management)。

    测试代码

    public class KafkaManualAssignTest {
        private static final Logger logger = LoggerFactory.getLogger(KafkaManualAssignTest.class);
    
        private static Properties props = new Properties();
        private static KafkaConsumer<String, String> c1, c2;
    
        private static final String brokerList = "localhost:9092";
    
        static {
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", "assignTest");
            props.put("auto.offset.reset", "earliest");
            props.put("enable.auto.commit", "true");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            c1 = new KafkaConsumer<String, String>(props);
            c2 = new KafkaConsumer<String, String>(props);
        }
    
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("topic", 0);
            // 采用assign方式显示的为consumer指定需要消费的topic, 具有相同group.id的两个消费者
            // 各自消费了一份数据, 出现了数据的重复消费
            c1.assign(Arrays.asList(tp));
            c2.assign(Arrays.asList(tp));
    
    
            // 采用subscribe方式, 利用broker为consumer自动分配topic-partitions,
            // 两个消费者各自消费一个partition, 数据互补, 无交叉.
            // c1.subscribe(Arrays.asList("topic"));
            // c2.subscribe(Arrays.asList("topic"));
    
            while (true) {
                ConsumerRecords<String, String> msg1 = c1.poll(1000L);
                if (msg1 != null) {
                    for (ConsumerRecord m1 : msg1) {
                        logger.info("m1 offset : {} , value : {}", m1.offset(), m1.value());
                    }
                }
    
               logger.info("=====================");
               ConsumerRecords<String, String> msg2 = c2.poll(1000L);
               if (msg2 != null) {
                   for (ConsumerRecord m2 : msg2) {
                       logger.info("m2 offset : {} , value : {}", m2.offset(), m2.value());
                   }
               }
    
               System.exit(0);
            }
        }
    }
    复制代码

    官方api

    官方关于subscribe的解释:

    /**
     * Subscribe to the given list of topics to get dynamically assigned partitions.
     * <b>Topic subscriptions are not incremental. This list will replace the current
     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
     * with manual partition assignment through {@link #assign(Collection)}.
     *
     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
     *
     * <p>
     * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
     * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
     * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
     * to be reset. You should also provide your own listener if you are doing your own offset
     * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
     *
     * @param topics The list of topics to subscribe to
     * @throws IllegalArgumentException If topics is null or contains null or empty elements
     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
     *                               configured at-least one partition assignment strategy
     */
    @Override
    public void subscribe(Collection<String> topics) {
        subscribe(topics, new NoOpConsumerRebalanceListener());
    }
    复制代码

    官方关于assign的解释:

    /**
     * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
     * and will replace the previous assignment (if there is one).
     * <p>
     * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
     * <p>
     * Manual topic assignment through this method does not use the consumer's group management
     * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
     * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
     * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
     * <p>
     * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
     * assignment replaces the old one.
     *
     * @param partitions The list of partitions to assign this consumer
     * @throws IllegalArgumentException If partitions is null or contains null or empty topics
     * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
     *                               (without a subsequent call to {@link #unsubscribe()})
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
            } else if (partitions.isEmpty()) {
                this.unsubscribe();
            } else {
                Set<String> topics = new HashSet<>();
                for (TopicPartition tp : partitions) {
                    String topic = (tp != null) ? tp.topic() : null;
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                    topics.add(topic);
                }
    
                // make sure the offsets of topic partitions the consumer is unsubscribing from
                // are committed since there will be no following rebalance
                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
    
                log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                this.subscriptions.assignFromUser(new HashSet<>(partitions));
                metadata.setTopics(topics);
            }
        } finally {
            release();
        }
    }
    复制代码

    建议

    建议使用 subscribe() 函数来实现partition的分配。

    除非各位同学清楚了解自己需要消费的topic-partitions(不是topic),而且能确定自己的消息全部在这些topic-partitions中,则可以使用assign。

  • 相关阅读:
    java 新建文本并写入
    批处理 获取相同进程的所有 pid
    io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: error:
    Linux下“/”和“~”的区别
    Error creating bean with name 'consoleConfig'
    2019 蓝桥杯省赛 B 组模拟赛 结果填空:马的管辖
    # Codeforces Round #663 (Div. 2)
    Codeforces Round #645 (Div. 2) A~D
    迷宫2 NC15196
    Codeforces Round #643 (Div. 2)
  • 原文地址:https://www.cnblogs.com/twodog/p/12135483.html
Copyright © 2011-2022 走看看