zoukankan      html  css  js  c++  java
  • Django使用Celery

    一、配置celery

      创建django项目celery_demo, 创建应用demo:

    django-admin startproject celery_demo
    python manage.py startapp demo

      在celery_demo模块中创建celery.py模块, 文件目录为:

      

       celery.py模块内容为:

    from celery import Celery
    from django.conf import settings
    import os
    
    # 为celery设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')
    
    # 创建应用
    app = Celery("demo")
    # 配置应用
    app.conf.update(
        # 配置broker, 这里我们用redis作为broker
        BROKER_URL='redis://:332572@127.0.0.1:6379/1',
    )
    # 设置app自动加载任务
    # 从已经安装的app中查找任务
    app.autodiscover_tasks(settings.INSTALLED_APPS)
    

      在应用demo引用创建tasks.py模块, 文件目录为:

      

      我们在文件内创建一个任务函数my_task:

    from celery_demo.celery import app
    import time
    
    # 加上app对象的task装饰器
    # 此函数为任务函数
    @app.task
    def my_task():
        print("任务开始执行....")
        time.sleep(5)
        print("任务执行结束....")
    

      在views.py模块中创建视图index:

    from django.shortcuts import render
    from django.http import HttpResponse
    from .tasks import my_task
    
    
    def index(request):
    # 将my_task任务加入到celery队列中
    # 如果my_task函数有参数,可通过delay()传递
    # 例如 my_task(a, b), my_task.delay(10, 20)
        my_task.delay()
    
        return HttpResponse("<h1>服务器返回响应内容!</h1>")
    

      在celey_demo/settings.py配置视图路由:

    from django.conf.urls import url
    from django.contrib import admin
    from demo.views import index
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^$', index),
    ]
    

      创建worker等待处理celery队列中任务, 在终端执行命令:

    celery -A celery_demo worker -l info

      启动django测试服务器:

    python manage.py runserver

    二、存储任务结果

      此处需要用到额外包django_celery_results, 先安装包:

    pip install django-celery-results
    

      在celery_demo/settings.py中安装此应用:

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'demo',
        'django_celery_results',  # 注意此处应用名为下划线
    ]
    

      回到celery_demo/celery.py模块中,增加配置信息如下:

    from celery import Celery
    from django.conf import settings
    import os
    
    # 为celery设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')
    
    # 创建应用
    app = Celery("demo")
    # 配置应用
    app.conf.update(
        # 配置broker, 这里我们用redis作为broker
        BROKER_URL='redis://:332572@127.0.0.1:6379/1',
        # 使用项目数据库存储任务执行结果
        CELERY_RESULT_BACKEND='django-db',
    )
    # 设置app自动加载任务
    # 从已经安装的app中查找任务
    app.autodiscover_tasks(settings.INSTALLED_APPS)
    

      创建django_celery_results应用所需数据库表, 执行迁移文件:

    python manage.py migrate django_celery_results
    

      我这里使用的是django默认的数据库sqlit, 执行迁移之后,会在数据库中创建一张用来存储任务结果的表:

      再次从浏览器发送请求, 任务执行结束之后,将任务结果保存在数据库中:

      

      

    三、定时任务

      如果我们想某日某时执行某个任务,或者每隔一段时间执行某个任务,也可以使用celery来完成.   使用定时任务,需要安装额外包:

    pip install django_celery_beat
    

      首先在settings.py中安装此应用:

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'demo',
        'django_celery_results',
        'django_celery_beat',  # 安装应用
    ]
    

      在celery_demo/celery.py模块中增加定时任务配置:

    from celery import Celery
    from django.conf import settings
    import os
    
    # 为celery设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')
    
    # 创建应用
    app = Celery("demo")
    # 配置应用
    app.conf.update(
        # 配置broker, 这里我们用redis作为broker
        BROKER_URL='redis://:332572@127.0.0.1:6379/1',
        # 使用项目数据库存储任务执行结果
        CELERY_RESULT_BACKEND='django-db',
        # 配置定时器模块,定时器信息存储在数据库中
        CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler',
    
    )
    # 设置app自动加载任务
    # 从已经安装的app中查找任务
    app.autodiscover_tasks(settings.INSTALLED_APPS)
    

      由于定时器信息存储在数据库中,我们需要先生成对应表, 对diango_celery_beat执行迁移操作,创建对应表:

    python manage.py migrate django_celery_beat

      我们可登录网站后台Admin去创建对应任务, 首先我们先在tasks.py模块中增加新的任务,用于定时去执行(5秒执行一次)

    from celery_demo.celery import app
    import time
    
    # 用于定时执行的任务
    @app.task
    def interval_task():
        print("我每隔5秒钟时间执行一次....")
    

      首先创建后台管理员帐号:

    python manage.py createsuperuser
    

      登录管理后台Admin:

      

      其中Crontabs用于定时某个具体时间执行某个任务的时间,Intervals用于每隔多久执行任务的事件,具体任务的执行在Periodic tasks表中创建。

      我们要创建每隔5秒执行某个任务,所以在Intervals表名后面点击Add按钮:

      然后在Periodic tasks表名后面,点击Add按钮,添加任务:

      

      启动定时任务:

    celery -A celery_demo worker -l info --beat
     

      任务每隔5秒中就会执行一次,如果配置了存储,那么每次任务执行的结果也会被保存到对应的数据库中。

     
  • 相关阅读:
    python 并发编程 多线程 event
    python 并发编程 多线程 定时器
    python 并发编程 多线程 信号量
    linux top 查看CPU命令
    python 并发编程 多线程 GIL与多线程
    python 并发编程 多线程 死锁现象与递归锁
    python 并发编程 多线程 GIL与Lock
    python GIL全局解释器锁与互斥锁 目录
    python 并发编程 多线程 GIL全局解释器锁基本概念
    执行python程序 出现三部曲
  • 原文地址:https://www.cnblogs.com/mxsf/p/10375802.html
Copyright © 2011-2022 走看看