zoukankan      html  css  js  c++  java
  • kafka消费者使用心得

    1、避免使用使用consumer.poll(timeout_ms=2000,max_records=10)。

    该方法的作用,在固定时间内获取到固定数量的数据;

    缺点:1)偶尔会出现获取不到数据的情况;2)会出现消费到重复数据的情况;3)会导致kafka出现rebalance的情况,进而使得kafka下面的groupid无法获取到偏移量的问题。

    原因:该方法是先获取数据后提交偏移量,在提交偏移量前的时间内如果有另外一个消费者消费数据就会导致消费到重复数据;同时会触发rebalance,进而导致消费者获取不到数据、查不到偏移量。(自我总结,如有错误请回复~~~)

    2、使用迭代器提取数据:for i in consumer,在使用该方法时要确定只创建一个消费者,就是在程序开始的时候创建一个consumer,然后把consumer传入函数,而不是在函数里面创建;如果在函数里面创建consumer,那么每调用一次函数,就要新建一个消费者,这种操作是非常耗费时间的(自测大概耗费290s才会从新消费者消费到数据),而且这种操作还会对kafka和服务器造成不必要的消耗;

    3、关于消费者和分区:每个消费者会先绑定一个分区(partition),然后该消费者就一直消费该分区,直到把改分区的数据消费完或者等待kafka的rebalance,消费完该分区之后就会自动切换其他的分区;如果同时启动多个消费者,那么,每个消费者绑定的分区都是不同的;必须保持分区数>=消费者数,因为如果消费者数>分区数,那么就会有多个消费者同时消费同一个分区,导致消费到相同的数据;如果已经存在消费者了,新增消费者的时候就会非常耗费时间,自测大概250s之后新消费者才会开始消费数据。

    from kafka import KafkaConsumer, KafkaProducer
    import json, time
    import timeout_decorator
    
    
    @timeout_decorator.timeout(500)
    def demo(consumer):
        list_data = []
        try:
            for i in consumer:
                list_data.append(i)
                data = i.value.decode("utf-8")
                print(data, type(json.loads(data)))
                if len(list_data) > 20:
                    break
            return list_data
        except Exception as e:
            print(e)
            return list_data
    
    
    def main():
        hosts = ['127.0.0.1:9092', '0.0.0.0:9092']
        topic = "Coupon_Update_Access_V1"
        group_id = "qly_test0001"
        consumer = KafkaConsumer(topic, bootstrap_servers=hosts, group_id=group_id, auto_offset_reset="earliest")
        while True:
            result = demo(consumer)
            print(len(result))
            time.sleep(5)
    
    
    if __name__ == '__main__':
        main()
    

      

  • 相关阅读:
    《Go并发编程实战》读书笔记-初识Go语言
    使用Nexus配置Maven私有仓库
    Maven 本地资源库配置
    Django 2.2.x版本的ORM API实战案例
    在Mac OS环境下安装MySQL服务
    Pycharm搭建Django开发环境
    Hadoop生态圈-单点登录框架之CAS(Central Authentication Service)部署
    Ambari集成Kerberos报错汇总
    Hadoop生态圈-开启Ambari的Kerberos安全选项
    Hortonworks官网文档怎么找?
  • 原文地址:https://www.cnblogs.com/qiaoer1993/p/14277722.html
Copyright © 2011-2022 走看看