zoukankan      html  css  js  c++  java
  • python中的Celery基本使用

    一、Celery异步任务框架

    Celery是一个异步任务框架,并且是一个简单、灵活可靠的,处理大量消息的分布式系统

    Celery服务为其他项目服务提供异步解决任务的需求,内置socket

    Celery可执行的任务:执行异步任务,执行延迟任务,执行定时任务

    Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

    二、Celery架构

    Celery是由三部分组成的,消息中间件(message broker)、任务执行单元(worker)、任务结果存储(task result store)组成的。

    Broker(任务中间件)————>Worker(任务执行者)————>Backend(任务结果仓库)

    消息中间件:Celery是不提供消息服务的,但是可以使用第三方来提供消息服务(提供任务),列如,Redis。

    任务执行单元:Worker会自动(后台异步)执行消息中间件(broker)中的任务任务。

    任务结果存储:将Worker执行的结果存储在backend中,可以使用Redis来存储

    三、Celery任务结构

    Celery有两种任务结构,基本结构、包架构封装,但是我们提倡使用包架构封装,因为结构更加清晰,例如:

    project
        ├── celery_task  	# celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py  	# 添加任务
        └── get_result.py   # 获取结果
    

    包架构封装

    celery.py

    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'     # broker 任务队列,任务放到这里面
    backend = 'redis://127.0.0.1:6379/2'    # backend 结果存储,执行结果放在这里面
    
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
    

    tasks.py

    from .celery import app
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.task
    def multip(x, y):
        return x * y
    

    异步任务执行:

    add_task.py

    把 tasks.py 中的任务函数添加到 broker 中

    windows 首先需要安装:pip install celery 和 pip install eventlet

    需要启动 celery, 在包项目下输入以下命令:

    C:project> celery -A celery_task worker -P eventlet -l info

    celery -A 包名 worker -P eventlet -l info

    from celery_task.tasks import add
    
    # 提交异步任务
    ret = add.delay(5, 3)	# 把add函数任务添加到 broker中,worker在异步实时取出执行
    
    print(ret)	# 0cc72e56-4604-4c00-bb3d-5b456f4869a7	获取执行结果需要此ID
    

    延迟任务执行:

    add_task.py

    还是需要先启动celery

    from celery_task.tasks import multip
    # 提交延迟任务
    from datetime import datetime, timedelta
    
    # 需要UTC时间
    eta = datetime.utcnow() + timedelta(seconds=10)		# 当前UTC时间往后加10秒
    ret = multip.apply_async(args=(9, 9), eta=eta)		# 10 秒之后执行
    
    print(ret)	# 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2	获取执行结果需要此ID
    

    定时任务执行:

    执行定时任务需要从新配置celery.py

    from celery import Celery
    from datetime import timedelta
    from celery.schedules import crontab
    
    broker = 'redis://127.0.0.1:6379/1'  # broker 任务队列,任务放到这里面
    backend = 'redis://127.0.0.1:6379/2'  # backend 任务队列,执行结果放在这里面
    
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    app.conf.beat_schedule = {
        # add 任务
        'add-task': {
            'task': 'celery_task.tasks.add',
            'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点执行一次
            'args': (300, 150),
        },
        # multip 任务
        'multip-task': {
            'task': 'celery_task.tasks.multip',
            'schedule': timedelta(seconds=3),           # 每三秒执行一次
            'args': (300, 150),
        }
    }
    

    启动 worker 等待执行任务

    celery -A celery_task beat -l info
    
    celery -A 包名 beat -l info
    

    启动 beat 将任务添加 broker 中,让worker执行

    celery -A celery_task worker -P eventlet -l info
    
    celery -A 包名 worker -P eventlet -l info
    

    查看任务执行结果:

    get_result.py

    from celery_task.celery import app
    from celery.result import AsyncResult
    
    id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'
    
    if __name__ == '__main__':
        asyncs = AsyncResult(id=id, app=app)
    
        if asyncs.successful():
            result = asyncs.get()
            print(result)	# 成功则取出backend中id对应的值
            
        elif asyncs.failed():
            print('任务失败')
        elif asyncs.status == 'PENDING':
            print('任务等待中被执行')
        elif asyncs.status == 'RETRY':
            print('任务异常后正在重试,或id不存在')
        elif asyncs.status == 'STARTED':
            print('任务已经开始被执行')
    

    基本结构

    创建py文件:celery_app_task.py

    from celery import Celery
    import time
    
    # backend='redis://:123456@127.0.0.1:6379/1'	# 有密码123456
    
    broker = 'redis://127.0.0.1:6379/1'  	# broker 任务队列,任务放到这里面
    backend = 'redis://127.0.0.1:6379/2'  	# backend 任务队列,执行结果放在这里面
    
    app = Celery(__name__, broker=broker, backend=backend)
    
    @app.task
    def add(x, y):
        return x + y
    

    启动 worker

    celery -A celery_app_task worker -P eventlet -l info
    

    添加任务:add_task.py

    from celery_app_task import add
    
    # 提交任务
    ret = add.delay(5, 3)	# 往 broker 中添加一个任务
    print(ret)
    
    学习之旅
  • 相关阅读:
    51nod1179【思维】
    网络时间校准
    获取网络时间,减轻自己服务器的请求压力
    C# async/await异步编程深入理解
    C#中的异步编程--探索await与async关键字的奥妙之处,原来理解和使用异步编程可以这么简单
    "",string.Empty和null三者的区别
    三万字把SQL数据库的所有命令,函数,运算符讲得明明白白讲解,内容实在丰富
    C# await async Task
    C#异步编程 Task await的理解
    idea中更改下载源
  • 原文地址:https://www.cnblogs.com/XiaoYang-sir/p/15041412.html
Copyright © 2011-2022 走看看