zoukankan      html  css  js  c++  java
  • Celery 分布式异步任务框架

    一、什么是celery

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。主要是执行 异步任务定时任务

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    1、消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    2、任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    3、任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    二、安装

    安装

    pip install celery
    

    三、使用

    1 异步任务

    (1)创建任务

    创建celery_task.py

    import celery
    import time
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://:123456@127.0.0.1:6379/1'       # 结果存储
    broker='redis://:123456@127.0.0.1:6379/2'       # 消息中间件
    cel=celery.Celery('test',backend=backend,broker=broker)
    
    @cel.task
    def add(x,y):
        return x+y
    

    (2)添加任务(broker)

    添加任务到消息中间件的队列中,但是没有执行任务

    result不是函数执行结果,它是个对象

    创建add_task.py

    from celery_task import add
    result = add.delay(4,5)
    print(result.id)
    

    (3)执行任务(worker)

    创建py文件:run.py,执行任务,或者

    命令执行:celery worker -A celery_task -l info

    注:windows下:celery worker -A celery_task -l info -P eventlet

    常用命令来执行任务

    from celery_task import cel
    if __name__ == '__main__':
        cel.worker_main()
        # cel.worker_main(argv=['--loglevel=info')
    

    (4)查看执行结果(result)

    创建py文件:result.py,查看任务执行结果

    from celery.result import AsyncResult
    from celery_task import cel
    
    async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    2 多任务结构

    pro_cel
        ├── celery_task
        │   ├── celery.py   # 必须这个名字
        │   └── tasks1.py    #  所有任务函数
        │	└── tasks2.py    #  所有任务函数
        ├── check_result.py 	# 检查结果
        └── send_task.py    # 添加任务
    

    (1)celery.py --- 配置

    from celery import Celery
    
    broker='redis://127.0.0.1:6379/1',
    backend='redis://127.0.0.1:6379/2',
    cel = Celery('celery_demo', broker=broker, backend=backend,
                 # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
                 include=['celery_task.tasks1',
                          'celery_task.tasks2'
                          ])
    
    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False
    

    (2)task1.py与task2.py --- 创建任务

    tasks1.py

    import time
    from celery_task.celery import cel
    @cel.task
    def test_celery(res):
        time.sleep(5)
        return "test_celery任务结果:%s"%res
    

    tasks2.py

    import time
    from celery_task.celery import cel
    @cel.task
    def test_celery2(res):
        time.sleep(5)
        return "test_celery2任务结果:%s"%res
    

    (3)send_task.py --- 添加任务

    from celery_task.tasks1 import test_celery
    from celery_task.tasks2 import test_celery2
    
    # 立即告知celery去执行test_celery任务,并传入一个参数
    result = test_celery.delay('第一个的执行')
    print(result.id)
    result = test_celery2.delay('第二个的执行')
    print(result.id)
    

    (4)通过命令执行任务

    celery worker -A celery_task -l info -P eventlet
    

    (5)check_result.py --- 查看任务结果

    from celery.result import AsyncResult
    from celery_task.celery import cel
    
    async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除,执行完成,结果不会自动删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    3 定时任务

    执行定时任务的创建任务、执行任务、查看任务结果与执行异步任务相同,不同的是在添加任务时,设定时间。

    from celery_app_task import add
    from datetime import datetime
    
    # 方式一
    # 在指定时间执行该任务
    v1 = datetime(2019, 2, 13, 18, 19, 56)
    print(v1)
    # 当前时间对象,转成utc时间
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    result = add.apply_async(args=[1, 3], eta=v2)
    print(result.id)
    
    # 方式二
    # 在当前时间的后延10s执行任务
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间,args是任务函数参数,eta指定时间
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    

    定时任务

    celery.py

    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    
    cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
        'celery_task.tasks1',
        'celery_task.tasks2',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
        # 名字随意命名
        'add-every-10-seconds': {
            'task': 'celery_task.tasks1.test_celery',
            'schedule': timedelta(seconds=2),
            # 传递参数
            'args': ('test',)
        },
        # 'add-every-12-seconds': {
        	  # 执行tasks1下的test_celery函数
        #     'task': 'celery_task.tasks1.test_celery',
        #     每年4月11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     每个月的11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11),
        #     每天8点42分执行
        #     'schedule': crontab(minute=42, hour=8),
        #     'args': (16, 16)
        # },
    }
    

    task1.py

    from .celery import app
    
    @app.task
    def add():
    

    celery.py python <= 3.6

    from celery import Celery
    from celery.schedules import crontab
    
    broker = 'redis://127.0.0.1:6379/1'  # broker任务队列
    backend = 'redis://127.0.0.1:6379/2'  # 结构存储,执行完的结果存在这
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task1',])
    
    app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.CELERY_ENABLE_UTC = False
    # 任务的定时配置
    from datetime import timedelta
    
    app.conf.CELERYBEAT_SCHEDULE = {
        'add-task': {
            'task': 'celery_task.task1.add',
            'schedule': timedelta(minutes=1),  # crontab(minute='*/1') timedelta(seconds=60)
        }
    }
    

    注意:

    celery 4.x 不支持windows,要用3.x,配置参数 在 4.x 变为小写,3.x 还是大写;

    python解释器因为关键字 async >= 3.7,所以要用 python <= 3.6

    启动定时任务

    启动一个beatcelery beat -A celery_task -l info

    启动work执行celery worker -A celery_task -l info -P eventlet

    [2020-08-16 16:15:02,561: INFO/MainProcess] beat: Starting...
    [2020-08-16 16:19:18,315: INFO/MainProcess] Scheduler: Sending due task add-task (celery_task.task1.add)
    [2020-08-16 16:24:18,315: INFO/MainProcess] Scheduler: Sending due task add-task (celery_task.task1.add)
    

    beat 提交任务,里面 相当于 会有时钟记录上一次的提交时间,所以,距离上次提交的时间不到五分钟,所以,beat 命令输入没有立即 提交任务,而是等到了与上次提交任务的时间差为 5 分钟,然后,再次提交任务。

  • 相关阅读:
    优先队列
    Problem W UVA 662 二十三 Fast Food
    UVA 607 二十二 Scheduling Lectures
    UVA 590 二十一 Always on the run
    UVA 442 二十 Matrix Chain Multiplication
    UVA 437 十九 The Tower of Babylon
    UVA 10254 十八 The Priest Mathematician
    UVA 10453 十七 Make Palindrome
    UVA 10163 十六 Storage Keepers
    UVA 1252 十五 Twenty Questions
  • 原文地址:https://www.cnblogs.com/pythonwl/p/13531450.html
Copyright © 2011-2022 走看看