zoukankan      html  css  js  c++  java
  • Celery动态添加定时任务

    背景

    业务需求:用户可创建多个多人任务,需要在任务截止时间前一天提醒所有参与者

    技术选型:

    Celery:分布式任务队列。实现异步与定时

    django-celery-beat:实现动态添加定时任务,即在创建多人任务时添加定时。django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度

    安装与配置

    安装

    pip install celery
    pip install django-celery-beat
    

    配置

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    
    # settings.py
    TIME_ZONE = 'Asia/Shanghai'
    USE_TZ = False
    
    # =================Celery 配置=================
    # 使用redis作为broker
    REDIS_HOST = 'redis://127.0.0.1:6379/0'
    # 关闭 UTC
    CELERY_ENABLE_UTC = False
    # 设置 django-celery-beat 真正使用的时区
    CELERY_TIMEZONE = TIME_ZONE
    # 使用 timezone naive 模式,不存储时区信息,只存储经过时区转换后的时间
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    # 配置 celery 定时任务使用的调度器,使用django_celery_beat插件用来动态配置任务
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
    

    创建django-celery-beat所需要的数据表

    python manage.py migrate
    

    创建celery实例,并定义任务

    # 由于django_celery_beat用到了Django的ORM,因此首先需要setup django,否则会报错
    import os
    import django
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "qaboard.settings")
    django.setup()
    
    from celery import Celery
    from project import settings
    from utils.send_msg import send_msg
    
    # 使用redis作为消息队列,backend也默认为broker使用的队列服务
    app = Celery('test', broker=settings.REDIS_HOST)
    # 载入django配置文件中以 CELERY 开头的配置
    app.config_from_object('project.settings', namespace='CELERY')
    
    
    @app.task
    def test_task():
        send_msg("test celery")
    

    启动celery worker和celery beat

    celery -A project_celery worker --pool=solo -l info -f logs/celery.log
    

    '-A' 是一个全局配置,定义了APP的位置

    '--pool' 是POOL的配置,默认是prefork(并发),选择solo之后,发送的任务不会被并发执行,在worker执行任务过程中,再次发送给worker的任务会排队,执行完一个再执行另一个。不需要并发时可以选择此模式以节约服务器资源

    '-l' 定义了log级别

    '-f' 定义日志文件路径

    celery -A project_celery beat -l info -f logs/beat.log --pidfile=logs/celerybeat.pid
    

    '--pidfile' 用于定位pidfile,pidfile是一个存储了beat进程的进程id的文件,如果此文件存在且此文件中的进程正在运行中,则不会启动新的beat进程

    由于配置中已经声明了调度器,因此这里不需要重新声明,否则需要使用

    --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    声明使用DatabaseScheduler

    在linux上可以用-B参数同步启动celery beat

    celery -A qaboard_celery worker --pool=solo -l info -f logs/celery.log -B
    

    beat的log会输出到celery.log中

    动态添加定时任务

    PeriodicTask

    此模型定义要运行的单个周期性任务。

    1. 必须为任务指定一种Schedule,即clocked, interval, crontab, solar四个字段必须填写一个,且只能填写一个
    2. name字段给任务命名,它是unique的
    3. task字段指定运行的Celery任务,如“proj.tasks.test_task”
    4. one_off:默认值为False,如果one_off=True,任务被运行一次后enabled字段将被置为False,即任务只会运行一次
    5. args:传递给任务的参数,是一个json字符串,如 ["arg1", "arg2"]
    6. expires:过期时间,过期的任务将不再会被驱动触发

    使用ClockedSchedule

    会在特定的时间触发任务

    def test_clock():
        clock = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(seconds=10))
        PeriodicTask.objects.create(
            name="%s" % str(datetime.now()),
            task="project_celery.celery_app.test_task",
            clocked=clock,
            # 如果使用ClockedSchedule,则one_off必须为True
            one_off=True
        )
    

    不知道为什么我的任务就是无法通过clock触发,beat.log中有DatabaseScheduler: Schedule changed.的记录,但是到了clock指定的时间任务不会被触发,其他的调度器都是可以正常运行的,如果有知道解决方法的同学可以评论告诉我,感谢

    使用IntervalSchedule

    以特定间隔运行的Schedule

    用IntervalSchedule能够实现与ClockedSchedule同样的功能:计算目标时间与当前时间的时间差,令此时间差作为IntervalSchedule的周期,并且将任务的one_off参数置为True

    def time_diff(target_time):
        diff = target_time - datetime.now()
        return int(diff.total_seconds())
    
    def test_interval():
        seconds = time_diff(datetime.strptime("2020-3-19 15:39:00", "%Y-%m-%d %H:%M:%S"))
        schedule = IntervalSchedule.objects.create(every=seconds, period=IntervalSchedule.SECONDS)
        PeriodicTask.objects.create(
            name="%s" % str(datetime.now()),
            task="project_celery.celery_app.test_task",
            interval=schedule,
            one_off=True
        )
    

    使用CrontabSchedule

    使用CrontabSchedule一定要注意将时区设置为当前地区时区

    model参数与crontab表达式的对应关系:

    minite, hour, day_of_week, day_of_month, month_of_year
    

    全部默认为"*"

    def test_crontab():
        # 表示 * * * * * ,即每隔一分钟触发一次
        schedule = CrontabSchedule.objects.create(timezone='Asia/Shanghai')
        PeriodicTask.objects.create(
            name="%s" % str(datetime.now()),
            task="project_celery.celery_app.test_task",
            crontab=schedule,
            one_off=True
        )
    
  • 相关阅读:
    JS 循环遍历json
    客户端获取ip
    jquery 常用获取值得方法汇总
    C# MATLAB混合编程
    java设计模式之抽象工厂模式学习
    java设计模式之工厂模式学习
    java设计模式之装饰者模式学习
    本周任务
    模仿jquery的data
    js中random的应用
  • 原文地址:https://www.cnblogs.com/luozx207/p/12669375.html
Copyright © 2011-2022 走看看