zoukankan      html  css  js  c++  java
  • 【kafka】celery与kafka的联用问题

    背景:一个小应用,用celery下发任务,任务内容为kafka生产一些数据。

    问题:使用confluent_kafka模块时,单独启用kafka可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了。

    解决:换了个pykafka模块,结果问题就没有了。

    我很疑惑啊,是我调用confluent_kafka的方法不对吗,怎么套上celery就不行了呢?

    可以用的pykafka代码:

    tasks.py

    from celery import Celery
    from pykafka import KafkaClient
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        client = KafkaClient(hosts="localhost:9092")
        print client.topics
        topic = client.topics['test_result']
        with topic.get_sync_producer() as producer:
            for i in range(3):
                data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
                print('Producing message: %s' % data)
                producer.produce(json.dumps(data))
            print "finish produce"
            producer.stop()
            print "stop"                 

    run_worker.py

    from tasks import produce
    
    for i in range(1000):
        result = produce.delay()
        print result.status

    无法正常生产数据的confluent_kafka代码:

    tasks.py

    from celery import Celery
    from kafka_producer import p
    import json
    
    
    app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')
    
    @app.task
    def produce():
        for i in range(3000):
            data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
            print('Producing message: %s' % data)
            p.produce('test_result3', json.dumps(data))
        print "finish produce"
        p.flush()
        print "finish flush"

    run_worker.py

    from tasks import produce
    result = produce.delay()
    print result.status
    print result.ready()
    print result.get()
    print result.status
  • 相关阅读:
    赫尔维茨公式
    从解析几何的角度分析二次型
    Struts 1 Struts 2
    记一次服务器被入侵的调查取证
    契约式设计 契约式编程 Design by contract
    lsblk df
    Linux Find Out Last System Reboot Time and Date Command 登录安全 开关机 记录 帐号审计 历史记录命令条数
    Infrastructure for container projects.
    更新文档 版本控制 多版本并发控制
    Building Microservices: Using an API Gateway
  • 原文地址:https://www.cnblogs.com/dplearning/p/7520211.html
Copyright © 2011-2022 走看看