zoukankan      html  css  js  c++  java
  • Celery 分布式异步任务框架

    一、什么是celery

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。主要是执行 异步任务定时任务

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    1、消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    2、任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    3、任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    二、安装

    安装

    pip install celery
    

    三、使用

    1 异步任务

    (1)创建任务

    创建celery_task.py

    import celery
    import time
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://:123456@127.0.0.1:6379/1'       # 结果存储
    broker='redis://:123456@127.0.0.1:6379/2'       # 消息中间件
    cel=celery.Celery('test',backend=backend,broker=broker)
    
    @cel.task
    def add(x,y):
        return x+y
    

    (2)添加任务(broker)

    添加任务到消息中间件的队列中,但是没有执行任务

    result不是函数执行结果,它是个对象

    创建add_task.py

    from celery_task import add
    result = add.delay(4,5)
    print(result.id)
    

    (3)执行任务(worker)

    创建py文件:run.py,执行任务,或者

    命令执行:celery worker -A celery_task -l info

    注:windows下:celery worker -A celery_task -l info -P eventlet

    常用命令来执行任务

    from celery_task import cel
    if __name__ == '__main__':
        cel.worker_main()
        # cel.worker_main(argv=['--loglevel=info')
    

    (4)查看执行结果(result)

    创建py文件:result.py,查看任务执行结果

    from celery.result import AsyncResult
    from celery_task import cel
    
    async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    2 多任务结构

    pro_cel
        ├── celery_task
        │   ├── celery.py   # 必须这个名字
        │   └── tasks1.py    #  所有任务函数
        │	└── tasks2.py    #  所有任务函数
        ├── check_result.py 	# 检查结果
        └── send_task.py    # 添加任务
    

    (1)celery.py --- 配置

    from celery import Celery
    
    broker='redis://127.0.0.1:6379/1',
    backend='redis://127.0.0.1:6379/2',
    cel = Celery('celery_demo', broker=broker, backend=backend,
                 # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
                 include=['celery_task.tasks1',
                          'celery_task.tasks2'
                          ])
    
    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False
    

    (2)task1.py与task2.py --- 创建任务

    tasks1.py

    import time
    from celery_task.celery import cel
    @cel.task
    def test_celery(res):
        time.sleep(5)
        return "test_celery任务结果:%s"%res
    

    tasks2.py

    import time
    from celery_task.celery import cel
    @cel.task
    def test_celery2(res):
        time.sleep(5)
        return "test_celery2任务结果:%s"%res
    

    (3)send_task.py --- 添加任务

    from celery_task.tasks1 import test_celery
    from celery_task.tasks2 import test_celery2
    
    # 立即告知celery去执行test_celery任务,并传入一个参数
    result = test_celery.delay('第一个的执行')
    print(result.id)
    result = test_celery2.delay('第二个的执行')
    print(result.id)
    

    (4)通过命令执行任务

    celery worker -A celery_task -l info -P eventlet
    

    (5)check_result.py --- 查看任务结果

    from celery.result import AsyncResult
    from celery_task.celery import cel
    
    async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除,执行完成,结果不会自动删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    3 定时任务

    执行定时任务的创建任务、执行任务、查看任务结果与执行异步任务相同,不同的是在添加任务时,设定时间。

    from celery_app_task import add
    from datetime import datetime
    
    # 方式一
    # 在指定时间执行该任务
    v1 = datetime(2019, 2, 13, 18, 19, 56)
    print(v1)
    # 当前时间对象,转成utc时间
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    result = add.apply_async(args=[1, 3], eta=v2)
    print(result.id)
    
    # 方式二
    # 在当前时间的后延10s执行任务
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间,args是任务函数参数,eta指定时间
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    

    定时任务

    celery.py

    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    
    cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
        'celery_task.tasks1',
        'celery_task.tasks2',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
        # 名字随意命名
        'add-every-10-seconds': {
            'task': 'celery_task.tasks1.test_celery',
            'schedule': timedelta(seconds=2),
            # 传递参数
            'args': ('test',)
        },
        # 'add-every-12-seconds': {
        	  # 执行tasks1下的test_celery函数
        #     'task': 'celery_task.tasks1.test_celery',
        #     每年4月11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     每个月的11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11),
        #     每天8点42分执行
        #     'schedule': crontab(minute=42, hour=8),
        #     'args': (16, 16)
        # },
    }
    

    task1.py

    from .celery import app
    
    @app.task
    def add():
    

    celery.py python <= 3.6

    from celery import Celery
    from celery.schedules import crontab
    
    broker = 'redis://127.0.0.1:6379/1'  # broker任务队列
    backend = 'redis://127.0.0.1:6379/2'  # 结构存储,执行完的结果存在这
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task1',])
    
    app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.CELERY_ENABLE_UTC = False
    # 任务的定时配置
    from datetime import timedelta
    
    app.conf.CELERYBEAT_SCHEDULE = {
        'add-task': {
            'task': 'celery_task.task1.add',
            'schedule': timedelta(minutes=1),  # crontab(minute='*/1') timedelta(seconds=60)
        }
    }
    

    注意:

    celery 4.x 不支持windows,要用3.x,配置参数 在 4.x 变为小写,3.x 还是大写;

    python解释器因为关键字 async >= 3.7,所以要用 python <= 3.6

    启动定时任务

    启动一个beatcelery beat -A celery_task -l info

    启动work执行celery worker -A celery_task -l info -P eventlet

    [2020-08-16 16:15:02,561: INFO/MainProcess] beat: Starting...
    [2020-08-16 16:19:18,315: INFO/MainProcess] Scheduler: Sending due task add-task (celery_task.task1.add)
    [2020-08-16 16:24:18,315: INFO/MainProcess] Scheduler: Sending due task add-task (celery_task.task1.add)
    

    beat 提交任务,里面 相当于 会有时钟记录上一次的提交时间,所以,距离上次提交的时间不到五分钟,所以,beat 命令输入没有立即 提交任务,而是等到了与上次提交任务的时间差为 5 分钟,然后,再次提交任务。

  • 相关阅读:
    python excel导入到数据库
    ubuntu14.04修改mysql默认编码
    python 向MySQL里插入中文数据
    hbase框架原理
    hive框架原理
    Hadoop的MapReduce模型基本原理
    机器学习模型效果评价
    spark架构原理
    Hadoop架构原理
    特征工程
  • 原文地址:https://www.cnblogs.com/pythonwl/p/13531450.html
Copyright © 2011-2022 走看看