Basic steps
Installation
Install Redis:
yum install -y http://rpms.famillecollet.com/enterprise/remi-release-7.rpm
yum --enablerepo=remi install redis
service redis start
chkconfig redis on
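A quick way to confirm Redis is reachable before wiring it into Celery (assuming the default port and the placeholder password pwd used in the settings below):
redis-cli ping          # expects PONG
redis-cli -a pwd ping   # if requirepass is enabled
Then install the Python side: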
pip3 install celery
pip3 install celery[redis]
Start the Celery worker
From the directory containing manage.py, run:
celery -A dj_test worker -l info
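Optionally, confirm the worker is up and has picked up the project's tasks:
celery -A dj_test status                # pings running workers
celery -A dj_test inspect registered    # should list app01.tasks.add once tasks.py is in place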
Project code (project name: dj_test)
Repository: https://github.com/infaaf/dj_test_ops.git, tag: v1_withcelery
dj_test/dj_test/celery.py
#!/usr/bin/env python
# -*- coding=utf-8 -*-
# __author__ = 'infaaf'
# refer https://github.com/celery/celery/tree/master/examples/django/
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_test.settings')

# change proj name here
app = Celery('dj_test')
# app = Celery('dj_test', backend='redis://:pwd@127.0.0.1:6379/0', broker='redis://:pwd@127.0.0.1:6379/0')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
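As an optional smoke test (not part of the project code), the bundled debug_task can be fired from a Django shell; the Request line then shows up in the worker log:
# python3 manage.py shell
>>> from dj_test.celery import debug_task
>>> debug_task.delay()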
dj_test/dj_test/__init__.py
from __future__ import absolute_import, unicode_literals

import pymysql
pymysql.install_as_MySQLdb()

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
dj_test/dj_test/settings.py
Add the following:

### celery
CELERY_BROKER_URL = 'redis://:pwd@127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://:pwd@127.0.0.1:6379/0'
CELERY_TASK_SERIALIZER = 'json'
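Because of namespace='CELERY', other Celery options follow the same CELERY_ prefix; a few illustrative extras (values are examples, not from the project):
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_TRACK_STARTED = True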
dj_test/dj_test/app01/tasks.py
#!/usr/bin/env python
# -*- coding=utf-8 -*-
# __author__ = 'infaaf'
from __future__ import absolute_import, unicode_literals

from celery import shared_task


@shared_task
def add(x, y):
    return x + y
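Besides .delay(), apply_async() exposes more options; illustrative calls (not used in the project views):
add.delay(3, 4)                          # shorthand, returns an AsyncResult
add.apply_async((3, 4), countdown=10)    # execute 10 seconds from now
add.apply_async((3, 4), expires=60)      # discard if not started within 60s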
dj_test/dj_test/app01/views.py  # reading the return value requires CELERY_RESULT_BACKEND to be defined in settings
from django.shortcuts import render, HttpResponse
from .utils.simple_allinone_ansible_runner import Runner as AnsibleRunner
from .tasks import add

# Create your views here.
import logging
logger = logging.getLogger(__name__)


def index(request):
    r = add.delay(3, 4)
    print(r.ready())
    import time
    time.sleep(1)
    print(r.ready())
    print(r.get(timeout=3))
    return HttpResponse('123')
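Blocking on r.get() inside a view is only for demonstration. A non-blocking sketch (view names and URL wiring are made up here) returns the task id and polls it later with AsyncResult:
from celery.result import AsyncResult
from django.http import JsonResponse
from .tasks import add

def start(request):
    r = add.delay(3, 4)
    return JsonResponse({'task_id': r.id})

def check(request, task_id):
    r = AsyncResult(task_id)
    return JsonResponse({'ready': r.ready(),
                         'result': r.result if r.ready() else None})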
Log output
Django log
System check identified no issues (0 silenced).
August 26, 2018 - 15:12:18
Django version 2.1, using settings 'dj_test.settings'
Starting development server at http://192.168.188.200:8100/
Quit the server with CONTROL-C.
False                     <- not yet finished at the instant of the call
20180826 15:12:21 [INFO] basehttp/basehttp.py[line:124] "GET / HTTP/1.1" 200 3
True
7
Celery log
[2018-08-26 15:12:20,523: INFO/MainProcess] Received task: app01.tasks.add[be1ae35e-d104-4015-a0fa-c4316221611d]
[2018-08-26 15:12:20,526: INFO/ForkPoolWorker-4] Task app01.tasks.add[be1ae35e-d104-4015-a0fa-c4316221611d] succeeded in 0.0008414150070166215s: 7
Redis storage
127.0.0.1:6379> keys *
 1) "_kombu.binding.celery.pidbox"
 2) "celery-task-meta-8f64c6a8-7b65-42df-b556-819991417aeb"
 3) "a"
 4) "_kombu.binding.celeryev"
 5) "celery-task-meta-5f6d5a5e-8782-4535-bc24-48d109939634"
 6) "celery-task-meta-55e1a767-38a0-4222-b173-f9ff4f4f4e05"
 7) "celery-task-meta-6dcc4017-6287-4f53-9ffc-51cb11073bba"
 8) "celery-task-meta-cd34ade8-5afc-4c6b-814e-3eb65c72d25a"
 9) "celery-task-meta-d8e8c21d-5df2-4141-b8ee-250fd3302301"
10) "celery-task-meta-be1ae35e-d104-4015-a0fa-c4316221611d"
11) "_kombu.binding.celery"
12) "unacked_mutex"
127.0.0.1:6379> get celery-task-meta-be1ae35e-d104-4015-a0fa-c4316221611d
"{"status": "SUCCESS", "result": 7, "traceback": null, "children": [], "task_id": "be1ae35e-d104-4015-a0fa-c4316221611d"}"
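The celery-task-meta-* keys accumulate over time; Celery expires them after one day by default, which can be tuned in settings.py (illustrative value):
CELERY_RESULT_EXPIRES = 3600   # keep task results in redis for one hour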
Celery with Ansible
celery multi start w1 -A dj_test -l info --logfile=celerylog.log --pidfile=celerypid.pid
pkill -9 -f 'celery worker'
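The same named node can also be stopped or restarted cleanly via celery multi instead of pkill:
celery multi stop w1 --pidfile=celerypid.pid
celery multi restart w1 -A dj_test -l info --logfile=celerylog.log --pidfile=celerypid.pid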
Example tasks file
# Note: current_process is multiprocessing.current_process; BaseInventory and
# AdHocRunner come from the project's ansible runner module (see the repo).
# Their imports are omitted here, as in the original snippet.
@shared_task()
def ansible_test(host_data, tasks):
    # workaround for the missing 'semprefix' in the process config when
    # running ansible's multiprocessing inside the celery worker
    current_process()._config = {'semprefix': '/mp'}
    inventory = BaseInventory(host_data)
    runner = AdHocRunner(inventory)
    ret = runner.run(tasks, "all")
    print(ret.results_summary)
    print(ret.results_raw)
    return 'a'
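The exact shapes of host_data and tasks depend on the project's BaseInventory/AdHocRunner wrapper (see the repo); purely as an illustration, with assumed field names, a call could look like:
host_data = [
    {'hostname': 'node1', 'ip': '192.168.188.201', 'port': 22,
     'username': 'root', 'password': 'xxx'},
]
tasks = [
    {'name': 'check uptime', 'action': {'module': 'shell', 'args': 'uptime'}},
]
ansible_test.delay(host_data, tasks)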
Additional options
Store results in the Django DB
pip3 install django-celery-results

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

CELERY_RESULT_BACKEND = 'django-db'
# CELERY_RESULT_BACKEND = 'django-cache'

$ python manage.py migrate django_celery_results
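After the migration, each result is written to the django_celery_results_taskresult table and can be read back through the ORM, e.g.:
from django_celery_results.models import TaskResult
for tr in TaskResult.objects.all()[:5]:
    print(tr.task_id, tr.status, tr.result)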