zoukankan      html  css  js  c++  java
  • Python

    Celery - 概念

    简单的灵活可靠的处理大量消息的分布式系统

    专注于实时处理的异步任务队列, 同时也支持任务调度

    结构图

    使用场景

    异步任务  将耗时的操作任务提交给 Celery 去异步执行 - 比如发送短信 / 邮件, 消息推送, 音视频处理等

    定时任务  类似于 crontab, 比如每日的数据统计

    消息中间件

    可选  [ RabbitMQ / Redis ] 

    Redis 的安装 跳转这里 

    Celery - 使用

    安装

    pip install celery[redis]

    基本使用 - 异步任务

    基础版本

    先创建一个简单的耗时阻塞的任务, 很显然会在中间卡顿 4s 等待

    # -*- coding: utf-8 -*-
    import time
    
    
    def add(x, y):
        print "in tasks ---"
        time.sleep(4)
        return x + y
    
    
    if __name__ == '__main__':
        print 'start ---'
        ret = add(2, 8)
        print "end ---"
        print ret
    start ---
    in tasks ---
    end ---
    10

    加入 celery 的优化版本

     将原始的 的 add 方法重新优化一下变成 Celery 的版本

    tasks.py

    # -*- coding: utf-8 -*-
    import time
    from celery import Celery
    
    broker = 'redis://localhost:6379/1'
    backend = 'redis://localhost:6379/2'
    
    app = Celery('my_task', broker=broker, backend=backend)
    
    
    @app.task
    def add(x, y):
        print "in tasks ---"
        time.sleep(4)
        return x + y

     app.py

    # -*- coding: utf-8 -*-
    
    from tasks import add
    
    if __name__ == '__main__':
        print 'start ---'
        ret = add.delay(2, 8)
        print "end ---"
        print ret

     再运行的时候便没有卡顿, 而是以一个任务标识的形式进行返回

    start ---
    end ---
    a3e85efa-859f-45de-af27-e582ff54ddef

    此时任务将被丢入 redis 中等待调度, 需要打开 celery 的 worker 进行处理

    Celery - worker 

    在次目录下进行

    celery worker  -A tasks -l INFO

    -l 表示日志打印等级

    -A 指定执行的函数文件

    此时若因版本过高会触发系列报错, 详情 跳转这里 

    成功后会打印配置信息, 比如此处设置的 redis 以及 被执行的 add 都可以显示出

    简单测试 

    打开 worker 之后就可以进行执行调用了

    当前目录再次打开一个解释器进行简单操作

     在 worker 这边的日志就可以打印出来了

     

     相关的执行结果和执行状态也可以通过一些方法进行获取到

    流程梳理

    app.py 中进行 add 的函数调用,调用直接返回任务标识

    真正的任务在 worker 中异步执行, 然后再 app.py 中的后续代码将不收到阻塞的影响

    通过  .ready .get  方法可以获取执行状态以及相关的结果

    Celery 目录结构

    较为标准的目录格式是需要代码和配置文件分离更加直观

     

     __init__.py 

    初始化文件中进行初始化的模型创建以及配置文件的导入

    # -*- coding: utf-8 -*-
    from celery import Celery
    
    app = Celery('demo')
    
    app.config_from_object('celery_app.celeryconfig')  # 通过 celery 实例加载配置模块

    celeryconfig.py

    配置文件中进行配置文件的相关操作, 这里主要是配置 Redis 的相关配置以及简单的时区更改展示

    Redis 依旧是存放任务的, 以及存放结果需要两个配置配置

    # -*- coding: utf-8 -*-
    
    BROKER_URL = 'redis://localhost:6379/1'
    
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    
    CELERY_TIMEZONE = 'Asia/Shanghai'  # 默认 UTC 时间
    
    # 导入指定的任务模块
    CELERY_IMPORTS = (
        'celery_app.task1',
        'celery_app.task2',
    )

    task1.py / task2.py

    task 则用于实际逻辑的函数编写即可

    # -*- coding: utf-8 -*-
    import time
    from celery_app import app
    
    
    @app.task
    def multiply(x, y):
        time.sleep(3)
        return x * y
    # -*- coding: utf-8 -*-
    import time
    from celery_app import app
    
    
    @app.task
    def add(x, y):
        time.sleep(3)
        return x + y

    app.py

    app 用于外部的调用

    # -*- coding: utf-8 -*-
    from celery_app import task1, task2
    
    if __name__ == '__main__':
        task1.add.apply_async(5, 10)
        task2.multiply.delay(5, 10)
        print 'end  -----'

     基础使用 - 定时任务

  • 相关阅读:
    从相册中获取图片
    Android中bitmap的相关处理
    Sublime Text的使用代码块安装的模块
    angular之控制器(0)
    13.程序集篇
    12.序列化与反序列篇
    11.泛型篇
    10.集合篇
    9.多态篇
    8.继承篇
  • 原文地址:https://www.cnblogs.com/shijieli/p/12011211.html
Copyright © 2011-2022 走看看