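I previously wrote two posts about resetting Kafka consumer offsets, but both approaches ran into problems in practice.
After a lot of trial and error, I finally found a solution that works.
Here is the code: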

    from confluent_kafka import Consumer, TopicPartition


    def reset(topic, group, partition, new_offset):
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
        # Start at new_offset - 1: once that single message is consumed,
        # the committed position becomes new_offset.
        new_offset = int(new_offset) - 1
        tp_c = TopicPartition(topic, partition, new_offset)
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': True,  # auto-commit must be on
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        try:
            c.assign([tp_c])
            c.poll()  # blocks until one message (or event) arrives
        finally:
            c.close()  # close() also commits when auto-commit is enabled


    def reset_offset(topic, group, partition, new_offset):
        while True:
            try:
                reset(topic, group, partition, new_offset)
                break
            except Exception:  # retry on any error, but let Ctrl-C through
                print("ERROR in reset, repeat.")
                continue


    if __name__ == "__main__":
        reset_offset("test1", "test1", 0, 100)

The while loop in the code retries the reset, because connecting to the Kafka broker occasionally fails with an error.
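The retry loop in the script never gives up, so a permanently unreachable broker makes it spin forever. A minimal sketch of a bounded variant follows. The helper name `retry`, the attempt limit, and the delay are my own illustrative choices, not part of the original script or of confluent_kafka.

```python
import time


def retry(func, attempts=5, delay=1.0, exceptions=(Exception,)):
    """Call func() until it succeeds or `attempts` tries have failed.

    Hypothetical helper for illustration; not part of confluent_kafka.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions as exc:
            print("ERROR in reset (attempt %d/%d): %s" % (attempt, attempts, exc))
            if attempt == attempts:
                raise  # out of tries: re-raise the last error
            time.sleep(delay)  # brief pause before the next try
```

With the script above it could be called as `retry(lambda: reset("test1", "test1", 0, 100))`. Passing a narrower tuple for `exceptions` keeps unrelated bugs from being retried.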
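Oddly, calling c.commit(offsets=[tp_c]) directly had no effect for me; the offset only changed once a message had been consumed and auto-commit stored the new position.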
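One possible explanation, which is my guess and not something verified against the setup above: `Consumer.commit()` is asynchronous by default, so a script that exits right after the call may never get the request out. Note also that `tp_c` in the script holds `new_offset - 1`, so committing it directly would store one less than intended. Below is a minimal sketch of a blocking commit. `commit_offset` is a hypothetical helper, and `consumer` is assumed to be a `confluent_kafka.Consumer` created with the target `group.id`.

```python
def commit_offset(consumer, topic_partition):
    """Commit one explicit offset and wait for the broker's answer.

    `consumer` is assumed to be a confluent_kafka.Consumer and
    `topic_partition` a confluent_kafka.TopicPartition whose offset is the
    position the group should resume from (hypothetical helper).
    """
    # asynchronous=False makes commit() block and return the committed
    # partitions instead of returning None immediately.
    committed = consumer.commit(offsets=[topic_partition], asynchronous=False)
    for tp in committed:
        # Individual partitions can fail even when the call itself returns.
        if tp.error is not None:
            raise RuntimeError("commit failed for %s [%d]: %s"
                               % (tp.topic, tp.partition, tp.error))
    return committed


# Intended use (needs a reachable broker, so it is left as a comment):
#   c = Consumer({'bootstrap.servers': broker_list, 'group.id': 'test1'})
#   commit_offset(c, TopicPartition('test1', 0, 100))
#   c.close()
```

Whether this resolves the behaviour described above depends on the client version and on whether other consumers in the group are active, so treat it as something to try and not as a confirmed fix.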
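Appendix: resetting the offset to the latest position. Compared with the script above, this one adds the code that looks up the maximum offset (the high watermark).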

    from confluent_kafka import Consumer, TopicPartition


    def reset(topic, group):
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': False,
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        try:
            # Required: without subscribe(), get_watermark_offsets fails with
            # "Failed to get watermark offsets: Broker: Leader not available".
            c.subscribe([topic])
            tp = TopicPartition(topic, 0)
            # Also required, for the same reason as subscribe() above.
            committed = c.committed([tp])
            print("committed: %s" % committed[0].offset)
            watermark_offsets = c.get_watermark_offsets(tp)
            print("watermark_offsets:%s %s" % (watermark_offsets[0], watermark_offsets[1]))
        finally:
            c.close()  # release the lookup consumer before creating the next one

        # Start one message before the high watermark, as in the first script.
        new_offset = int(watermark_offsets[1]) - 1
        print(new_offset)
        tp_c = TopicPartition(topic, 0, new_offset)
        c = Consumer({'bootstrap.servers': broker_list,
                      'group.id': group,
                      'enable.auto.commit': True,  # auto-commit must be on
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
        try:
            c.assign([tp_c])
            c.poll()  # blocks until one message (or event) arrives
        finally:
            c.close()  # close() also commits when auto-commit is enabled


    def reset_offset(topic, group):
        while True:
            try:
                reset(topic, group)
                break
            except Exception:  # retry on any error, but let Ctrl-C through
                print("ERROR in reset, repeat.")
                continue


    if __name__ == "__main__":
        reset_offset("test1", "test1")

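The only new step in this version is the arithmetic on the high watermark. The high watermark is the offset the next produced message will get, so `high - 1` is the last existing message, and consuming it leaves the committed position at `high`. Here is a small sketch of that calculation with a guard for an empty partition, where `high - 1` would not point at a real message. The function name is hypothetical and the guard is my addition, not something the script above does.

```python
def start_offset_for_end_reset(low, high):
    """Return the offset to start consuming from so that, after one message,
    the committed position equals the high watermark.

    Returns None when the partition holds no messages (low == high), because
    there is nothing to consume in that case.
    """
    if high <= low:
        return None
    return high - 1
```

In the script this would replace the line `new_offset = int(watermark_offsets[1]) - 1`, with the caller skipping the consume step when the result is `None`.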
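Note: subscribe and assign cannot be used together on the same consumer. subscribe joins the consumer group for a topic and starts consuming from the offset Kafka has recorded for that group. assign takes partitions directly and, when the TopicPartition carries an offset, starts consuming from that offset.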
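Open questions:
1. Why does fetching the watermark offsets require calling subscribe and committed first? Without them the call fails with Failed to get watermark offsets: Broker: Leader not available.
2. Under what conditions does c.commit(offsets=[tp]) take effect?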
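On question 1, a guess and not a confirmed answer: `get_watermark_offsets()` has to know the partition leader, and a freshly created consumer may not have fetched topic metadata yet, while calls such as `subscribe()` and `committed()` may give it the occasion to do so. If that is the cause, requesting metadata explicitly with `list_topics()` might work as a replacement. The sketch below has not been tested against a real cluster. `fetch_watermarks` and the 10-second timeout are illustrative, and `consumer` is assumed to be a `confluent_kafka.Consumer`.

```python
def fetch_watermarks(consumer, topic_partition, timeout=10.0):
    """Return (low, high) for one partition after forcing a metadata fetch.

    `consumer` is assumed to be a confluent_kafka.Consumer and
    `topic_partition` a confluent_kafka.TopicPartition (hypothetical helper).
    """
    # Ask the cluster for this topic's metadata so the client learns the
    # partition leaders before the watermark query.
    metadata = consumer.list_topics(topic_partition.topic, timeout=timeout)
    topic_meta = metadata.topics.get(topic_partition.topic)
    if topic_meta is None or topic_meta.error is not None:
        raise RuntimeError("no usable metadata for topic %r"
                           % topic_partition.topic)

    # get_watermark_offsets() returns None if the query times out.
    offsets = consumer.get_watermark_offsets(topic_partition, timeout=timeout)
    if offsets is None:
        raise RuntimeError("timed out fetching watermark offsets")
    low, high = offsets
    return low, high
```

If the error persists even after the metadata call, the cause lies elsewhere, for example a partition that really has no leader at that moment.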