zoukankan      html  css  js  c++  java
  • Python—Celery 框架使用

    一、Celery 核心模块

    1. Brokers

    brokers 中文意思为中间人,在这里就是指任务队列本身,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker,Brokers 就是生产者和消费者存放/拿取产品的地方(队列)。Celery 扮演生产者和消费者的角色。

    常见的 brokers 有 rabbitmq、redis、Zookeeper 等。推荐用Redis或RabbitMQ实现队列服务。

    2. Workers

    就是 Celery 中的工作者,执行任务的单元,类似与生产/消费模型中的消费者。它实时监控消息队列,如果有任务就从队列中取出任务并执行它。

    3. Backend / Result Stores

    用于存储任务的执行结果。队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了。

    常见的 backend 有 redis、Memcached 甚至常用的数据库都可以。

    4. Tasks

    就是想在队列中进行的任务,有异步任务和定时任务。一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。

    5. Beat

    定时任务调度器,根据配置定时将任务发送给Brokers。

    二、Celery 基本使用 

    1.创建一个celery application 用来定义你的任务列表,创建一个任务文件就叫tasks.py吧。

    from celery import Celery
    
    # 配置好celery的backend和broker 
    app = Celery('task1',  backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0')
     
    #普通函数装饰为 celery task
    @app.task  
    def add(x, y):
        return x + y
    

    如此而来,我们只是定义好了任务函数func函数和worker(celery对象)。worker相当于工作者。

    2.启动Celery Worker来开始监听并执行任务。broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:

    [root@localhost ~]# celery -A tasks worker --loglevel=info    # 启动方法1 
    [root@localhost ~]# celery -A tasks worker --l debug          # 启动方法2
    

    现在 tasks 这个任务集合的 worker 在进行工作(当然此时broker中还没有任务,worker此时相当于待命状态),如果队列中已经有任务了,就会立即执行。

    3.调用任务:要给Worker发送任务,需要调用 delay() 方法。

    import time
    from tasks import add
    
    # 不要直接add(6, 6),这里需要用 celery 提供的接口 delay 进行调用
    result = add.delay(6, 6) 
    while not result.ready():
        time.sleep(1)
    print('task done: {0}'.format(result.get()))
    

    三、Celery 进阶使用

    1.celery_config.py:配置文件

    from __future__ import absolute_import, unicode_literals
    #从python的绝对路径导入而不是当前的脚本     #在python2和python3做兼容支持的
    
    BROKER_URL = 'redis://127.0.0.1:6379/0'
    # 指定结果的接受地址
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

    2.tasks.py

    from __future__ import absolute_import, unicode_literals
    #从python的绝对路径导入而不是当前的脚本     #在python2和python3做兼容支持的
    from celery import Celery
    
    # 配置好celery的backend和broker, task1:app的名字。broker
    app = Celery('task1',                              # 
                 broker='redis://127.0.0.1:6379/0',   # 消息队列:连rabbitmq或redis
                 backend='redis://127.0.0.1:6379/0')  # 存储结果:redis或mongo或其他数据库
     
    app.config_from_object("celery_config")
    app.conf.update(         # 给app设置参数
        result_expires=3600, # 保存时间为1小时
    )
    
    #普通函数装饰为 celery task
    @app.task  
    def add(x, y):
        return x + y
        
    if __name__ == '__main__':
        app.start()

    3.启动worker

    [root@localhost ~]# celery -A tasks worker --loglevel=info

    4.test.py

    # -*- coding:utf-8 -*-
    import time
    from tasks import add
    
    # 不要直接add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
    result = add.delay(6, 6) 
    print(result.id)
    while not result.ready():
        time.sleep(1)
    print('task done: {0}'.format(result.get()))

    四、Celery 定时任务

    参考:https://www.cnblogs.com/forward-wang/p/5970806.html

    参考:https://www.cnblogs.com/shizhengwen/p/6911043.html

    参考:https://blog.51cto.com/steed/2292346?source=dra

    参考:https://blog.csdn.net/qq_37049050/article/details/82260151

    参考:https://www.cnblogs.com/zhangbingsheng/p/10384517.html

    参考:https://www.cnblogs.com/cwp-bg/p/8759638.html

  • 相关阅读:
    利用深度学习网络自动给图像上色的文章和相关工程实现
    Perceptual Losses for Real-Time Style Transfer and Super-Resolution and Super-Resolution 论文笔记
    (转) Dissecting Reinforcement Learning-Part.2
    (转) 多模态机器翻译
    编译caffe的Python借口,提示:ImportError: dynamic module does not define module export function (PyInit__caffe)
    (转)How Hash Algorithms Work
    深度学习 目标检测算法 SSD 论文简介
    (转)能根据文字生成图片的 GAN,深度学习领域的又一新星
    (转) Face-Resources
    (转) AdversarialNetsPapers
  • 原文地址:https://www.cnblogs.com/liuhaidon/p/12036498.html
Copyright © 2011-2022 走看看