zoukankan      html  css  js  c++  java
  • 【python】confluent_kafka将offset置为最大

     该博文方法有问题,正确方案在http://www.cnblogs.com/dplearning/p/7992994.html

    将指定group对应的offset重置到最大值,跳过未消费数据

    代码如下:

    # coding:utf-8
    
    import os
    from confluent_kafka import Consumer, TopicPartition
    import traceback
    
    
    def reset_kafka_offset(group, topic):
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.x:9092"
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        c.subscribe([topic])
    
        tp = TopicPartition(topic, 0)
        tp_out = c.committed([tp])
        init_offset = tp_out[0].offset
        if int(init_offset) == -1001:   #是一个新的group 没有消费过 
            # 如果是一个新的group.id必须先消费一条消息,这样后面的重置offset才有效, 如果不消费,重置offset前后获取到的offset值都是-1001
            msg = c.poll()
            if not msg.error():
                 msg_data = msg.value().decode('utf-8')
            c.commit()
    
        tp = TopicPartition(topic, 0)
        watermark_offsets = c.get_watermark_offsets(tp)   # 获取offset最大最小值
        print watermark_offsets
        if watermark_offsets:
            logsize = watermark_offsets[1]  # offset最大值
            if logsize is not None:
                tp1 = TopicPartition(topic, 0, int(logsize))
                c.commit(offsets=[tp1], async=False)  # 直接将offset置为logsize,跳过未消费的数据
            tp_out = c.committed([tp])  # 查看提交的offset位置
            print tp_out[0].offset
            c.close()
    
    
    if __name__ == "__main__":
        reset_kafka_offset("test", "test")
  • 相关阅读:
    持续集成环境搭建(一)
    The POSIX API/nss/nscd
    [postfix]转发邮件设置
    [python]python安装包错误
    栈及栈帧讲解
    kernel-init-bash
    nginx no input file specified
    Lumen/Laravel调试API接口利器laravel-debugbar
    homestead实现外部局域网络其他主机的访问
    PHP rabbitmq扩展安装
  • 原文地址:https://www.cnblogs.com/dplearning/p/7911895.html
Copyright © 2011-2022 走看看