引言
前面一篇文章已经介绍了celery相关知识,有兴趣可以看官方文档深入了解。下面介绍一下使用redis作为消息中间件来使用celery异步执行定时任务。
简介
先看一下草图:
Celery异步任务示例
先准备环境:
redis == 3.2.1 celery == 4.3.0 python == 3.6.5
新建一个项目testcelery,为celery应用创建一个模块。对于小的应用,通常的做法是把所有代码放在一个叫tasks.py的文件中
import celery from celery.schedules import crontab app = celery.Celery('tasks',broker='redis://127.0.0.1:6379/0') @app.task def test(*args): print(args)
ask 是Celery中最基本的单元。Celery有很多装饰器来定义task,只需要编写一个函数并且加上一个装饰器,就能注册一个能异步执行的任务,再新建一个test.py:
from tasks import * test.delay('开始测试')
在执行异步任务时,要保证redis服务是启动的,如图:
然后进入你tasts.py文件所在目录下,执行命令:
celery -A tasks worker --loglevel=info
如果是win10,并且使用我上面的环境,还需要安装一个
pip install eventlet
不然worker会报错,如:
ValueError: not enough values to unpack (expected 3, got 0)
使用新的命令如:
celery -A <mymodule> worker -l info -P eventlet
出现这个就证明异步任务服务启动正常,如图:
我们执行调用函数,测试一下:
结果符合预期,正常异步执行任务成功!
Celery定时任务示例
上面介绍了异步任务,现在当然要使用定时任务看看。
在tasks.py文件中,增加如下代码
@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('learn'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('python'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Hard Work!'), )
定时任务需要启动一个监控服务beat来监听心跳,还是进入tasks.py文件所在目录,执行命令:
celery -A tasks beat
运行结果:
总结
以上就是python+celery示例,深入了解可以学习官方文档:https://docs.celeryproject.org/en/latest/ 。另外有喜欢学习和交流探讨的朋友,加入我们交流群~