Celery实现了分布式任务队列的功能,提供异步执行,定时任务两个特性。应用首先将任务封装后发送到Broker,Celery启动多个Worker从Broker中获取任务并执行,通过Broker这一层实现异步特性;Celery提供Beat调度器进行定时任务的调度执行,从而实现定时任务功能。
基本架构图如下:
基本使用
-
安装
pipenv install celery
-
单文件使用
# vim tasks.py import time from celery import Celery broker = 'redis://192.168.2.128:6379/1' backend = 'redis://192.168.2.128:6379/2' app = Celery('my_task', broker=broker, backend=backend) @app.task def add(x, y): print('enter call func ...') time.sleep(4) return x + y
add任务在注册时的名字为tasks.add,调用时需要from tasks import add然后才能执行成功
# vim app.py from tasks import add print('start task ...') result = add.delay(2, 8) print('end task ...')
如果在tasks.py下执行print(add)得到结果<@task: my_task.add of my_task at 0x101fe61d0>
如果在app.py下导入add后执行print(add)得到结果<@task: tasks.add of my_task at 0x103135c18>
所以如果直接在tasks.py下执行add.delay(2, 8),虽然任务发送成功,但是
worker会报错 Received unregistered task of type 'my_task.add'
这里留一个疑问为什么两个任务名字会有不同?
-
模块化使用
# ls celery_app __init__.py # 创建app对象并加载配置文件 celeryconfig.py # 定义app配置并进行任务注册 task1.py task2.py # 任务模块
# vim __init__.py from celery import Celery app = Celery('demo') # 通过celery 实例加载配置 app.config_from_object('celery_app.celeryconfig')
# vim celeryconfig.py from datetime import timedelta from celery.schedules import crontab # APP配置 BROKER_URL = 'redis://192.168.2.128:6379/1' CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2' CELERY_TIMEZONE = 'Asia/Shanghai' # 任务注册 CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2', ) # 定时任务 CELERYBEAT_SCHEDULE = { 'task1': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=10), 'args': (2, 8) }, 'task2': { 'task': 'celery_app.task2.multiply', 'schedule': crontab(hour=20, minute=46), 'args': (4, 5) } }
# vim task1.py import time from celery_app import app @app.task def add(x, y): time.sleep(3) return x + y
# vim task2.py import time from celery_app import app @app.task def multiply(x, y): time.sleep(4) return x * y
-
启动命令
celery worker -A celery_app.tasks -l INFO # -A 指定app所在模块 -l 指定日志级别 celery beat -A celery_app.tasks -l INFO