zoukankan      html  css  js  c++  java
  • 【kafka】celery与kafka的联用问题

    背景:一个小应用,用celery下发任务,任务内容为kafka生产一些数据。

    问题:使用confluent_kafka模块时,单独启用kafka可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了。

    解决:换了个pykafka模块,结果问题就没有了。

    我很疑惑啊,是我调用confluent_kafka的方法不对吗,怎么套上celery就不行了呢?

    可以用的pykafka代码:

    tasks.py

    from celery import Celery
    from pykafka import KafkaClient
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        client = KafkaClient(hosts="localhost:9092")
        print client.topics
        topic = client.topics['test_result']
        with topic.get_sync_producer() as producer:
            for i in range(3):
                data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
                print('Producing message: %s' % data)
                producer.produce(json.dumps(data))
            print "finish produce"
            producer.stop()
            print "stop"                 

    run_worker.py

    from tasks import produce
    
    for i in range(1000):
        result = produce.delay()
        print result.status

    无法正常生产数据的confluent_kafka代码:

    tasks.py

    from celery import Celery
    from kafka_producer import p
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        for i in range(3000):
            data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
            print('Producing message: %s' % data)
            p.produce('test_result3', json.dumps(data))
        print "finish produce"
        p.flush()
        print "finish flush"

    run_worker.py

    from tasks import produce
    result = produce.delay()
    print result.status
    print result.ready()
    print result.get()
    print result.status
  • 相关阅读:
    web应用后台开发的故事
    XML的定义、用途、以及它的发展前景和存在的问题等等
    本学期(大三下学期)学习目标
    企业级应用与互联网应用的区别?
    新能源汽车无线充电管理网站4
    新能源汽车无线充电管理网站3
    新能源汽车无线充电管理网站2
    企业级应用与互联网应用的区别
    javaee 新学期新目标
    团队项目PCP--自我评价
  • 原文地址:https://www.cnblogs.com/dplearning/p/7520211.html
Copyright © 2011-2022 走看看