zoukankan      html  css  js  c++  java
  • Django——django-celery异步任务设置过期时间

    django-celery异步任务设置过期时间

    场景

    在django做项目的时候,因为一些特殊的场景,所以需要用到异步操作,比如发短信,发邮件。设置了django-celery,通过redis作为中间件存储。有一次redis意外死亡了,过了很久才有人提出来,说登录短信接收不到,看了日志发现了问题,重启了redis以后,手机收到了一堆的短信轰炸

    分析

    因为django-celery的生产消费者模型,待消费的任务队列,没有过期时间,所以复活的redis,将未执行的任务全部执行了。

    于是我在本地检测了一下,关掉celery以后,运行redis,执行异步操作,会在redis中你设置的数据库中生成一个名为celery的列表(keys *),看了一下里面的内容(lrange celery 0 -1),内容结构如下:

    b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "shadow": null, "eta": null, "expires": "2021-05-19T17:12:04.087801+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"}}',
     b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "shadow": null, "eta": null, "expires": "2021-05-19T17:11:55.480755+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "34dc18c5-b960-4b29-80d6-126e76e3d161"}}'
    

    单独查看一条数据(lindex celery 0)

    {
    	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    	"content-encoding": "utf-8",
    	"content-type": "application/json",
    	"headers": {
    		"lang": "py",
    		"task": "permission_changed",
    		"id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
    		"shadow": null,
    		"eta": null,
    		"expires": null,
    		"group": null,
    		"group_index": null,
    		"retries": 0,
    		"timelimit": [null, null],
    		"root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
    		"parent_id": null,
    		"argsrepr": "(5, 1)",
    		"kwargsrepr": "{}",
    		"origin": "gen66816@wjh-MacBook-Pro.local"
    	},
    	"properties": {
    		"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
    		"reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8",
    		"delivery_mode": 2,
    		"delivery_info": {
    			"exchange": "",
    			"routing_key": "celery"
    		},
    		"priority": 0,
    		"body_encoding": "base64",
    		"delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"
    	}
    }
    

    我发现里面有一个属性expires ,这不就是设置过期时间的吗。
    于是我开始看celery的源码,哪里能设置这个expires属性,切入点在.delay()方法这里,

        def delay(self, *args, **kwargs):
            return self.apply_async(args, kwargs)
    

    再看apply_async方法,因为没有看到expires关键字参数,所以我猜他在options里面,于是找了一下options的取值
    看到这么一段

    preopts = self._get_exec_options()
    options = dict(preopts, **options) if options else preopts
    

    再看_get_exec_options方法

        def _get_exec_options(self):
            if self._exec_options is None:
                self._exec_options = extract_exec_options(self)
            return self._exec_options
    

    果然,在extract_exec_options中找到了redis中存储的属性的关键字由来

    extract_exec_options = mattrgetter(
        'queue', 'routing_key', 'exchange', 'priority', 'expires',
        'serializer', 'delivery_mode', 'compression', 'time_limit',
        'soft_time_limit', 'immediate', 'mandatory',  # imm+man is deprecated
    )
    

    因为preopts = self._get_exec_options()的self,本身类是class Task:
    所以,只要在你定义的task任务中的task装饰器上传入expires属性,就可以了
    至于应该传什么样的值,在源码中找到这么一段(挺不好找的,amqp.py/AMQP构造方法下的task_protocols属性)

            if isinstance(expires, numbers.Real):
                self._verify_seconds(expires, 'expires')
                now = now or self.app.now()
                timezone = timezone or self.app.timezone
                expires = maybe_make_aware(
                    now + timedelta(seconds=expires), tz=timezone,
                )
    

    很明显,seconds是秒,所以最后expires属性在redis中存储的是一个准确的时间格式,这里有一个细节
    又是一个小细节
    时间的存储涉及到了时区,celery的当前时间取的是self.app.now(),于是乎

            def now(self):
            """Return the current time and date as a datetime."""
            now_in_utc = to_utc(datetime.utcnow())
            return now_in_utc.astimezone(self.timezone)
    

    WTF?竟然默认用的是UTC,所以我选择先加上expires属性给个5分钟看看会不会生效以及会不会差8个小时
    果然!生效了,果然,差了8个小时
    又是一个小细节
    怎么办呢?我选择重写celerynow方法
    只设置now,不管timezone真的可以吗?答案是可以的,看一下maybe_make_aware方法

    def maybe_make_aware(dt, tz=None):
        """Convert dt to aware datetime, do nothing if dt is already aware."""
        if is_naive(dt):
            dt = to_utc(dt)
            return localize(
                dt, timezone.utc if tz is None else timezone.tz_or_local(tz),
            )
        return dt
    

    dt, timezone.utc if tz is None else timezone.tz_or_local(tz),这里如果时间有时区,就用自己的
    于是乎

    实现

    django-celery的项目结构,网上一抓一大把,我就不写了,先看main文件

    import os
    from celery import Celery
    
    from django.utils import timezone
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SHBMCRM.settings')
    
    
    class MyCelery(Celery):
    
        def now(self):
            return timezone.localtime()  # 使用django自带的时间,会根据settings中设置的TIME_ZONE获取当前时间,很方便
    
    
    celery_app = MyCelery('SHBMCRM')
    celery_app.config_from_object('celery_tasks.config')
    celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.message'])
    
    # 设置过期时间 60s * 5 = 5分钟
    @celery_app.task(name='new_member', expires=60*5)  
    def new_member(mobile, user):
        # do somethings
    

    效果

    {
    	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    	"content-encoding": "utf-8",
    	"content-type": "application/json",
    	"headers": {
    		"eta": null,
    		"expires": "2021-05-20T10:26:41.945686+08:00",   // 有过期时间了
    		"group": null,
    		"kwargsrepr": "{}",
    		"origin": "gen66816@wjh-MacBook-Pro.local"
    	},
    }
    
    第一条就是过期了
    [2021-05-20 10:21:42,017: INFO/MainProcess] Received task: permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094]   expires:[2021-05-20 10:26:41.945686+08:00]
    第二条执行成功
    [2021-05-20 10:21:42,036: INFO/ForkPoolWorker-1] Task permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] succeeded in 0.01773979200000042s: None
    

    创作不易,转载请注明出处及附带链接

  • 相关阅读:
    莫队专题
    AJAX XML 实例
    AJAX 简介
    AJAX 服务器响应
    AJAX 创建XMLHttpRequest 对象
    AJAX 教程
    AJAX 向服务器发送请求
    AJAX onreadystatechange 事件
    AJAX ASP/PHP 请求实例
    让卖场的死角“起死回生”
  • 原文地址:https://www.cnblogs.com/pywjh/p/14793140.html
Copyright © 2011-2022 走看看