zoukankan      html  css  js  c++  java
  • 【python】kafka在与celery和gevent连用时遇到的问题

    前提:kafka有同步,多线程,gevent异步和rdkafka异步四种模式。但是在与celery和gevent连用的时候,有的模式会出错。

    下面是我代码运行的结果。

    结论:使用多线程方式!

    使用同步方式可以成功发送数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_sync_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用rdkafka异步,只打印了一条send data之后卡住

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer(use_rdkafka=True) as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用多线程,可以正常生产所有数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    没有用with,rdkafka异步,打印了所有的send data,后卡住

    client = KafkaClient(hosts=broker_list)
    topic = client.topics[topic_name]
    producer = topic.get_producer(use_rdkafka=True)  # 异步,使用rdkafka库,速度最快的方案
    
    def send_data_kafka(data):
        try:
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e
  • 相关阅读:
    学生管理
    数据类型
    Linux安装、发布Django项目
    函数式编程
    学生管理系统
    mysql数据库工具类
    python操作数据库
    链接mysql建库建表
    列表元组字典集合
    内置对象相关方法
  • 原文地址:https://www.cnblogs.com/dplearning/p/7738697.html
Copyright © 2011-2022 走看看