Setting an expiration time for django-celery async tasks
Scenario
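While building a Django project, some features needed asynchronous operations, such as sending SMS messages and emails. I set up django-celery with Redis as the broker. One day Redis died unexpectedly, and it was a long time before anyone noticed and reported that login SMS codes weren't arriving. The logs showed the problem, and as soon as I restarted Redis, phones were bombarded with a flood of text messages.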
Analysis
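django-celery uses a producer-consumer model, and the queue of pending tasks has no expiration time. So once Redis came back, every task that had not yet run was executed.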
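So I tested this locally. With the Celery worker stopped and Redis running, triggering an async task creates a list named celery in the Redis database you configured (check with keys *). Its contents (lrange celery 0 -1) look like this: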
b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "shadow": null, "eta": null, "expires": "2021-05-19T17:12:04.087801+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"}}',
b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "shadow": null, "eta": null, "expires": "2021-05-19T17:11:55.480755+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "34dc18c5-b960-4b29-80d6-126e76e3d161"}}'
Viewing a single entry (lindex celery 0):
{
    "body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": {
        "lang": "py",
        "task": "permission_changed",
        "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
        "shadow": null,
        "eta": null,
        "expires": null,
        "group": null,
        "group_index": null,
        "retries": 0,
        "timelimit": [null, null],
        "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
        "parent_id": null,
        "argsrepr": "(5, 1)",
        "kwargsrepr": "{}",
        "origin": "gen66816@wjh-MacBook-Pro.local"
    },
    "properties": {
        "correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
        "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8",
        "delivery_mode": 2,
        "delivery_info": {
            "exchange": "",
            "routing_key": "celery"
        },
        "priority": 0,
        "body_encoding": "base64",
        "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"
    }
}
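The body field is JSON encoded as base64, as body_encoding says. Below is a minimal stdlib sketch that decodes one raw list entry, such as the output of lindex celery 0, so you can see the task name, arguments and expires. The helper name decode_celery_message is hypothetical. It assumes the layout shown above, where the decoded body is a three-element list: args, kwargs, and the callbacks/chain/chord block. The sample is a trimmed copy of the entry above.

```python
import base64
import json


def decode_celery_message(raw: bytes) -> dict:
    """Decode one raw entry from the Redis 'celery' list (hypothetical helper)."""
    message = json.loads(raw)
    headers = message["headers"]
    # body_encoding is "base64": the body is base64-encoded JSON
    body = json.loads(base64.b64decode(message["body"]))
    # Assumed body layout, as in the entry above: [args, kwargs, embed]
    args, kwargs, embed = body
    return {
        "task": headers["task"],
        "id": headers["id"],
        "expires": headers["expires"],
        "args": args,
        "kwargs": kwargs,
        "embed": embed,
    }


# Trimmed copy of the entry shown above
raw = (
    b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", '
    b'"content-encoding": "utf-8", "content-type": "application/json", '
    b'"headers": {"task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "expires": null}, '
    b'"properties": {"body_encoding": "base64"}}'
)
print(decode_celery_message(raw))
```

The decoded args are [5, 1], which matches "argsrepr": "(5, 1)" in the headers.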
I noticed it has an expires attribute. Isn't that exactly what sets an expiration time?
So I started digging through the Celery source to find where expires can be set. The entry point is the .delay() method:
def delay(self, *args, **kwargs):
    return self.apply_async(args, kwargs)
Next comes apply_async. It has no expires keyword argument, so I guessed it lives in options and looked at where options gets its value. I found this:
preopts = self._get_exec_options()
options = dict(preopts, **options) if options else preopts
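dict(preopts, **options) puts the task-level defaults first, so any option passed to this particular call overrides them. Here is a stdlib sketch of just that merge. The option values are made up for illustration.

```python
# Task-level defaults, like what _get_exec_options() might return (made-up values)
preopts = {"queue": None, "priority": None, "expires": 300}

# Options passed to one apply_async() call (made-up value)
options = {"expires": 60}

# Same expression as in apply_async: keys from options win
merged = dict(preopts, **options) if options else preopts
print(merged["expires"])  # 60

# With no per-call options, the task defaults are used unchanged
no_options = {}
merged_default = dict(preopts, **no_options) if no_options else preopts
print(merged_default["expires"])  # 300
```

This ordering is also why, as far as I can tell, passing expires=... to apply_async for a single call overrides whatever the task itself declares.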
Then the _get_exec_options method:
def _get_exec_options(self):
    if self._exec_options is None:
        self._exec_options = extract_exec_options(self)
    return self._exec_options
Sure enough, extract_exec_options is where the attribute names stored in Redis come from:
extract_exec_options = mattrgetter(
    'queue', 'routing_key', 'exchange', 'priority', 'expires',
    'serializer', 'delivery_mode', 'compression', 'time_limit',
    'soft_time_limit', 'immediate', 'mandatory',  # imm+man is deprecated
)
The self in preopts = self._get_exec_options() is an instance of class Task. So all you need to do is pass expires to the task decorator of the task you define.
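The stdlib sketch below re-implements the idea to show why a decorator argument ends up in the message. As far as I can tell, Celery's mattrgetter (in celery.utils.functional) reads each named attribute with getattr(obj, name, None). The task decorator turns options like expires into class attributes of the task class it builds. BaseTask and SendSmsTask are hypothetical stand-ins, and only three option names are kept.

```python
def mattrgetter(*attrs):
    """Return a function that collects the given attributes into a dict.

    Missing attributes become None instead of raising AttributeError.
    """
    return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}


# Shortened list of option names, for illustration only
extract_exec_options = mattrgetter("queue", "priority", "expires")


class BaseTask:
    # Hypothetical base class: no expiration by default
    expires = None


class SendSmsTask(BaseTask):
    # Roughly what @app.task(expires=60 * 5) amounts to: a class attribute
    expires = 60 * 5


print(extract_exec_options(BaseTask()))
print(extract_exec_options(SendSmsTask()))
```

"queue" and "priority" are not defined on either class, so they come back as None. expires falls back to the base class value unless the subclass overrides it.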
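As for what value to pass, I found this in the source. It's rather hard to find: it is in celery/app/amqp.py, in the as_task_v2 method that the AMQP constructor registers in its task_protocols attribute.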
if isinstance(expires, numbers.Real):
    self._verify_seconds(expires, 'expires')
    now = now or self.app.now()
    timezone = timezone or self.app.timezone
    expires = maybe_make_aware(
        now + timedelta(seconds=expires), tz=timezone,
    )
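Below is a stdlib sketch of what that branch computes, using a fixed now so the result is reproducible. The name seconds_to_expires is hypothetical. Celery's own range check (_verify_seconds) is left out. If I read as_task_v2 correctly, the resulting datetime is then written into the message as an ISO 8601 string via isoformat().

```python
import numbers
from datetime import datetime, timedelta, timezone


def seconds_to_expires(expires, now):
    """Turn a relative expiry in seconds into an absolute, aware datetime."""
    if isinstance(expires, numbers.Real):
        return now + timedelta(seconds=expires)
    # Anything else (e.g. an existing datetime) is passed through unchanged
    return expires


cst = timezone(timedelta(hours=8))  # UTC+8
now = datetime(2021, 5, 20, 10, 21, 41, 945686, tzinfo=cst)
print(seconds_to_expires(60 * 5, now).isoformat())
# 2021-05-20T10:26:41.945686+08:00
```

The output has the same shape as the expires value that shows up in Redis once the fix is in place.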
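Clearly, seconds means seconds, so what ends up stored in Redis as expires is an exact point in time. There is one detail here: storing a time involves a time zone, and Celery takes the current time from self.app.now(). So let's look at it: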
def now(self):
    """Return the current time and date as a datetime."""
    now_in_utc = to_utc(datetime.utcnow())
    return now_in_utc.astimezone(self.timezone)
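WTF? It uses UTC by default! So I first set expires to 5 minutes to check two things: whether it takes effect, and whether the time comes out 8 hours off. Sure enough, it took effect. And sure enough, it was 8 hours off.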
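One plausible way to picture the 8-hour gap (an assumption, not something traced through the source): the same instant written in UTC and in UTC+8 reads 8 hours apart on the wall clock, even though the two aware datetimes compare equal. If a UTC timestamp is read as local time, it looks 8 hours off. Here is a stdlib sketch with made-up times.

```python
from datetime import datetime, timedelta, timezone

cst = timezone(timedelta(hours=8))  # UTC+8, e.g. China Standard Time

# One instant, expressed in two time zones
in_utc = datetime(2021, 5, 20, 2, 26, 41, tzinfo=timezone.utc)
in_cst = in_utc.astimezone(cst)

print(in_utc.isoformat())  # 2021-05-20T02:26:41+00:00
print(in_cst.isoformat())  # 2021-05-20T10:26:41+08:00

# Same moment in time...
print(in_utc == in_cst)  # True
# ...but the wall-clock readings are 8 hours apart
wall_gap = in_cst.replace(tzinfo=None) - in_utc.replace(tzinfo=None)
print(wall_gap)  # 8:00:00
```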
What to do? I chose to override Celery's now method.
Is it really enough to override only now and leave timezone alone? Yes. Look at the maybe_make_aware method:
def maybe_make_aware(dt, tz=None):
    """Convert dt to aware datetime, do nothing if dt is already aware."""
    if is_naive(dt):
        dt = to_utc(dt)
        return localize(
            dt, timezone.utc if tz is None else timezone.tz_or_local(tz),
        )
    return dt
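Only a naive datetime goes through localize(dt, timezone.utc if tz is None else timezone.tz_or_local(tz)). If the datetime already carries a time zone, it is returned as-is and keeps its own zone. So making now return an aware local time is enough, which leads to the following.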
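Below is a simplified stdlib sketch of that behavior. It is not Celery's code: maybe_make_aware_sketch is a hypothetical name, and it uses datetime.timezone instead of Celery's time-zone helpers. A naive value is read as UTC and converted to tz. An aware value comes back untouched.

```python
from datetime import datetime, timedelta, timezone


def maybe_make_aware_sketch(dt, tz=None):
    """Simplified stand-in for celery.utils.time.maybe_make_aware."""
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        # Naive: treat it as UTC, then convert to tz (UTC if tz is None)
        dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(tz or timezone.utc)
    # Already aware: returned unchanged, keeping its own time zone
    return dt


cst = timezone(timedelta(hours=8))
aware = datetime(2021, 5, 20, 10, 26, 41, tzinfo=cst)
naive = datetime(2021, 5, 20, 2, 26, 41)

print(maybe_make_aware_sketch(aware, tz=timezone.utc).isoformat())
# 2021-05-20T10:26:41+08:00  (unchanged, even though tz is UTC)
print(maybe_make_aware_sketch(naive, tz=cst).isoformat())
# 2021-05-20T10:26:41+08:00  (naive value read as UTC, then converted)
```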
Implementation
django-celery project layouts are easy to find online, so I'll skip that and go straight to the main file:
import os

from celery import Celery
from django.utils import timezone

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SHBMCRM.settings')


class MyCelery(Celery):
    def now(self):
        # Use Django's time: returns the current time in the TIME_ZONE set in settings.
        # Note: timezone.localtime() raises ValueError unless USE_TZ = True.
        return timezone.localtime()


celery_app = MyCelery('SHBMCRM')
celery_app.config_from_object('celery_tasks.config')
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.message'])


# Expiration time: 60 s * 5 = 5 minutes
@celery_app.task(name='new_member', expires=60 * 5)
def new_member(mobile, user):
    # do something
    pass
Result
{
    "body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": {
        "eta": null,
        "expires": "2021-05-20T10:26:41.945686+08:00",  // now there is an expiration time
        "group": null,
        "kwargsrepr": "{}",
        "origin": "gen66816@wjh-MacBook-Pro.local"
    }
}
In the worker log, the first line shows the task being received together with its expiration time:
[2021-05-20 10:21:42,017: INFO/MainProcess] Received task: permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] expires:[2021-05-20 10:26:41.945686+08:00]
The second line shows that it ran successfully:
[2021-05-20 10:21:42,036: INFO/ForkPoolWorker-1] Task permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] succeeded in 0.01773979200000042s: None
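On the worker side, the idea is simply that a task is not run if its expires moment has already passed when it is received. That is what stops the SMS flood after Redis comes back. Below is a simplified stdlib sketch of that comparison. It is not Celery's worker code, and is_expired is a hypothetical name. It parses the ISO string stored in the message.

```python
from datetime import datetime, timedelta, timezone


def is_expired(expires_iso, now):
    """Return True if the message's expires timestamp is already in the past."""
    if expires_iso is None:
        # No expires header: the task never expires
        return False
    return datetime.fromisoformat(expires_iso) < now


expires = "2021-05-20T10:26:41.945686+08:00"
cst = timezone(timedelta(hours=8))

# Received one second after sending (as in the log above): still valid
print(is_expired(expires, datetime(2021, 5, 20, 10, 21, 42, tzinfo=cst)))  # False
# Received hours later, e.g. after Redis is restarted: expired
print(is_expired(expires, datetime(2021, 5, 20, 14, 0, tzinfo=cst)))  # True
# Without expires (the original setup), the task would still run
print(is_expired(None, datetime(2021, 5, 20, 14, 0, tzinfo=cst)))  # False
```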