zoukankan      html  css  js  c++  java
  • Celery

    Celery简介

    Celery是什么

    1. Celery是python中使用比较多的并行分布式框架
    2. Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
    3. Celery专注于实时处理的异步任务队列
    4. Celery同时也支持任务调

    Celery使用场景

    celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)定时任务(crontab)

    • 异步任务: 将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
    • 定时任务: 定时执行某件事情,比如每天数据统计

    Celery工作流程

    Celery核心组件

    Celery的架构由三部分组成,消息中间件(Broker),任务执行单元(Worker)和任务执行结果存储(Result)组成。

    • 消息中间件(Broker)
      Broker负责创建任务队列,根据一些路由规则将任务分派到任务队列,然后将任务从任务队列交付给worker
    • 任务执行单元(Worker)
      Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中,运行后台作业的进程
    • 任务结果存储(Result)
      Result用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等

    另外: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    其他: Celery还支持不同的并发、序列化和压缩的手段

    • 并发:prefork、eventlet、gevent、threads
    • 序列化:pickle、json、yaml、msgpack 等
    • 压缩:zlib,、bzip2

    Celery安装相关软件

    备注:我是在windows10环境下做的,Linux应该大同小异

    Celery安装

    pip install celery
    

    其他安装

    这里有个坑.win10系统启动worker报错ValueError: not enough values to unpack (expected 3, got 0),解决办法:

    pip install eventlet
    

    Django中使用的时候,报错 AttributeError: ‘str’ object has no attribute ‘items’。redis版本太高,降低版本 pip install redis==2.10.6

    pip install redis==2.10.6
    

    Celery代码实现

    Celery基本使用方法

    备注:在这里,我使用的是redis作为消息中间件

    目录结构

    代码实现

    # app.py
    from task import add
    
    if __name__ == '__main__':
        print("Start Task ...")
        result = add.delay(2, 8)
        print("result:",result)             # 存到redis之后,返回的id
        print("result_id:",result.id)       # 存到redis之后,返回的id
        print("result:", result.get())      # 方法返回值
        print("End Task ...")
    
    # task.py
    
    import time
    from celery import Celery
    
    # 实例化一个Celery
    broker = 'redis://ip:6379/1'
    backend = 'redis://ip:6379/2'
    
    
    # 参数1 自动生成任务名的前缀
    # 参数2 broker 是我们的redis的消息中间件
    # 参数3 backend 用来存储我们的任务结果的
    app = Celery('my_task', broker=broker, backend=backend)
    
    
    # 加入装饰器变成异步的函数
    @app.task
    def add(x, y):
        print('Enter call function ...')
        time.sleep(4)
        return x + y
    
    
    if __name__ == '__main__':
        # 这里生产的任务不可用,导入的模块不能包含task任务。会报错
        print("Start Task ...")
        result = add.delay(2, 8)
        print("result:", result)
        print("End Task ...")
    
    

    终端启动服务

    celery -A task worker -l info -P eventlet
    
    • A :参数指定celery对象的位置
    • l :参数指定worker的日志级别

    服务启动后的展示

    运行代码,app.py

    备注:不要运行task.py,会报错

    这个时候可以看终端,看看请求

    检验这个id的值

    新增检查文件 check.py

    # check.py
    
    from celery.result import AsyncResult
    from task import app
    
    async_result=AsyncResult(id="455d6ad7-39cc-4e94-9fa9-456ae49cdd97", app=app)
    
    if async_result.successful():
        result = async_result.get()
        print(result)
        # result.forget() # 将结果删除
    elif async_result.failed():
        print('执行失败')
    elif async_result.status == 'PENDING':
        print('任务等待中被执行')
    elif async_result.status == 'RETRY':
        print('任务异常后正在重试')
    elif async_result.status == 'STARTED':
        print('任务已经开始被执行')
    
    
  • 相关阅读:
    【2019.8.14 慈溪模拟赛 T1】我不是!我没有!别瞎说啊!(notme)(BFS+DP)
    【2019.8.8 慈溪模拟赛 T2】query(query)(分治+分类讨论)
    【CometOJ】Comet OJ
    【CodeForces】CodeForcesRound576 Div1 解题报告
    【2019.8.12 慈溪模拟赛 T2】汪哥图(wang)(前缀和)
    【2019.8.12 慈溪模拟赛 T1】钥匙(key)(暴力DP)
    【2019.8.9 慈溪模拟赛 T2】摘Galo(b)(树上背包)
    【BZOJ3171】[TJOI2013] 循环格(网络流)
    【AtCoder】AtCoder Grand Contest 035 解题报告
    【2019.8.11上午 慈溪模拟赛 T2】十七公斤重的文明(seventeen)(奇偶性讨论+动态规划)
  • 原文地址:https://www.cnblogs.com/dongye95/p/15204258.html
Copyright © 2011-2022 走看看