celery + django:
异步任务:
#task.py:
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend)
# tasks
# 任务就是一个功能函数,执行任务就是执行函数,任务的结果就是函数的返回值
@app.task
def add(a, b):
res = a + b
print('a + b = %s' % res)
return res
# 手动添加脚本文件:
from tasks import add
result = add.delay(x,y) # 立即执行的异步任务
print(reult.id)
# get_result.py:
from tasks import app
from celery.result import AsyncResult
id = 'ad4ca85d-998a-4cc9-9d24-655986a169b3'
# async = AsyncResult(id=id, app=app)
# res = async.get()
# print(res)
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
延时任务:
# add_task.py
from celery_task.tasks import add
# 添加延迟任务
from datetime import datetime, timedelta
result = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10))
print(result)
#get_reule.py
from celery_task.celery import app
from celery.result import AsyncResult
id = 'f9d97749-7e48-4b56-a31c-69fe73f9c1a7' #redis库的id
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
django-celery:
基本使用
django中使用celery有两种方式:
上面建立特定目录结构的是一种,
另一种django-celery模块,对于django版本有严格的要求,要是项目换了环境,就无法使用
安装需要的版本:
celery==3.1.25
django-celery==3.1.20
celery_config.py:
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30
task.py:
from celery import task
@task
def add(a,b):
with open('a.text', 'a', encoding='utf-8') as f:
f.write('a')
print(a+b)
views.py:
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')
settings.py:
INSTALLED_APPS = [
...
'djcelery',
'app01'
]
...
from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'