zoukankan      html  css  js  c++  java
  • 【kafka】confluent_kafka重置offset

    之前写过两篇关于重置offset的博文,后来使用过程中都有问题。

    经过各种尝试,终于找到了解决方案。

    直接上代码:

    # coding=utf8
    
    from confluent_kafka import Consumer, KafkaError, TopicPartition
    
    
    def reset(topic, group, partition, new_offset):
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
        new_offset = int(new_offset) - 1  #从new_offset-1的地方开始消费,这样消费一条后提交就是new_offset位置
        tp_c = TopicPartition(topic, partition, new_offset)
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': True,  # 把自动提交打开
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        c.assign([tp_c])
        c.poll()
    
    
    def reset_offset(topic, group, partition, new_offset):
        while True:
            try:
                reset(topic, group, partition, new_offset)
                break
            except:
                print "ERROR in reset, repeat."
                continue
    
    
    if __name__ == "__main__":
        reset_offset("test1", "test1", 0, 100)

    代码中的while循环是防止有时连不上kafka服务器报错。

    很奇怪,我直接用c.commit(offsets=[tp_c])不起作用,非要我消费后它自动重置才有效。

    附,重置offset到最大值的操作。比上面多出了获取最大值的部分代码。

    # coding=utf8
    
    from confluent_kafka import Consumer, KafkaError, TopicPartition
    
    
    def reset(topic, group):
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': False,
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        c.subscribe([topic])  # 这一句必须有,否则后面get_watermark_offsets会报错 Failed to get watermark offsets: Broker: Leader not available
        tp = TopicPartition(topic, 0)
        committed = c.committed([tp]) # 这一句必须有,否则后面get_watermark_offsets会报错 Failed to get watermark offsets: Broker: Leader not available
        print "committed: %s" % committed[0].offset
        watermark_offsets = c.get_watermark_offsets(tp)
        print "watermark_offsets:%s %s" % (watermark_offsets[0], watermark_offsets[1])
        new_offset = int(watermark_offsets[1]) - 1
        print new_offset
        tp_c = TopicPartition(topic, 0, new_offset)
    
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': True,  # 把自动提交打开
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        c.assign([tp_c])
        c.poll()
    
    
    def reset_offset(topic, group):
        while True:
            try:
                reset(topic, group)
                break
            except:
                print "ERROR in reset, repeat."
                continue
    
    
    if __name__ == "__main__":
        reset_offset("test1", "test1")

    注意: subscribe和assign是不能同时使用的。subscribe表示订阅topic,从kafka记录的offset开始消费。assign表示从指定的offset开始消费。

    问题:

    1.为何获取watermark_offsets必须要使用subscribe和committed,不使用就报错Failed to get watermark offsets: Broker: Leader not available?

    2.c.commit(offsets=[tp]) 在什么情况下有效?

  • 相关阅读:
    Sql Server 存储过程删除一个表里(除ID外)完全重复的数据记录
    把一个库中的表复制到另外一个库的表中(Sql server 2005)
    ajax执行后台返回的提交表单及JS
    WinCE中使用本地数据库SQLite以及得到当前应用程序所在路径
    如何评测一个软件工程师的计算机网络知识水平与网络编程技能水平
    如何评测软件工程知识技能水平?
    深入理解TCP协议及其源代码
    Socket与系统调用深度分析
    创新产品的需求分析:未来的图书会是什么样子?
    ubuntu小问题集合
  • 原文地址:https://www.cnblogs.com/dplearning/p/7992994.html
Copyright © 2011-2022 走看看