zoukankan      html  css  js  c++  java
  • Celery的简单使用介绍笔记。

    Celery的相关模块

    • 任务模块 Task

      包含异步任务和定时任务. 其中, 异步任务通常在业务逻辑中被触发并发往任务队列, 而定时任务由 Celery Beat 进程周期性地将任务发往任务队列.

    • 消息中间件 Broker(中间人)

      Broker, 即为任务调度队列, 接收任务生产者发来的消息(即任务), 将任务存入队列. Celery 本身不提供队列服务, 官方推荐使用 RabbitMQ 和 Redis 等.

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元, 它实时监控消息队列, 获取队列中调度的任务, 并执行它.

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果, 以供查询. 同消息中间件一样, 存储也可使用 RabbitMQ, Redis 和 MongoDB 等

    我在使用中,使用在Django的上传文件到oss服务器中,由于改操作不影响用户后续操作,如果让用户等带来不好的用户体验,可以用Celery进行异步操作。

    Celery其实是一个独立的进程,开辟了一条独立的进程用于接受任务模块提交过来的任务。

    上一个项目示意图

    首相需要安装运行环境,

    pip install 'celery[redis]'  #此安装模式告知,redis依赖安装在celery下

    开始必须运行本地redis

    先上一个最简单的Celery代码。

    import time
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/0'
    backend = 'redis://127.0.0.1:6379/0'
    app = Celery('my_task', broker=broker, backend=backend)
    
    @app.task
    def add(x, y):
        time.sleep(5)     # 模拟耗时操作
        return x + y
    

    将文件保存为task.py

    在Python环境下运行命令

    celery worker -A task --loglevel=info  执行Celery

    task是文件的文件名,--loglevel=info为显示的日志信息。

    在环境中运行add

    In [24]: add                                                                    
    Out[24]: <@task: task.add of my_task at 0x1083e7250>
    

     add已经被装饰过,实际已经成为了一个task任务

    如果直接执行add,其实跟运行普通函数没什么区别。

    应该执行add.delay(2, 3),这样的话,才回执行Celery任务。

    可以用变量接收ret = add.delay(2.4)

    等待ret运行的任务执行出结果以后,ret.result可以返回运行结果。

    Djando Celery实战,

    首先新建一个独立的文件夹worker

    里面新建__init__.py与config.py

    在__init__文件里面实例化Celery,在config.py里面填写相关设置信息。

    import os
    
    from celery import Celery
    from worker import config
    # 加载Django 的 setting
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "swiper.settings")
    
    celery_app = Celery('worker')            # 实例话一个celery
    celery_app.config_from_object(config)      # 加载相关配置 
    celery_app.autodiscover_tasks()          #自动监听任务
    

    config.py

    broker_url = 'redis://127.0.0.1:6379/0'
    broker_pool_limit = 10  # Borker 连接池, 默认是10
    
    timezone = 'Asia/Shanghai'
    accept_content = ['pickle', 'json']
    
    task_serializer = 'pickle'
    result_expires = 3600  # 任务过期时间
    
    result_backend = 'redis://127.0.0.1:6379/0'
    result_serializer = 'pickle'
    result_cache_max = 10000  # 任务结果最大缓存数量
    
    worker_redirect_stdouts_level = 'INFO' #日志级别
    

    在需要异步操作的函数上添加@celery_app.task

     启动Celery

    celery worker -A worker --loglevel=info

    执行具体的接口函数:xxx.delay()

    后续有待结果的疑惑,如何进行多worker设置,还有就是result的取出问题,如果异步操作直接取出将是空,很多时候该操作还没返回值,worker还在运行中。

    有空研究一下,redis里面的result_backend里面的相关内容信息。

  • 相关阅读:
    如何提高工作效率,重复利用时间
    好记性不如烂笔头
    如何应对面试中关于“测试框架”的问题
    通宵修复BUG的思考
    工作方法的思考
    别认为那是一件简单的事情
    开发人员需要熟悉缺陷管理办法
    不了解系统功能的思考
    如何布置任务
    事事有回音
  • 原文地址:https://www.cnblogs.com/sidianok/p/11604069.html
Copyright © 2011-2022 走看看