zoukankan      html  css  js  c++  java
  • Kafka中数据的流向

    1: 多个消费者消费同一个Topic数据相同的数据

    2: 多个消费者消费同一个Topic数据不同数据

    3: 各个消费者按组协调消费

    1: 多个消费者消费同一个Topic数据相同的数据

    (1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
    
    (2)使用assign来订阅;
    # 例如 groupId 
    @KafkaListener(topics = "test-syn",groupId = "test-2")
    public void send(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】" + messge);
        }
    }

    注意:如果把 "enable.auto.commit" 设为 "false",使用 consumer.commitAsync(currentOffsets, null) 手动提交 offset ,是不能从头开始消费的

    auto.offset.reset值含义解释:

        • earliest
            • 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        • latest
            • 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        • none
            • topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    clipboard

    也就是说无论哪种设置,只要 kafka 中相同 group、partition 中已经有提交的 offset,则都无法从开始消费。

    参考论坛:服务器重启了,那么该group是否会重新消费服务器里面所有的消息

    KafkaConsumer.subscribe() : 为consumer自动分配partition,

    有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。如果有多个partition且只有一个消费者,则按顺序消费所有分区。不会重复消费。

    KafkaConsumer.assign() : 为consumer手动、显示的指定需要消费的topic-partitions,

    不受group.id限制,不提交offset,相当与指定的group无效(this method does not use the consumer's group management)。可以重复消费。

    或者,这样做:

    clipboard

    目前就 high level API 而言,offset 是存于 Zookeeper 中的,无法存于 HDFS,而 low level API 的 offset 是由自己去维护的,可以将之存于 HDFS 中。

    2: 多个消费者消费同一个Topic数据不同数据

    # groupId 将多个消费者分配到同一个组下面
    @KafkaListener(topics = "test-syn",groupId = "test-1")
    public void send(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】" + messge);
        }
    }
    @KafkaListener(topics = "test-syn",groupId = "test-1")
    public void send(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】" + messge);
        }
    }

    3: 各个消费者按组协调消费

    @KafkaListener(topics = "test-syn",groupId = "test-1")
    public void send(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】1" + messge);
        }
    }
    
    @KafkaListener(topics = "test-syn",groupId = "test-2")
    public void send2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】2" + messge);
        }
    }
    @KafkaListener(topics = "test-syn",groupId = "test-3")
    public void send(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】1" + messge);
        }
    }
    
    @KafkaListener(topics = "test-syn",groupId = "test-2")
    public void send2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object messge = kafkaMessage.get();
            log.info("【KafkaListener监听到消息】2" + messge);
        }
    }
    # 上面
    1 2 3 收到相同的消费message
    2 2 收到不同的message
  • 相关阅读:
    支付宝沙箱环境应用
    七牛云视频托管
    腾讯云短息验证码接口
    git远程连接(码云)
    git
    字间距
    html文本保留空格
    mysql重启导致AUTO_INCREMENT从1开始
    js保留两位小数
    vue中watch的基本用法
  • 原文地址:https://www.cnblogs.com/dgwblog/p/12347825.html
Copyright © 2011-2022 走看看