zoukankan      html  css  js  c++  java
  • 【python】kafka在与celery和gevent连用时遇到的问题

    前提:kafka有同步,多线程,gevent异步和rdkafka异步四种模式。但是在与celery和gevent连用的时候,有的模式会出错。

    下面是我代码运行的结果。

    结论:使用多线程方式!

    使用同步方式可以成功发送数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_sync_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用rdkafka异步,只打印了一条send data之后卡住

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer(use_rdkafka=True) as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    使用多线程,可以正常生产所有数据

    def send_data_kafka(data):
        try:
            client = KafkaClient(hosts=broker_list)
            topic = client.topics[topic_name]
            with topic.get_producer() as producer:  
                for d in data:
                    print "send data"
                    msg = json.dumps(d)
                    producer.produce(msg)
                producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e

    没有用with,rdkafka异步,打印了所有的send data,后卡住

    client = KafkaClient(hosts=broker_list)
    topic = client.topics[topic_name]
    producer = topic.get_producer(use_rdkafka=True)  # 异步,使用rdkafka库,速度最快的方案
    
    def send_data_kafka(data):
        try:
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
        except Exception, e:
            LOGGER.exception("error in send_data_kafka")
            print e
  • 相关阅读:
    (22)C#windows打包部署
    (2)OLEDB数据库操作
    (5)C#工具箱-数据
    (21)C#VS快捷键
    (1)OracleClient数据库操作(淘汰)
    (4)C#工具箱-菜单和工具栏
    (3)C#工具箱-容器
    (2)C#工具箱-公共控件2
    (9)oracle 表的基本查询
    企鹅
  • 原文地址:https://www.cnblogs.com/dplearning/p/7738697.html
Copyright © 2011-2022 走看看