1、避免使用使用consumer.poll(timeout_ms=2000,max_records=10)。
该方法的作用,在固定时间内获取到固定数量的数据;
缺点:1)偶尔会出现获取不到数据的情况;2)会出现消费到重复数据的情况;3)会导致kafka出现rebalance的情况,进而使得kafka下面的groupid无法获取到偏移量的问题。
原因:该方法是先获取数据后提交偏移量,在提交偏移量前的时间内如果有另外一个消费者消费数据就会导致消费到重复数据;同时会触发rebalance,进而导致消费者获取不到数据、查不到偏移量。(自我总结,如有错误请回复~~~)
2、使用迭代器提取数据:for i in consumer,在使用该方法时要确定只创建一个消费者,就是在程序开始的时候创建一个consumer,然后把consumer传入函数,而不是在函数里面创建;如果在函数里面创建consumer,那么每调用一次函数,就要新建一个消费者,这种操作是非常耗费时间的(自测大概耗费290s才会从新消费者消费到数据),而且这种操作还会对kafka和服务器造成不必要的消耗;
3、关于消费者和分区:每个消费者会先绑定一个分区(partition),然后该消费者就一直消费该分区,直到把改分区的数据消费完或者等待kafka的rebalance,消费完该分区之后就会自动切换其他的分区;如果同时启动多个消费者,那么,每个消费者绑定的分区都是不同的;必须保持分区数>=消费者数,因为如果消费者数>分区数,那么就会有多个消费者同时消费同一个分区,导致消费到相同的数据;如果已经存在消费者了,新增消费者的时候就会非常耗费时间,自测大概250s之后新消费者才会开始消费数据。
from kafka import KafkaConsumer, KafkaProducer import json, time import timeout_decorator @timeout_decorator.timeout(500) def demo(consumer): list_data = [] try: for i in consumer: list_data.append(i) data = i.value.decode("utf-8") print(data, type(json.loads(data))) if len(list_data) > 20: break return list_data except Exception as e: print(e) return list_data def main(): hosts = ['127.0.0.1:9092', '0.0.0.0:9092'] topic = "Coupon_Update_Access_V1" group_id = "qly_test0001" consumer = KafkaConsumer(topic, bootstrap_servers=hosts, group_id=group_id, auto_offset_reset="earliest") while True: result = demo(consumer) print(len(result)) time.sleep(5) if __name__ == '__main__': main()