zoukankan      html  css  js  c++  java
  • celery中的生产者消费者问题

    celery中的生产者消费者问题

    task1.py文件中:

    # demo1:task.py and celery.py in one file
    # run it by
    from celery import Celery
    import time

    # 定义worker(消费者),并指定broker和backend(共享缓冲区)
    # 每启动一个woker就相当于创建一个消费者(启动woker方法:celery -A 创建app的语句所在的文件名)
    # 给woker指定queue的方法:
    # 1、启动woker时-Q指定创建的这个消费者要从共享缓冲区中哪个队列中取出产品并消费
    # 2、如果没有指定queue,则启动的woker默认从共享缓冲区的default队列中取出产品并消费
    # 注意:两个不同的woker监听共享缓冲区中的同一个队列会出错(避免出错的方法:给启动的每个woker用-Q指定要监听的队列,并用@...(queue='')的方法给要在使用该woker处理的task函数指定相同的放入的队列)

    app2=Celery('task2_app2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/0')

    # 定义task(生产者)
    # 在程序执行过程中,每调用一次add.delay就相当于生产者生产一个产品并放入共享缓冲区
    @app2.task(queue='queue2')# 指定该生产者生产的产品要放入共享缓冲区中的哪个队列
    # 给task指定queue的方法:
    # 1、此处是直接给task指定了queue
    # 2、也可以采用给task命名,然后在app.conf.update中定义name与queue的对应规则来批量给多个task指定对应的queue。
    # 3、如果没有指定,则默认将task放入共享缓冲区中名为default的队列中
    def add(x,y):
    print('running ',x,'+',y)
    print(x+y)
    time.sleep(10)
    return x+y

     

     

    生产者:应用程序

    生产动作:调用add.delay()

    共享缓冲区:broker和backend

    消费者:每个woker都是一个消费者

    消费动作:woker启动后会自动监听并从broker中取出任务并执行(消费)

     

    创建消费者:celery -A task1 worker -l info

    (开启worker的实质实际上就是执行app=Celery(...)语句,可以使用 –-concurrency=个数 来限制每个消费者可以并行的线程数)

     

  • 相关阅读:
    web监听器
    闭包
    函数表达式
    android 反向暴力取私有参数 (转载)
    html/weui slider
    自定义取值范围的EditText(记录)
    Android 基于OpenGL ES2.0 的CircleProgressBar
    Android 二维码扫描
    android 反编译网址记录
    Android Opengl ES & Jni 使用
  • 原文地址:https://www.cnblogs.com/zealousness/p/8757762.html
Copyright © 2011-2022 走看看