一、目录结构
二、创建worker文件夹
__init__.py
# -*- coding:utf-8 -*-
import os
from celery import Celery, platforms
# Work around Celery refusing to start as the root user (C_FORCE_ROOT environment).
platforms.C_FORCE_ROOT = True
# Point Django at the project settings unless DJANGO_SETTINGS_MODULE is already set.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "upload_pro.settings")
# Create the Celery application.
celery_app = Celery('upload_pro')
# Load the Celery settings from the worker.config module.
celery_app.config_from_object('worker.config')
# Auto-discover tasks; no packages are passed, so this presumably relies on
# the Django app list - TODO confirm.
celery_app.autodiscover_tasks()
def call_by_worker(func):
    """Register ``func`` as a Celery task and return its asynchronous entry point.

    Intended to be used as a decorator: calling the decorated name submits
    the task through ``delay`` instead of running ``func`` in the calling
    process.

    Args:
        func: The callable to register on ``celery_app``.

    Returns:
        The ``delay`` method of the task created for ``func``.
    """
    return celery_app.task(func).delay
config.py
# -*- coding:utf-8 -*-
# Start Celery with this configuration on Windows: celery worker -A worker --loglevel=info -P gevent
# Start Celery with this configuration on Linux: celery worker -A worker --loglevel=info
# Local testing
# broker_url = 'redis://192.168.10.10:6379/8'
# NOTE(review): the password is hard-coded and contains a literal '@';
# consider loading it from the environment and percent-encoding it as '%40'
# so the URL is unambiguous - confirm against the broker in use.
broker_url = 'redis://:root1234@A@192.168.8.191:7007/11'
broker_pool_limit = 1000 # Broker connection pool size (the default is 10)
timezone = 'Asia/Shanghai'
# NOTE(review): pickle is accepted and used as the serializer below; unpickling
# can execute arbitrary code, so this is only safe if the broker is trusted.
accept_content = ['pickle', 'json']
task_serializer = 'pickle'
result_expires = 3600 # Task result expiry time (presumably seconds - confirm)
# Local testing
# result_backend = 'redis://192.168.10.10:6379/8'
# Results are stored in the same Redis database as the broker queue.
result_backend = 'redis://:root1234@A@192.168.8.191:7007/11'
result_serializer = 'pickle'
result_cache_max = 1000 # Maximum number of cached task results
worker_redirect_stdouts_level = 'INFO'
三、启动
celery worker -A worker --loglevel=info -P gevent
import time

from worker import call_by_worker


# Start the worker first: celery worker -A worker --loglevel=info -P gevent
@call_by_worker
def add():
    """Demo task: sleep for 10 seconds, then return 10."""
    time.sleep(10)
    return 10


# NOTE(review): csrf_exempt, HttpResponse and JsonResponse are expected to be
# imported by the surrounding Django views module - confirm.
@csrf_exempt
def get_celery(request):
    """Submit the ``add`` task and answer the request without waiting for it.

    Args:
        request: The incoming Django request.

    Returns:
        For GET requests, an ``HttpResponse`` carrying ``js_data`` with the
        'text/javascript' content type; for any other method, a
        ``JsonResponse`` with ``{"code": 1001}``.
    """
    if request.method == "GET":
        # ``add`` is wrapped by call_by_worker, so this call only submits the
        # task; its result is not used here.
        add()
        # The second argument is the response content type; replace it as
        # needed (e.g. 'application/x-zip-compressed' for a zip file).
        # NOTE(review): js_data is not defined in the visible code - confirm
        # where it comes from.
        response = HttpResponse(js_data, 'text/javascript')
        return response
    return JsonResponse({"code": 1001})