zoukankan      html  css  js  c++  java
  • 【python】kafka在与celery和gevent连用时遇到的问题

    前提:kafka有同步,多线程,gevent异步和rdkafka异步四种模式。但是在与celery和gevent连用的时候,有的模式会出错。

    下面是我代码运行的结果。

    结论:使用多线程方式!

    使用同步方式可以成功发送数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_sync_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用rdkafka异步,只打印了一条send data之后卡住

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer(use_rdkafka=True) as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用多线程,可以正常生产所有数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    没有用with,rdkafka异步,打印了所有的send data,后卡住

    client = KafkaClient(hosts=broker_list)
    topic = client.topics[topic_name]
    producer = topic.get_producer(use_rdkafka=True)  # 异步,使用rdkafka库,速度最快的方案
    
    def send_data_kafka(data):
        try:
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e
  • 相关阅读:
    【Java基础】多态
    inner join / left join / right join
    Java并发之AQS详解
    AQS实现公平锁和非公平锁
    进程与线程区别是什么
    【java设计模式】代理模式
    Spring中用到的设计模式
    【Java设计模式】工厂模式
    前端开发 —— 本地化
    前端开发 —— Blade 模板引擎
  • 原文地址:https://www.cnblogs.com/dplearning/p/7738697.html
Copyright © 2011-2022 走看看