zoukankan      html  css  js  c++  java
  • 【kafka】celery与kafka的联用问题

    背景:一个小应用,用celery下发任务,任务内容为kafka生产一些数据。

    问题:使用confluent_kafka模块时,单独启用kafka可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了。

    解决:换了个pykafka模块,结果问题就没有了。

    我很疑惑啊,是我调用confluent_kafka的方法不对吗,怎么套上celery就不行了呢?

    可以用的pykafka代码:

    tasks.py

    from celery import Celery
    from pykafka import KafkaClient
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        client = KafkaClient(hosts="localhost:9092")
        print client.topics
        topic = client.topics['test_result']
        with topic.get_sync_producer() as producer:
            for i in range(3):
                data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
                print('Producing message: %s' % data)
                producer.produce(json.dumps(data))
            print "finish produce"
            producer.stop()
            print "stop"                 

    run_worker.py

    from tasks import produce
    
    for i in range(1000):
        result = produce.delay()
        print result.status

    无法正常生产数据的confluent_kafka代码:

    tasks.py

    from celery import Celery
    from kafka_producer import p
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        for i in range(3000):
            data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
            print('Producing message: %s' % data)
            p.produce('test_result3', json.dumps(data))
        print "finish produce"
        p.flush()
        print "finish flush"

    run_worker.py

    from tasks import produce
    result = produce.delay()
    print result.status
    print result.ready()
    print result.get()
    print result.status
  • 相关阅读:
    如何实现EndNote中的PDF批量导出
    UltraEdit 编译输出中文乱码的解决办法
    史密斯(smith)圆图讲解
    OpenFlow
    网络虚拟化-简介
    java util包概述
    内存四区分析
    理解Java接口
    Ubuntu14.04安装wineqq国际版
    使用注解来构造IoC容器
  • 原文地址:https://www.cnblogs.com/dplearning/p/7520211.html
Copyright © 2011-2022 走看看