The method in this post is flawed; the correct solution is at http://www.cnblogs.com/dplearning/p/7992994.html
Reset the committed offset of a given consumer group to the latest (maximum) offset, skipping all unconsumed data.
The code is as follows:
# coding:utf-8
import os
from confluent_kafka import Consumer, TopicPartition
import traceback


def reset_kafka_offset(group, topic):
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.x:9092"
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.subscribe([topic])
    tp = TopicPartition(topic, 0)
    tp_out = c.committed([tp])
    init_offset = tp_out[0].offset
    if int(init_offset) == -1001:  # a brand-new group that has never consumed
        # A new group.id must consume (and commit) one message first, otherwise the
        # offset reset below has no effect: committed() returns -1001 both before
        # and after the reset.
        msg = c.poll()
        if not msg.error():
            msg_data = msg.value().decode('utf-8')
            c.commit()
    tp = TopicPartition(topic, 0)
    watermark_offsets = c.get_watermark_offsets(tp)  # get the (min, max) offsets of the partition
    print watermark_offsets
    if watermark_offsets:
        logsize = watermark_offsets[1]  # the max offset (high watermark)
        if logsize is not None:
            tp1 = TopicPartition(topic, 0, int(logsize))
            c.commit(offsets=[tp1], async=False)  # set the committed offset to logsize, skipping unconsumed data
            tp_out = c.committed([tp])  # check the committed offset position
            print tp_out[0].offset
    c.close()


if __name__ == "__main__":
    reset_kafka_offset("test", "test")
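The snippet above is Python 2 and targets an older confluent-kafka-python release, in which Consumer.commit() still accepted an async keyword argument. As a minimal sketch only, the same reset under Python 3 and a recent confluent-kafka release might look like the code below; it assumes the commit parameter is now named asynchronous, that auto.offset.reset is accepted as a top-level config key, and that the library exports OFFSET_INVALID (-1001). The skip_to_latest name, the broker address, and the 10-second poll timeout are illustrative placeholders, not part of the original post.

# coding: utf-8
# Sketch: offset reset for Python 3 / a recent confluent-kafka release (assumptions noted above).
from confluent_kafka import Consumer, TopicPartition, OFFSET_INVALID


def skip_to_latest(group, topic, partition=0, brokers="xx.xx.xx.xx:9092"):  # placeholder broker list
    c = Consumer({'bootstrap.servers': brokers,
                  'group.id': group,
                  'auto.offset.reset': 'earliest'})  # assumed top-level config key
    c.subscribe([topic])
    tp = TopicPartition(topic, partition)

    committed = c.committed([tp])[0].offset
    if committed == OFFSET_INVALID:  # -1001: this group has never committed anything
        msg = c.poll(10.0)           # consume one message so that a commit exists
        if msg is not None and not msg.error():
            c.commit()

    low, high = c.get_watermark_offsets(tp)  # (oldest, newest) offsets of the partition
    # Commit the high watermark synchronously, skipping everything not yet consumed.
    c.commit(offsets=[TopicPartition(topic, partition, high)], asynchronous=False)
    print(c.committed([tp])[0].offset)       # verify the committed position
    c.close()


if __name__ == "__main__":
    skip_to_latest("test", "test")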