1 写一个py文件
import celery
# 消息中间件(redis)
broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 这个db
# 结果存储(redis)
backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 这个db
# 实例化得到对象,指定中间件和结果存储
app=celery.Celery('test',broker=broker,backend=backend)
2 celery多任务结构
package_celery: # 项目名
celery_task # celery包名
__init__.py
celery.py # celery 的app,必须叫celery
order_task.py # 任务
user_task.py # 任务
result.py # 结果查询
submit_tast.py # 提交任务
# 运行worker(在package_celery目录下执行)
celery worker -A celery_task -l info -P eventlet
# 提交任务
from celery_task import order_task,user_task
# 提交一个给用户发短信的任务
res=user_task.send_sms.delay('18723454566')
print(res)
# 提交一个取消订单任务
res=order_task.cancel_order.delay()
print(res)
# 真实应用场景
-秒杀系统
-不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
-提高并发量---》把同步做成异步---》使用celery
-前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
-同步操作
-请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状 态
-请求返回,告诉前端,秒杀成功
-异步操作
-请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如 果够,要生成订单(mysql)
-秒杀是否成功的结果还没有,直接返回了(返回任务id)
-前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完 成(带着任务id查)
-如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
-又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
# celery执行延时任务
## 第一种方式:2021年1月7日17点3分12秒发送短信
# from datetime import datetime
# # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
# v1 = datetime(2021, 1, 7, 17, 3, 12)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# res=user_task.send_sms.apply_async(args=['18977654332',],eta=v2)
## 第二种方式:隔几秒后执行
from datetime import datetime
from datetime import timedelta
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
print(task_time)
res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)
4 高级使用之定时任务
# 在celery.py中配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-msg':{
'task': 'celery_task.user_task.send_sms',
# 'schedule': timedelta(hours=24*10),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点
'args': ('18964352112',),
}
}
# 启动beat,负责每隔3s提交一个任务
celery beat -A celery_task -l info
# 启动worker
celery worker -A celery_task -l info -P eventlet
5 django中使用celery
1 celery是独立的,跟框架没有关系
2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
3 目录
celery_task
__init__.py
celery.py
home_task.py
order_task.py
user_task.py
luffyapi
5.1 home_task.py
from celery_task.celery import app
@app.task
def update_banner():
from django.core.cache import cache
from django.conf import settings
from home import models
from home import serializer
banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
ser = serializer.BannerModelSerializer(instance=banners,many=True)
banner_data=ser.data
# 拿不到request对象,所以头像的连接base_url要自己组装
for banner in banner_data:
banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']
cache.set('banner_data',banner_data)
return True
5.2 celery.py
import celery
import os
# 执行django配置文件,环境变量加入
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 这个db
backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 这个db
app=celery.Celery('test',broker=broker,backend=backend,
include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
)
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# 'send-msg':{
# 'task': 'celery_task.user_task.send_sms',
# # 'schedule': timedelta(hours=24*10),
# # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
# 'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点
# 'args': ('18964352112',),
# }
'update-banner':{
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=10),
'args': (),
}
}
views.py
from celery_task import user_task
from celery_task.celery import app
from celery.result import AsyncResult
def test_celery(request):
res_id = request.GET.get('id')
if res_id:
res = AsyncResult(id=res_id, app=app)
if res.successful():
result = res.get()
print(result)
return HttpResponse('执行完成了,结果是:%s' % result)
res = user_task.send_sms.delay('18276345221')
return HttpResponse('任务号是:%s' % str(res))
6 首页轮播图定时更新
1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
2 首页轮播图加入了缓存
3 以后,如果你的接口,请求慢,第一反应就是先使用缓存
4 使用celery定时更新缓存
class BannerView(GenericViewSet, ListModelMixin):
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
serializer_class = serializer.BannerModelSerializer
# 改成,先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查
def list(self, request, *args, **kwargs):
# 如果缓存中有值,直接取出来返回,速度很快
banner_data = cache.get('banner_data')
if banner_data:
print('走了缓存')
return Response(data=banner_data)
# 如果缓存中没有,再走数据,查出来,放到缓存中
res = super().list(request, *args, **kwargs)
# 把首页轮播图数据放到缓存中
cache.set('banner_data', res.data)
print('没走缓存')
return res