zoukankan      html  css  js  c++  java
  • 23-Celery

    Celery

    一、官方

    Celery官网:http://www.celeryproject.org/

    Celery官方文档引文版:http://docs.celeryproject.org/en/latest/index.html

    Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

    专注于实时处理的异步任务队列

    同时也支持任务调度

    注意:

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
    
    Celery是一个资金很少的项目,所以我们不支持微软的Windows。请不要打开任何与该平台相关的问题。
    

    二、Celery异步任务框架

    1) 可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
    2) celery服务为为其他项目服务提供异步解决任务需求的
        注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
    
    人是一个独立运行的服务 | 医院也是一个独立运行的服务
    	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    

    三、Celery架构

    Celery的架构由三部分组成:

    # 1. 消息中间件(message broker)
        Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
        
    # 2. 任务执行单元(worker)和 
        Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
    
    # 3. 任务执行结果存储(task result store)组成。
        Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
    

    image-20210216165235071

    四、使用场景

    异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    延迟执行:解决延迟任务

    定时执行:解决周期(周期)任务,比如每天数据统计

    五、Celery安装

    pip install celery
    

    六、两种celery任务结构

    1. 如果Celery对象:Celery(...)是放在一个模块下的

    1)终端切换到该模块所在文件夹位置:scripts
    2)5.0之前:执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
       5.0以后:执行启动worker的命令:celery -A 模块名 worker -l info -P eventlet
        注意: 
        1. windows系统需要eventlet支持: pip insall eventlet
        2. Linux与MacOS直接执行:celery worker -A 模块名 -l info
    

    2. 如果Celery对象:Celery(...)是放在一个包下的

    1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
    2)5.0之前:执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
       5.0以后:执行启动worker的命令:celery -A 模块名 worker -l info -P eventlet
        注意: 
        1. windows系统需要eventlet支持: pip insall eventlet
        2. Linux与MacOS直接执行:celery worker -A 模块名 -l info
    

    七、Celery之心异步框架

    1. 模块接结构

    image-20210216183216070

    <1>. test_celery/celery_app.py
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    
    
    # include=[被管理的任务文件路径, ]
    # main是app名字,broker是任务中间件,backend是任务结构仓库
    app = Celery(main=__name__,broker=broker,backend=backend,include=[...])
    
    # 添加任务
    @app.task
    def add(x, y):
    	print(x, '+', y, '=', x + y)
    	return x + y
    
    # 放在模块下的启动celery命令
    '''
    windows中: 
        celery worker -A 模块名 -l info -P eventlet
    linux中: 
        clery worker -A 模块名 -l info
    
    -A: 表示被执行的模块路径. 如果是相对, 那么就需要cd到该模块下执行.
    -l: 表示展示日志    
    '''
    

    windows中执行命令:

    5.0之前:celery worker -A celery_app -l info -P eventlet
    5.0以后:celery -A celery_app worker -l info -P eventlet
    
    <2>. test_celery/execute_task.py
    from celert_app import add
    
    # 往broker中添加一个任务.
    '''
    只要是worker一直是在启动的状态, 一旦往broker中添加了任务. 
    那么这个任务就会立刻被worker执行, 执行的结果存储到backend中
    '''
    result = add.delay(2, 3)
    print(result)
    
    <3>. test_celery/get_result.py
    from celery_app import app
    
    from celery.result import AsyncResult
    
    id = 'e5492f41-b00d-42b9-8796-6c4ce803ae9e'
    if __name__ == '__main__':
        # 通过指定任务的id, 指定开启任务的worker实例化一个异步对象. 当一有结果就可以.get获取到对应任务的返回结果.
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    2. 包结构

    image-20210216185834115

    celery_task/__init__.py

    celery_task/celery.py

    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    
    # main是app名字,broker是任务中间件,backend是任务结构仓库
    app = Celery(main=__name__, broker=broker, backend=backend, include=('celery_task.task1', 'celery_task.task2'))
    

    celery_task/task1.py

    from .celery import app
    
    
    @app.task
    def add(x, y):
    	print('x+y=', x + y)
    	return x + y
    

    celery_task/task2.py

    from .celery import app
    
    
    @app.task
    def multi(x, y):
    	print('x*y=', x * y)
    	return x * y
    

    script/execute_task_add.py

    from celery_task.task1 import add
    
    result = add.delay(1,2)
    print(result)
    

    script/execute_task_multi.py

    from celery_task.task2 import multi
    
    result = multi.delay(2,3)
    print(result)
    

    script/get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '88d00784-9d45-4737-8095-47d245c665e5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    3. 三种使用哪个场景示例(以包方式)

    <1>. 异步执行

    celery_task/celery.py

    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    
    # main是app名字,broker是任务中间件,backend是任务结构仓库
    app = Celery(main=__name__, broker=broker, backend=backend, include=('celery_task.task1',))
    

    celery_task/task1.py

    from .celery import app
    
    
    @app.task
    def add(x, y):
    	print('x+y=', x + y)
    	return x + y
    

    script/execute_task_add.py

    from celery_task.task1 import add
    
    result = add.delay(1,2)
    print(result)
    

    script/get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '88d00784-9d45-4737-8095-47d245c665e5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    
    <2>. 延迟执行

    celery_task/celery.py

    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    
    # main是app名字,broker是任务中间件,backend是任务结构仓库
    app = Celery(main=__name__, broker=broker, backend=backend, include=('celery_task.task1',))
    

    celery_task/task1.py

    from .celery import app
    
    
    @app.task
    def add(x, y):
    	print('x+y=', x + y)
    	return x + y
    

    script/execute_task_add.py

    from celery_task.task1 import add
    
    
    # 延时任务: (注意: 默认是以utc时间作为当前时间开始往后计时开始定时)
    from datetime import datetime, timedelta
    
    eta = datetime.utcnow() + timedelta(seconds=5)  # 延迟5秒执行
    
    result = add.apply_async(args=(4,5),eta=eta)
    print(result)
    

    script/get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '88d00784-9d45-4737-8095-47d245c665e5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    
    <3>. 定时执行

    celery_task/celery.py

    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    
    # main是app名字,broker是任务中间件,backend是任务结构仓库
    app = Celery(main=__name__, broker=broker, backend=backend, include=('celery_task.task1', 'celery_task.task2'))
    
    
    # 定时任务
    from datetime import timedelta
    from celery.schedules import crontab
    
    app.conf.timezone = 'Asia/Shanghai'  # 默认是UTC. 这里切换到了上海的时区.
    app.conf.enable_utc = False          # False表示禁用默认的UTC时间作为当前的定时时间, 而是以上面指定的上海的的时区作为定时开始时间
    app.conf.beat_schedule = {
        'task1': {
            'task': 'celery_task.task1.add',             # 配置定时任务执行task1任务的路径
            'schedule': timedelta(seconds=10),              # 每3秒种执行一次
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': (300, 150),                            # 给task1进行传递的参数. 指定kwargs就可以传递关键字参数.
        },
        'task2': {
            'task': 'celery_task.task2.multi',
            'schedule': timedelta(seconds=10),
            # 'schedule': crontab(hour=8, day_of_week=1),
            'args': (300, 150),
        }
    }
    
    # 关于使用crontab进行定时的参数
    '''
    minute='*',          # 第几分钟
    hour='*',            # 第几小时
    day_of_week='*',     # 每周的第几天
    day_of_month='*',    # 每月的第几天 
    month_of_year='*'    # 每年的第几月
    '''
    
    # 启动worker以后, 在另一个总终端中就可以使用一下命令自动将app.conf.beat_schedule中配置的定时任务函数 或 方法 自动执行
    '''
    5.0之前:celery beat -A celery_task -l info -P eventlet
    5.0以后:celery -A celery_task beat -l info -P eventlet
    '''
    

    celery_task/task1.py

    from .celery import app
    
    
    @app.task
    def add(x, y):
    	print('x+y=', x + y)
    	return x + y
    

    script/execute_task_add.py

    from celery_task.task1 import add
    
    result = add.delay(1,2)
    print(result)
    

    script/get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '88d00784-9d45-4737-8095-47d245c665e5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    4. 关于celery执行任务的坑

    <1>. 导包时的问题
    # from scripts.celery_task import task2
    # from scripts.celery_task import task1
    '''
    celery_task作为包, 被命令执行时. 如果不将celery_task作为定级导入, 那么任务的执行将会是未被注册的. 任务将不会被运行的worker获取, 将会抛出异常.
    [2020-07-26 19:40:56,983: ERROR/MainProcess] Received unregistered task of type 'scripts.celery_task.task2.task2'.
    
    
    因此, 为了在项目中能够在任意位置都可以执行任务. 因此celery_task必定要放在项目的根目录下的. 那么无论在任何位置导入, 都是没有问题的. 如下:
        from celery_task.a.b.c.d import xxx
        
        如果不是在项目的根目录下, 你的导入也许是这样的.
        from scripts.celery_task import task
        那么worker将找不到你指定的任务的路径. 
    
    本质: 就是由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下.
    '''
    
    <2>. 执行时的问题
    • 执行worker命令时:

      image-20210216172132324

      措施:

      5.0之前:celery worker -A 模块名 -l info -P eventlet
      		celery worker -A celery_task -l info -P eventlet 
      5.0以后:celery -A 模块名 worker -l info -P eventlet
      		celery -A celery_task worker -l info -P eventlet
      
    • 执行beat命令时:

      image-20210216194402312

      措施:

      5.0之前:celery beat -A 模块名 -l info -P eventlet
      		celery beat -A celery_task -l info 
      5.0以后:celery -A 模块名 beat -l info -P eventlet
      		celery -A celery_task beat -l info 
      
  • 相关阅读:
    忍者必须死3 模拟器按键设置
    C# 工厂模式 个人基本流程
    WPF Boolean类型转化器收集 反转转化器
    Json实体类驼峰名称转化器
    TDengine + EMQ X + Grafana 轻松搭建高效低成本的边缘侧工业互联网平台
    呼声最高的数据更新功能来了,用户需要什么,我们就开源什么
    年轻人不讲武德,TDengine边缘侧数据存储方案挑战SQLite
    保姆级演示一分钟搞定TDengine的下载安装
    双汇大数据方案选型:从棘手的InfluxDB+Redis到毫秒级查询的TDengine
    HiveMQ TDengine extension 使用指南
  • 原文地址:https://www.cnblogs.com/borntodie/p/14431187.html
Copyright © 2011-2022 走看看