zoukankan      html  css  js  c++  java
  • 4.异步邮件发送和定时任务

    参考:https://www.cnblogs.com/xiaonq/p/14097376.html

    1、创建tasks.py文件进行验证

    from celery import Celery
    import time
    
    #测试
    
    app = Celery('TASK',
                 broker='redis://localhost',        
                 backend='redis://localhost')
    
    @app.task
    def add(x, y):
       print("running..add.", x, y)
       return x + y
    
    @app.task
    def minus(x, y):
       time.sleep(60)
       print("running..minus.", x, y)
       return x - y
    tasks.py

    1.2启动命令

    celery -A tasks worker --loglevel=info            # tasks是tasks.py文件:必须在tasks.py所在目录下执行

    2.项目结构如下

     2.1 opwf_project/celery_task文件夹

    # -*- coding: utf-8 -*-
    # celery.py
    
    from celery import Celery
    import os,sys
    import django
    
    # 1.添加django项目根路径
    CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
    
    # 2.添加django环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
    django.setup() # 读取配置
    
    
    # 3.celery基本配置
    app = Celery('proj',
                 broker='redis://localhost:6379/14',
                 backend='redis://localhost:6379/15',
                 include=['celery_task.tasks',
    
                          ])
    
    # 4.实例化时可以添加下面这个属性
    app.conf.update(
       result_expires=3600,        #执行结果放到redis里,一个小时没人取就丢弃
    )
    
    # 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'celery_task.tasks.test_task_crontab',
            'schedule': 5.0,
            'args': (16, 16)
        },
    }
    
    # 6.添加时区配置
    app.conf.timezone = 'Asia/Shanghai'
    print(dir(app))
    if __name__ == '__main__':
       app.start()
    celery.py
    from .celery import app
    import os,sys
    from .celery import CELERY_BASE_DIR
    
    
    '''测试定时任务'''
    @app.task()
    def test_task_crontab(x,y):
        '''添加django项目路径'''
        sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../loonflow'))
        from utils.rl_sms import  test_crontab
        res = test_crontab(x,y)
        return x + y
    
    
    
    
    @app.task(bind=True)
    def send_sms_code(self, mobile, datas):
        sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../loonflow'))
        # 在方法中导包
        from utils.rl_sms import send_message
    
        # time.sleep(5)
        try:
            # 用 res 接收发送结果, 成功是:0, 失败是:-1
            res = send_message(mobile, datas)
        except Exception as e:
            res = '-1'
    
        if res == '-1':
            # 如果发送结果是 -1  就重试.
            self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))
    tasks.py

    3.1utils.py/rl_sms.py文件夹

    from ronglian_sms_sdk import SmsSDK
    from user.models import User
    accId = '8aaf070875774c6d01758d48effb0a36'
    accToken = '10ccc33de038428d864cb5eb122649f6'
    appId = '8aaf070875774c6d01758d48f0cd0a3d'
    
    
    def send_message(mobile, datas):
        user = User.objects.all()[0]
        print(user.username, '%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        sdk = SmsSDK(accId, accToken, appId)
        tid = '1'  # 测试模板id为: 1. 内容为: 【云通讯】您的验证码是{1},请于{2}分钟内正确输入。
        # mobile = '13303479527'
        # datas = ('666777', '5')  # 模板中的参数按照位置传递
        resp = sdk.sendMessage(tid, mobile, datas)
        print("##########################################")
        print('执行了这个方法 send_message')
        return resp
    
    
    def test_crontab(x,y):
        print('############### 执行test_crontab测试任务 #############')
        print('############### 邮件审批超时提醒 #############')
    rl_sms.p

    4.命令

    ### 1.1 进入执行目录
    cd opwf_project
    
    ### 1.2 celery管理
    celery -A celery_task worker -l INFO               # 单线程
    celery multi start w1 w2 -A celery_pro -l info     #一次性启动w1,w2两个worker
    celery -A celery_pro status                        #查看当前有哪些worker在运行
    celery multi stop w1 w2 -A celery_pro              #停止w1,w2两个worker
    
    # 1.项目中启动celery worker
    celery  multi start celery_task -A celery_task -l debug --autoscale=50,10      # celery并发数:最多50个,最少5个
    # 2.在项目中关闭celery worker
    ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9      # 关闭所有celery进程
    ```
    
    ### 1.3 django_celery_beat管理
    # 1.普通测试启动celery beat
    celery -A celery_task beat -l info
    # 2.在项目中后台启动celery beat
    celery -A celery_task beat -l debug >> /aaa/Scheduler.log 2>&1 & 
    # 3.停止celery beat
    ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' |
  • 相关阅读:
    ZOJ 3332 Strange Country II
    ZOJ 3331 Process the Tasks(双塔DP)
    ZOJ 3326 An Awful Problem(模拟)
    HDU 1796 How many integers can you find(容斥原理)
    HDU 4059 The Boss on Mars(容斥原理)
    HDU 4135 Co-prime(容斥原理)
    HDU 5677 ztr loves substring(回文串加多重背包)
    CodeForces 668B Little Artem and Dance
    CodeForces 667A Pouring Rain
    Java实现 LeetCode 764 最大加号标志(暴力递推)
  • 原文地址:https://www.cnblogs.com/shensy/p/14103495.html
Copyright © 2011-2022 走看看