zoukankan      html  css  js  c++  java
  • Celery模块使用

    一、celery概述

    1.1、celery定义

    celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。

    1.2、各模块版本

    模块 版本
    python 3.6.12
    django 2.2.17
    celery 5.0.5
    django-celery-beat 2.1.0
    django-celery-results 2.0.0

    1.3、参考

    https://www.cnblogs.com/wdliu/p/9530219.html

    https://www.cnblogs.com/f-g-f/p/11300656.html

    二、纯代码celery

    2.1、目录结构

    celery01
    ├── dy.py
    └── proj01
            ├── celeryconfig.py
            ├── celery.py
            ├── __init__.py
            └── tasks.py

    2.2、代码

    # celeryconfig.py
    # celery配置,队列和结果存放地址,序列化格式
    BROKER_URL = 'redis://8.136.184.235:6479'
    CELERY_RESULT_BACKEND = 'redis://8.136.184.235:6479/0'
    CELERY_TASK_SERIALIZER = 'msgpack'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
    

      

    # celery.py
    # 实例化celery,生成实例对象
    from __future__ import absolute_import
    from celery import Celery
    
    app = Celery('proj01', include=["proj01.tasks"])
    app.config_from_object("proj01.celeryconfig")
    
    if __name__ == "__main__":
        app.start()
    

      

    # tasks.py
    # celery的具体功能代码,此处为add函数
    from __future__ import absolute_import
    from time import sleep
    from proj01.celery import app
    
    
    @app.task
    def add(x, y):
        sleep(1)
        return x + y
    

      

    # __init__.py
    # 空文件,无内容
    

      

    # dy.py
    from proj01.tasks import add
    import time
    
    t1 = time.time()
    
    r1 = add.delay(1, 2)
    r2 = add.delay(2, 4)
    r3 = add.delay(3, 6)
    r4 = add.delay(4, 8)
    r5 = add.delay(5, 10)
    
    r_list = [r1, r2, r3, r4, r5]
    for r in r_list:
        while not r.ready():
            pass
        print('task:', r)
        print(r.result)
    
    t2 = time.time()
    
    print('共耗时:%s' % str(t2-t1))
    

      

    2.3、运行代码

    启动celery服务:

    在项目目录下,此项目目录是/root/celery01,执行如下命令:

    celery -A proj01.celery worker -l info

    调用服务:

    在项目目录下,此项目目录是/root/celery01,执行如下命令:
    python dy.py

    2.4、结果

    dy.py运行结果,如下图所示:

    celery服务运行结果,如下图所示:

    三、纯代码celery定时任务

    3.1、概述

    实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。

    3.2、代码变动

    tip:在上一步(第二章 纯代码 Celery)的代码基础上改动,红色为变动内容

    # celery.py
    # 添加定时任务的功能,硬编码添加定时任务
    from __future__ import absolute_import
    from celery import Celery
    from datetime import timedelta
    from celery.schedules import crontab
    
    app = Celery('proj01', include=["proj01.tasks"])
    app.config_from_object("proj01.celeryconfig")
    
    ## 定时任务方式一
    app.conf.CELERYBEAT_SCHEDULE = {
        'add': {
            'task': 'proj01.tasks.add',
            'schedule': timedelta(seconds=5),
            'args': (16, 16)
        }
    }
    # crontab配置方法:https://www.cnblogs.com/alex3714/p/6351797.html
    ## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来
    #app.conf.beat_schedule = {
    #    # Executes every Monday morning at 7:30 a.m.
    #    'add': {
    #        'task': 'proj01.tasks.add',
    #        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    #        'args': (16, 16),
    #    },
    #}
    
    if __name__ == "__main__":
        app.start()
    

      

    3.3、运行代码:

    启动celery服务:

    在项目目录下,此项目目录是/root/celery01,执行如下命令:

    celery -A proj01.celery worker -l info --beat

    3.4、结果

    会按照定时设置,自动执行任务。

    四、django Celery

    4.1、目录结构

    红色为新增或改动的文件

    proj02
    ├── app02
    │   ├── admin.py
    │   ├── apps.py
    │   ├── __init__.py
    │   ├── models.py
    │   ├── tasks.py                   # 应用下,会被异步调用的函数
    │   ├── tests.py
    │   └── views.py
    ├── manage.py
    └── proj02
            ├── celery.py               # 实例化celery,生成celery实例对象
            ├── __init__.py
            ├── settings.py            # celery配置
            ├── urls.py
            └── wsgi.py

    4.2、代码

    # setting.py
    # 基础配置(apps、db)这里就不说了
    ... # 省略其他配置内容
    CELERY_TIMEZONE = "Asia/Shanghai"
    CELERY_TASK_TRACK_STARTED = True
    CELERY_TASK_TIME_LIMIT = 30 * 60
    
    BROKER_URL = 'redis://127.0.0.1:6479'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6479/0'
    CELERY_TASK_SERIALIZER = 'msgpack'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
    

      

    # celery.py
    # 实例化celery对象,生成celery实例对象
    from __future__ import absolute_import, unicode_literals
    import os
    from django.conf import settings
    from celery import Celery
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj02.settings')
    app = Celery('proj02')
    # 此处不能用namespace,否则会出错
    # app.config_from_object('django.conf:settings', namespace='CELERY')
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(settings.INSTALLED_APPS)
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
    

      

    # tasks.py
    # 定义异步任务函数
    from celery import shared_task
    
    @shared_task
    def add(x, y):
        print(x+y)
        return x + y
    

      

    # app02/views.py
    # 定义视图函数,调用tasks.py里的异步函数
    from django.http import JsonResponse
    from app02 import tasks
    
    def index(request, *args, **kwargs):
        res=tasks.add.delay(1,3)
        return JsonResponse({'status':'successful','task_id':res.task_id})
    
    
    # proj02/urls.py
    from django.contrib import admin
    from django.urls import path
    from app02.views import index
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('', index),
    ]
    

      

    4.3、运行代码

    启动django服务:

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    python manage.py runserver 0.0.0.0:8888

    启动celery服务:

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    celery -A proj02.celery worker -l info

    调用接口:

    http://<django服务器地址ip>:8888

    4.4、结果

    {"status": "successful", "task_id": "fa1dc915-b1c2-4250-8557-2172d00febf1"}

    扩展:task结果存储

    CELERY_RESULT_BACKEND

    1、概述

     除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作。

    2、配置

    安装依赖:

    pip install django-celery-results

    配置settings.py,注册app:

    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    

      

    修改backend配置,将redis改为django-db:

    #CELERY_RESULT_BACKEND = 'redis://127.0.0.0:6479/0'
    CELERY_RESULT_BACKEND = 'django-db'
    

      

    同步数据库:

    python manage.py migrate django_celery_results
    

      

    3、使用

    查看数据库表(django_celery_results),每个任务执行后的结果,存储在这里。

    五、django Celery定时任务

    5.1、 概述

    实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。

    5.2、代码变动

    tip:在上一步(第四章 django Celery)的代码基础上改动,红色为变动内容

    # proj02/celery_schedule.py
    # 定时任务执行的功能函数
    from datetime import timedelta
    from celery.schedules import crontab
    
    ## 定时任务方式一
    CELERYBEAT_SCHEDULE = {
        'add': {
            'task': 'app02.tasks.add',
            'schedule': timedelta(seconds=10),
            'args': (16, 16)
        }
    }
    
    ## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来
    #CELERYBEAT_SCHEDULE = {
    #    'add': {
    #        'task': 'app02.tasks.add',
    #        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    #        'args': (16, 16)
    #    }
    #}
    

      

    # proj02/settings.py
    # 新增对celery_schedule.py文件的引用
    from proj02.celery_schedule import CELERYBEAT_SCHEDULE
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'app02',
        'django_celery_beat',
        'django_celery_results',   # 这个是本页`扩展`章节的内容
    ]
    

      

    5.3、运行代码

    启动django服务:

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    python manage.py runserver 0.0.0.0:8888

    启动celery服务:

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    celery -A proj02.celery worker -l info --beat

    5.4、结果

    会按照定时设置,自动执行任务。

    六、django管理后端自定义定时任务

    6.1、启动服务

    启动django服务:

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    python manage.py runserver 0.0.0.0:8888

    启动celery服务(特别注意启动时加上参数,红色标记):

    在项目目录下,此项目目录是/root/proj02,执行如下命令:

    celery -A proj02.celery worker -l info  --beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

    6.2、界面配置使用

     登录管理后台,看到此界面,如下:

     详细配置,不具体介绍了,自己点点试试,很快就能知道。

    七、自定义动态定时任务

    7.1、概述

    django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。

    7.2、实例代码

    后续补充

    八、celery监控

     https://www.cnblogs.com/zivli/p/11517797.html

    后续补充

  • 相关阅读:
    Semaphore使用
    不可变对象
    Java锁--Lock实现原理(底层实现)
    Lambda Expressions and Functional Interfaces: Tips and Best Practices
    注解的作用
    linux命令大全
    linux &和&&,|和||
    SpringCloud 商品架构例子(一)
    springcloud starter(一)
    dubbo(一)
  • 原文地址:https://www.cnblogs.com/wangsl1204/p/14258843.html
Copyright © 2011-2022 走看看