如果你看完本文还有兴趣的话,可以看看进阶篇:http://www.cnblogs.com/kangoroo/p/7300433.html
设想你遇到如下场景:
1)高并发
2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限
3)请求返回慢,客户长时间等在页面等待任务返回
4)存在耗时的定时任务
这时你就需要一个分布式异步的框架了。
celery会是一个不错的选择。本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程。
1、安装依赖
默认你已经有了python和pip。我使用的版本是:
python 2.7.10 pip 9.0.1
virtualenv 15.1.0
创建沙盒环境,我们生产过程中通过沙盒环境来使用各种python包的版本,各个应用的沙盒环境之间互不干扰。
$ mkdir kangaroo $ cd kangaroo $ virtualenv kangaroo.env # 沙盒下面有什么,可以看到有python的bin、include和pip $ ll kangaroo.env total 8 drwxrwxr-x 2 data_monitor data_monitor 4096 Aug 7 16:00 bin drwxrwxr-x 2 data_monitor data_monitor 22 Aug 7 16:00 include drwxrwxr-x 3 data_monitor data_monitor 22 Aug 7 16:00 lib -rw-rw-r-- 1 data_monitor data_monitor 60 Aug 7 16:00 pip-selfcheck.json # 让沙盒环境在当前session(shell)中生效 $ source kangaroo.env/bin/activate # 可以看到命令行首多了(kangaroo.env),而且python的路径已经变了 (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ which python /data/home/data_monitor/yangfan/test/kangaroo/kangaroo.env/bin/python
下面我们开始在kangaroo环境下安装相应版本的django和celery,以及django-celery集成包。
(kangaroo.env) [XXX@XXX kangaroo]$ pip install django==1.10.6 (kangaroo.env) [XXX@XXX kangaroo]$ pip install celery==3.1.25 (kangaroo.env) [XXX@XXX kangaroo]$ pip install django-celery==3.2.1
我在安装的时候写明了版本号,是因为这套版本号在我们的生产环境过玩转过。
如果你换了对应的版本号的话,可能会引发冲突,出现意想不到的问题。亲测还是有一些版本之间是有怪问题的。
2、创建工程
创建工程kangaroo:django-admin startproject kangaroo
# 在kangaroo.env同级目录下创建工程kangaroo
cd /home/data_monitor/yangfan/test/kangaroo
# 创建工程kangaroo (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ django-admin startproject kangaroo (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll total 0 drwxrwxr-x 3 data_monitor data_monitor 37 Aug 7 16:45 kangaroo drwxrwxr-x 5 data_monitor data_monitor 65 Aug 7 16:00 kangaroo.env
创建APP foot:python manage.py startapp foot
# 进入工程目录,你会看到manage.py文件 cd kangaroo # 创建APP foot (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ python manage.py startapp foot (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll total 4 drwxrwxr-x 3 data_monitor data_monitor 116 Aug 7 16:48 foot drwxrwxr-x 2 data_monitor data_monitor 108 Aug 7 16:48 kangaroo -rwxrwxr-x 1 data_monitor data_monitor 806 Aug 7 16:45 manage.py # 进入foot app目录,看一下有什么 (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ cd foot/ (kangaroo.env) [data_monitor@bigdata-arch-client10 foot]$ ll total 20 -rw-rw-r-- 1 data_monitor data_monitor 63 Aug 7 16:48 admin.py -rw-rw-r-- 1 data_monitor data_monitor 124 Aug 7 16:48 apps.py -rw-rw-r-- 1 data_monitor data_monitor 0 Aug 7 16:48 __init__.py drwxrwxr-x 2 data_monitor data_monitor 24 Aug 7 16:48 migrations -rw-rw-r-- 1 data_monitor data_monitor 98 Aug 7 16:48 models.py -rw-rw-r-- 1 data_monitor data_monitor 60 Aug 7 16:48 tests.py -rw-rw-r-- 1 data_monitor data_monitor 63 Aug 7 16:48 views.py
至此我们创建了工程kangaroo和app foot,下面我们介绍如何集成celery。
3、django-celery的集成配置
我们这里集成的方式是使用django-celery包。
集成配置要注意以下几个地方就好了,配置起来还是比较简单的。
1)修改kangaroo/settings.py文件
让djcelery模块生效
import os import djcelery djcelery.setup_loader() ... INSTALLED_APPS = ( ... 'djcelery', 'kombu.transport.django', ... )
配置broker和backend
# Celery settings # redis做broker, 第二个":"前后是redis的用户名密码,后面的2是db # BROKER_URL = 'redis://:password@10.93.84.53:6379/2' # rabbitMQ做broker,第二个":"前后是rabbitMQ的用户名密码 BROKER_URL = 'amqp://admin:bigdata123@10.93.21.21:5672//' # Celery的backend记录地址,这里只给出redis的配置 CELERY_RESULT_BACKEND = 'redis://:bigdata123@10.93.84.53:6379/3'
4、第一个task
其实应该在创建app的时候就将appName添加到settings.py的INSTALLED_APPS中,我们没有这样做事留到现在好说明问题。
我们修改settings.py加入foot。
import os import djcelery djcelery.setup_loader() ... INSTALLED_APPS = ( ... 'djcelery', 'kombu.transport.django', ... 'foot', ... )
当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为@task的function, 并将它们注册为celery task。
所以我们在foot包下创建tasks.py文件,并且添加我们的task。
foot/tasks.py
# -*- coding:utf-8 -*- import time import logging from celery import task logger = logging.getLogger(__name__) @task() def foot_task(param_dict): logger.info('foot task start! param_dict:%s' % param_dict) time.sleep(10) logger.info('foot task finished!') return
这个task等待接收一个参数字典,只是简单的打印参数,然后sleep10s就退出了。
让task在后台worker中注册,当有任务分发下来的时候就开始执行。只需执行
python manage.py celery worker --loglevel=info
5、分发任务dispatch
任务触发的两种方式:
1)定时调度,可以适用celery beat,这里没有详述,因为在实际生产中,我们使用了apschedule做了定时器。
2)请求异步执行,这里给出了例子,服务接收http请求,直接返回,任务异步的丢给worker执行。
我这里写了一个通用的函数,这个函数用于分发任务
def dispatch(task, param_dict): param_json = json.dumps(param_dict) try: task.apply_async( [param_json], retry=True, retry_policy={ 'max_retries': 1, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, }, ) except Exception, ex: logger.info(traceback.format_exc()) raise
如何触发foot.tasks中的任务呢,只需要
import logging import traceback from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from foot.tasks import foot_task from common.dispatcher import dispatch logger = logging.getLogger(__name__) @csrf_exempt def hello(request): if request.method == 'GET': try: user = request.GET.get('username') dispatch(test_task, {'hello': user}) return JsonResponse({'code': 0, 'msg':'success'}) except Exception, ex: return JsonResponse({'code: -1, 'msg': traceback.format_exc()})
1)当你在客户端发送请求:hello?username='kangaroo'时
2)服务瞬间返回:{'code': 0, 'msg':'success'}
3)后端sleep10秒后执行成功,打印hello:kangaroo
这就是异步的效果。