zoukankan      html  css  js  c++  java
  • celery 异步发送短信验证码、延迟任务

    短信

    ## celery.py
    import os, django
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
    django.setup()
    
    
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/15'
    backend = 'redis://127.0.0.1:6379/15'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 自动任务的定时配置
    from celery.schedules import crontab
    from datetime import timedelta
    app.conf.beat_schedule = {
        # 定时任务:任务名自定义
        # 'update_banner_cache': {
        #     'task': 'celery_task.tasks.update_banner_cache',  # 任务源
        #     'args': (),  # 任务参数
        #     'schedule': timedelta(seconds=8), # 定时添加任务的时间
        #     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # },
        'send_msg': {
            'task': 'celery_task.tasks.send_msg',
            'args': (),
            'schedule': timedelta(seconds=10),
        }
    }
    
    

    tasks.py

    from .celery import app
    
    from home.models import Banner
    from django.conf import settings
    from django.core.cache import cache
    from home.serializers import BannerModelSerializer
    
    @app.task
    
    def update_banner_cache():
        banner_query = Banner.objects.filter(is_delete = False, is_show=True).order_by('-order')[:settings.BANNER_COUNT]
        banner_list = BannerModelSerializer(banner_query, many=True).data
        for banner_dic in banner_list:
            banner_dic['image'] = settings.BASE_URL + banner_dic['image']
        cache.set('banner_list', banner_list)
    
    
    #异步定时发送手机验证码
    from libs.tx_sms import sms
    @app.task
    def send_msg():
        code = sms.get_code()
        mobile = '13322332233'
        result = sms.send_msg(mobile, code)
        print(result)
    

    然后Terminal开两个窗口,回到根目录,执行, celery worker -A celery_task -l info -P eventlet 和 celery beat -A celery_task -l info 则10s会发一次验证码

    延迟任务

    ## celery.py
    from celery import Celery
    
    # broker:任务仓库
    broker = 'redis://127.0.0.1:6379/15'
    # backend:任务结果仓库
    backend = 'redis://127.0.0.1:6379/15'
    # include:任务(函数)所在文件
    app = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])
    
    
    

    task.py

    from .celery import app
    
    @app.task
    def jump(n1, n2):
        res = n1 * n2
        print('n1 * n2 = %s' % res)
        return res
    
    

    add_task.py

    
    from celery_package.tasks import jump
    
    # 直接执行函数
    # jump(10, 20)
    
    # 添加celery立即任务
    # jump.delay(10, 20)
    
    # 添加延迟任务
    # args是jump任务需要的参数,没有就设置为空()
    # eta是该任务执行的UTC格式的时间
    from datetime import datetime, timedelta
    def eta_second(second):
        ctime = datetime.now()
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
        time_delay = timedelta(seconds=second)
        return utc_ctime + time_delay
    def eta_days(days):
        ctime = datetime.now()
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
        time_delay = timedelta(days=days)
        return utc_ctime + time_delay
    
    # apply_async就是添加延迟任务
    jump.apply_async(args=(200, 50), eta=eta_second(10))
    

    **右键运行add_tasks. 然后再Terminal窗口执行celery worker -A celery_package -l info -P eventlet 则过10s后,才会产生运算结果 **

  • 相关阅读:
    基于ARP的网络扫描工具netdiscover
    渗透测试集成环境Faraday
    NBNS扫描工具nbtscan-unixwiz
    分享Kali Linux 2017年第18周镜像文件
    Hat's Fibonacci
    N!
    A + B Problem II(大数加法)
    产生冠军(拓扑排序)
    确定比赛名次
    Legal or Not(模板题)
  • 原文地址:https://www.cnblogs.com/michealjy/p/11979233.html
Copyright © 2011-2022 走看看