zoukankan      html  css  js  c++  java
  • kafka 消费者offset记录位置和方式

    我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。

    首先来说说消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:

    [zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
    5662
    cZxid = 0x20006d28a
    ctime = Wed Apr 12 18:20:51 CST 2017
    mZxid = 0x30132b0ed
    mtime = Tue Aug 22 18:53:22 CST 2017
    pZxid = 0x20006d28a
    cversion = 0
    dataVersion = 5758
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 0

    如果是根据kafka默认的api来消费,即【org.apache.kafka.clients.consumer.KafkaConsumer】,我们会配置参数【bootstrap.servers】来消费。而其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面,查看当前group的消费进度,则要依靠kafka自带的工具【kafka-consumer-offset-checker】,例如:

    [root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group  --topic stable-test
    [2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
    Group           Topic                          Pid Offset          logSize         Lag             Owner
    test-consumer-group stable-test                    0   601808          601808          0               none
    test-consumer-group stable-test                    1   602826          602828          2               none
    test-consumer-group stable-test                    2   602136          602136          0               none

    上面结果的说明:

    • Group : 消费者组
    • Topic : topic的名字
    • Pid : partition的ID
    • Offset : kafka消费者在对应分区上已经消费的消息数【位置】
    • logSize : 已经写到该分区的消息数【位置】
    • Lag : 还有多少消息未读取(Lag = logSize - Offset)
    • Owner : 分区创建在哪个broker

    offset更新的方式,不区分是用的哪种api,大致分为两类:

    1. 自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
    2. 手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

     

  • 相关阅读:
    SqlServer与Access之间的数据互导
    [转]半角<=>全角互转函数[JS版 VBS版]
    [文摘20070914]一个成功的博客必须知道的80个博客工具
    在javascript中获得由Ajax返回DataTable的列数和列名
    [转]获取xml节点值和属性值(兼容ie和firefox)
    [文摘20070920]网络战
    游标简单使用
    sql函数 得到 由 年月日时分秒+三位内的随机数 组成的随机数
    [转]ASP.NET 2.0 AJAX中Webservice调用方法示例 (包含参数类型为DataTable的WS方法)
    向页面中添加音乐或flash
  • 原文地址:https://www.cnblogs.com/yucy/p/7413394.html
Copyright © 2011-2022 走看看