zoukankan      html  css  js  c++  java
  • python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

    前言

    接着前面Celery 定时任务,这篇使用Celery + djcelery 把定时任务存到数据库。

    djcelery 环境准备

    定时任务基础环境准备,就不多说了,接着前面一篇https://www.cnblogs.com/yoyoketang/p/15432907.html.
    Celery的使用方式有两种:

    • Celery 只用Celery,本身自带worker 和 beat (定时任务)功能,定时任务在setting配置
    • Celery + djcelery 使用了djcelery,可以在任务中方便的直接操作 Django 数据库,而且最终的任务可以在 Django 的后台中查看和修改相关的任务。

    多安装一个 djcelery 主要是把定时任务放到数据库中,方便配置和管理。

    pip 安装django-celery

    pip install django-celery==3.3.1
    

    在 setting 里面配置 INSTALLED_APPS,添加'djcelery'

    INSTALLED_APPS = [
        ......
        'djcelery'
    ]
    

    同步数据库

    python manage.py makemigrations
    python manage.py migrate
    

    执行完成后会看到djcelery相关的几张表

    CELERY 配置

    在setting.py 添加CELERY 相关配置

    import djcelery
    djcelery.setup_loader()
    
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    
    BROKER_URL = 'redis://192.168.1.1:6379'
    # RESULT_BACKEND 结果保存数据库
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
    # SCHEDULER 定时任务保存数据库
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
    

    相对于前面一篇,修改了以下内容

    # 新增这2句
    import djcelery
    djcelery.setup_loader()
    
    # RESULT_BACKEND 结果保存数据库
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
    # SCHEDULER 定时任务保存数据库
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
    

    tasks任务

    在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

    from __future__ import absolute_import
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        print("task----------111111----------")
        return x + y
    
    
    @shared_task
    def mul(x, y):
        print("task----------22222----------")
        return x * y
    

    views视图创建任务

    创建视图,把定时任务信息写入数据库

    from django.http import JsonResponse
    import datetime
    import json
    from djcelery.models import PeriodicTask, CrontabSchedule
    
    
    def create_task(request):
        task_name = "test"	 # 唯一值,自定义不能重复, 这里是测试下,先写死
        task = 'yoyo.tasks.add'  # 任务的注册路径
        # task_args = [10, 11]   # 任务参数
        task_kwargs = {'x': 10, 'y': 11}  # 关键字参数
        # 定时任务规则
        crontab_time = {
            'minute': '*/2',       # 每2分钟执行一次
            'hour': '*',
            'day_of_week': '*',
            'day_of_month': '*',
            'month_of_year': '*'
        }
        # 写入 schedule表
        schedule = CrontabSchedule.objects.create(**crontab_time)
        # 任务和 schedule 关联
        task, created = PeriodicTask.objects.get_or_create(
            name=task_name,		# 名称保持唯一
            task=task,
            crontab=schedule,
            enabled=True,	  # 是否开启任务
            # args=json.dumps(task_args),
            kwargs=json.dumps(task_kwargs),
            # 任务过期时间,设置当前时间往后1天
            expires=datetime.datetime.now()+datetime.timedelta(days=1)
        )
        if created:
            return JsonResponse({"code": 0, "msg": "success"})
        else:
            return JsonResponse({"code": 111, "msg": "create failed"})
    

    分别启动django, worker 和 beat服务

    python manage.py runserver 0.0.0.0:8000
    celery -A MyDjango beat -l info
    celery -A MyDjango worker -l info
    

    访问接口触发后,数据库 djcelery_crontabschedule 表会写入定时信息

    djcelery_periodictask 表记录任务信息

  • 相关阅读:
    GridView中checkbox实现全选[转]
    go 格式化秒 running
    mysql 聚簇索引和非聚簇索引 running
    go context上下文取消 running
    go reflect running
    time.ticker running
    go 数据结构与算法之二分查找 running
    mysql 联合索引最左前缀匹配原则 running
    es 修改 mapping 字段类型 running
    linux 查看虚拟机网卡命令 running
  • 原文地址:https://www.cnblogs.com/yoyoketang/p/15433710.html
Copyright © 2011-2022 走看看