zoukankan      html  css  js  c++  java
  • python之异步任务框架Celery

    官网参考:

    Celery 官网:http://www.celeryproject.org/

    Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

    Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    介绍:

    """
    1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
    2)celery服务为为其他项目服务提供异步解决任务需求的
    注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
    
    人是一个独立运行的服务 | 医院也是一个独立运行的服务
        正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
        人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    """

    Celery架构图:

      Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    三、使用场景

    异步执行:解决耗时任务

    延迟执行:解决延迟任务

    定时执行:解决周期(周期)任务

    四、Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery('任务名', broker='xxx', backend='xxx')

    五、两种celery任务结构:提倡用包管理,结构更清晰

    # 如果 Celery对象:Celery(...) 是放在一个模块下的
    # 1)终端切换到该模块所在文件夹位置:scripts
    # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
    # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
    # 注:模块名随意
    
    
    # 如果 Celery对象:Celery(...) 是放在一个包下的
    # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
    # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
    # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
    # 注:包名随意

    七、Celery执行异步任务

    包架构封装

    project
        ├── celery_task      # celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py      # 添加任务
        └── get_result.py   # 获取结果

    celery.py 基本配置

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
    
    # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
    
    
    from celery import Celery
    # 无密码
    broker = 'redis://127.0.0.1:6379/1'    
    backend = 'redis://127.0.0.1:6379/2'
    # 有密码:
    broker = 'redis://:123@127.0.0.1:6379/1'
    backend = 'redis://:123@127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    '''
    broker : 任务仓库
    backend : 任务结果仓库
    include :任务(函数)所在文件
    '''

    tasks.py 添加任务

    from .celery import app
    
    @app.task
    def add(n1,n2):
        res = n1+n2
        print('n1+n2 = %s' % res)
        return res
    
    
    @app.task
    def low(n1,n2):
        res = n1-n2
        print('n1-n2 = %s' % res)
        return res

    add_task.py 添加立即、延迟任务

    from celery_task import tasks
    
    # delay  :添加立即任务
    # apply_async : 添加延迟任务
    # eta : 执行的utc时间
    
    
    # 添加立即执行任务
    t1 = tasks.add.delay(10, 20)
    t2 = tasks.low.delay(100, 50)
    print(t1.id)
    
    
    # 添加延迟任务
    from celery_package.tasks import jump
    from datetime import datetime,timedelta
    
    # 秒
    def eta_second(second):
        ctime = datetime.now()  # 当前时间
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())  # 当前UTC时间
        time_delay = timedelta(seconds=second)  # 秒
        return utc_ctime + time_delay  # 当前时间+往后延迟的秒
    # 天
    def eta_days(days):
        ctime = datetime.now()  # 当前时间
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())  # 当前UTC时间
        time_delay = timedelta(days=days)  # 天
        return utc_ctime + time_delay  # 当前时间+往后延迟的天
    
    jump.apply_async(args=(20,5), eta=eta_second(10))  # 10秒后执行
    jump.apply_async(args=(20,5), eta=eta_days(1))  # 1天后执行

    get_result.py 获取结果

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')

    九、高级使用

    celery.py 定时任务配置(循环的)

    特点:

    添加任务的终端关闭之后,停止添加

    celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 注):-A  表示相对路径,所以一定先进入celery_task所在包
           -l 表示打印到日志 info 级别
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
    # 命令:celery beat -A celery_task -l info
    
    # 4)获取结果
    
    
    from celery import Celery
    
    # 无密码
    broker = 'redis://127.0.0.1:6379/1'    
    backend = 'redis://127.0.0.1:6379/2'
    # 有密码:
    broker = 'redis://:123@127.0.0.1:6379/1'
    backend = 'redis://:123@127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 自动任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
        # 定时任务名字
        'fall_task': {
            'task': 'celery_task.tasks.fall',
            'args':(30,20),
            'schedule': timedelta(seconds=3),  # 3秒后执行
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        }
    }
    
    '''
    fall_task:任务名自定义
    task:任务来源
    args:任务参数
    schedule:定时时间
    '''
    
    
    'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
    '''
    minute : 分钟
    hour :小时
    day_of_week :礼拜
    day_of_month:月
    month_of_year:年
    '''

    tasks.py

    from .celery import app
    
    @app.task
    def fall(n1,n2):
        res = n1/n2
        print('n1 /n2 = %s' % res)
        return res

    get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')

    参考:https://www.cnblogs.com/guyouyin123/p/12420344.html

  • 相关阅读:
    26 转义符 re模块 方法 random模块 collection模块的Counter方法
    25 正则表达式
    24 from 模块 import 名字
    24 from 模块 import 名字
    24 from 模块 import 名字
    23 析构方法 items系列 hash方法 eq方法
    21 isinstance issubclass 反射 _str_ _new_ _len_ _call_
    20 属性, 类方法, 静态方法. python2与python3的区别.
    python(1)
    python之字符串格式化
  • 原文地址:https://www.cnblogs.com/xingxia/p/python3_celery.html
Copyright © 2011-2022 走看看