zoukankan      html  css  js  c++  java
  • Kafka学习笔记(五、Kafka偏移量)

    目录:

    • MetaData信息
    • Kafka偏移量
    • 客户端负载均衡

    MetaData信息

    客户端如何知道该往哪个节点发送请求来获取数据:通过元数据。

    元数据(MetaData)是什么:topic、topic的分区、每个分区有哪些副本、哪个副本是leader等信息。

    一般情况下客户端会缓存元数据,并直接往目标broker上发送生产和获取请求,并且客户端还会定时的刷新自己的元数据

    Kafka偏移量

    1、Kafka GUI

    说偏移量之前先介绍下Kafka GUI(Kafka graphical user interface),因官方没有提供,所以我们使用社区较为活跃的工具。

    • Kafka Tool地址: http://www.kafkatool.com/download.html
    • Kafka Manager地址: https://github.com/yahoo/kafka-manager
    • KafkaOffsetMonitor地址: https://github.com/Morningstar/kafka-offset-monitor

    KafkaOffsetMonitor配置:

    KafkaOffsetMonitor启动脚本(如: kafkaoffset_monitor.sh,记得给脚本赋执行权限)

    java -cp KafkaOffsetMonitor-assembly 0.46-SNAPSHOT.jar 
             com.quantifind.kafka.offsetapp.OffsetGetterWeb
    --offsetStorage kafka 
    --kafkaBrokers ip1:port1,ip2:port2,ip3:port3 
    --zk ip1:port1,ip2:port2,ip3:port3 
    --port 8088
    --refresh 10.seconds 
    --retain 2.days
    • --offsetStorage:指明offset信息由kafka来保存,而非zookeeper
    • --refresh:多少秒刷新一次信息
    • --retain:信息保存到数据库多少天

    ——————————————————————————————————————————————————————

    2、消费指定偏移量的消息

    // 指定分区信息
    consumer.assign(Collections.singletonList(new TopicPartition("Topic-02", 0)));
    // 从头开始消费消息
    consumer.seekToBeginning(Collections.singletonList(new TopicPartition("Topic-02", 0)));
    // 按照指定的偏移量消费消息
    consumer.seek(new TopicPartition("Topic-05", 1), 9);

    ——————————————————————————————————————————————————————

    客户端负载均衡

    消费者发生变化(加入新的消费者或原有消费者宕机)或topic发生变化时会出现再均衡现象(分区的所有权从一个消费者转到另一个消费者)。

    再均衡现象会导致消息的重复处理丢失

    • 当提交的偏移量小于客户端处理的偏移量时重复处理消息。
    • 当提交的偏移量大于客户端处理的偏移量时会丢失消息。

    ——————————————————————————————————————————————————————

    为了解决这一问题Kafka提供了再均衡监听器:ConsumerRebalanceListener

    private static class CustomerRebalancer implements ConsumerRebalanceListener {
        /**
         * 再均衡开始之前和消费者停止读取消息之后被调用
         * 如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // 如果发生再均衡,我们要在即将失去分区所有权时提交偏移量
            // 要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量
            System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
        }
    
        /**
         * 在重新分配分区之后和新的消费者开始读取消息之前被调用
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            long committedOffset = -1;
            for (TopicPartition topicPartition : partitions) {
                // 获取该分区已经消费的偏移量
                committedOffset = consumer.committed(topicPartition).offset();
                // 重置偏移量到上一次提交的偏移量的下一个位置处开始消费
                consumer.seek(topicPartition, committedOffset + 1);
            }
        }
    }
  • 相关阅读:
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    分布式架构在农业银行的应用实践与展望
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/12221623.html
Copyright © 2011-2022 走看看