简介
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必须工具。它是一个专注于实现处理的任务队列,同时也支持任务调度。
当然在Celery在执行任务时需要通过一个中间件来接收和发送任务消息,以及存储任务结果,一般会使用rabbitMQ 或者 Redis。
优点
- 简单:一个熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活:几乎celery的各个组件都可以被扩展及自定制
celery工作流程图如下
也许现在会有人问,何为任务队列?
任务队列
任务队列是一种在线程或机器间分发任务的机制。
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery是消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。
所以在使用之前,我们需要安装好环境才可
Celery的安装使用
Celery的默认broker是RabbitMQ, 仅需配置一行就可以
broker_url = 'amqp://guest:guest@localhost:5672//'
事先的安装好rabbitMQ
如果rabbitMQ安装不上也可以使用Redis作为broker
安装redis组件
$ pip install -U "celery[redis]"
配置
app.conf.broker_url = 'redis://localhost:6379/0' redis://:password@hostname:port/db_number app.conf.result_backend = 'redis://localhost:6379/0'
一切准备就绪后,开始使用celery
安装celery模块
$ pip install celery
那么开始创建一个任务文件tasks.py测试一下吧
from celery import Celery app = Celery('tasks', broker='redis://localhost', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y # 注:localhost指的是本机,如果想要连远程主机,需要填写要链接的主机IP地址
启动Celery Worker来开始监听并执行任务
$ celery -A tasks worker --loglevel=info
去调用任务
重新打开一个终端去执行
>>> from tasks import add >>> add.delay(4,5)
这时我们会得到一个结果,如果我们这样写,就不会去等待结果的输入,而去做其他事情了
> result = add.delay(4, 5)
我们要查看返回是否已经完成处理任务
>>> result.ready() False
通过get去取值
>>> result.get() 9
与Django配合使用
首先建立一个项目,目录如下
- proj/
- proj/__init__.py
- proj/settings.py
- proj/urls.py
- manage.py
在与settings同等级的目录下建立一个celery.py文件,内容如下
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) # 需要注意的是'proj.settings和app = Celery('proj')是随你创建项目的名字不同而做修改
在同settings同等目录下的__init__中填写如下内容
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
然后就是在你自己的app里创建一个tasks.py填写你的任务,例如
# Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y
然后在Django里的view函数中去执行调用即可
from django.shortcuts import render,HttpResponse from app01 import tasks # Create your views here. from celery.result import AsyncResult def index(request): res = tasks.add.delay(5,5) print(res) return HttpResponse(res.task_id) # 返回的ID def task_res(request): result = AsyncResult(id="") return HttpResponse(result.status)
更多详细用法请见 http://docs.jinkan.org/docs/celery/getting-started/introduction.html#id17