zoukankan      html  css  js  c++  java
  • Django + celery + redis 执行异步任务及查看结果

     

    官方文档

    其他文档

    开发环境

    • python                         3.6.8
    • django                         1.11
    • celery                           4.3.0      
    • django-celery-results  1.1.2  
    • django-celery-beat     1.5.0      

    安装 redis

    安装操作 redis 库

    pip install redis

    (这里说明一下,pip 安装的 redis 仅仅是一个连接到 redis 缓存的一个工具;redis 服务需要自己去安装,安装文档如上)

    安装 celery

    pip install celery

    安装 Django-celery-results

    pip install django-celery-results

    配置 settings.py

    # 添加 djcelery APP
    INSTALLED_APPS = [
        # ...
        'django_celery_results',  # 查看 celery 执行结果
    ]
    
    # django 缓存
    CACHES = {
        "default": {
            "BACKEND": "django_redis.cache.RedisCache",
            "LOCATION": "redis://127.0.0.1:6379/1",
            "OPTIONS": {
                "CLIENT_CLASS": "django_redis.client.DefaultClient",
            }
        }
    }
    
    
    # celery 定时任务
    # 注意,celery4 版本后,CELERY_BROKER_URL 改为 BROKER_URL
    BROKER_URL = 'redis://127.0.0.1:6379/0'  # Broker 使用 Redis, 使用0数据库(暂时不是很清楚原理)
    # CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # 定时任务调度器 python manage.py celery beat
    CELERYD_MAX_TASKS_PER_CHILD = 3  # 每个 worker 最多执行3个任务就会被销毁,可防止内存泄露
    # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # celery 结果返回,可用于跟踪结果
    CELERY_RESULT_BACKEND = 'django-db'  # 使用 database 作为结果存储
    CELERY_CACHE_BACKEND = 'django-cache'  # celery 后端缓存
    
    # celery 内容等消息的格式设置
    if os.name != "nt":
        # Mac and Centos
        # worker 启动命令:celery -A sqlmanager worker -l info
        CELERY_ACCEPT_CONTENT = ['application/json', ]
        CELERY_TASK_SERIALIZER = 'json'
        # CELERY_RESULT_SERIALIZER = 'json'
    else:
        # windows
        # pip install eventlet
        # worker 启动命令:celery -A sqlmanager worker -l info -P eventlet
        CELERY_ACCEPT_CONTENT = ['pickle', ]
        CELERY_TASK_SERIALIZER = 'pickle'
        # CELERY_RESULT_SERIALIZER = 'pickle'
    

      

    生成 Django-celery-results 关联表

    python manage.py migrate
    

     

    python manage.py migrate
    # 结果
    raven.contrib.django.client.DjangoClient: 2019-12-15 21:47:10,426 /XXXXX/lib/python3.6/site-packages/raven/base.py [line:213] INFO Raven is not configured (logging is disabled). Please see the documentation for more information.
    Operations to perform:
      Apply all migrations: admin, auth, blog, captcha, contenttypes, django_celery_results, djcelery, logger, photo, sessions, sites, user, users
    Running migrations:
      Applying django_celery_results.0001_initial... OK
      Applying django_celery_results.0002_add_task_name_args_kwargs... OK
      Applying django_celery_results.0003_auto_20181106_1101... OK
      Applying django_celery_results.0004_auto_20190516_0412... OK
      Applying djcelery.0001_initial... OK
    

      

    项目根目录添加 celery.py

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    from django.conf import settings
    import os
    
    # 获取当前文件夹名,即为该 Django 的项目名
    project_name = os.path.split(os.path.abspath('.'))[-1]
    project_settings = '%s.settings' % project_name
    
    # 设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)
    
    # 实例化 Celery
    app = Celery(project_name)
    
    # 使用 django 的 settings 文件配置 celery
    app.config_from_object('django.conf:settings')
    
    # Celery 加载所有注册的应用
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    

    配置项目根目录 __init__.py

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    import pymysql
    
    pymysql.install_as_MySQLdb()
    
    __all__ = ('celery_app',)
    

    app 目录添加 tasks.py

    import json
    import requests
    from celery import task
    
    @task
    def task_send_dd_text(url, msg, atMoblies, atAll="flase"):
        body = {
            "msgtype": "text",
            "text": {
                "content": msg
            },
            "at": {
                "atMobiles": atMoblies,
                "isAtAll": atAll
            }
    
    
        }
        headers = {'content-type': 'application/json',
                   'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'}
        r = requests.post(url, headers=headers, data=json.dumps(body))
        # print(r.text)
    

      

    views.py 调用

    # 假如 url 设置成 test
    
    def test(request):
        # 导入
        from .tasks import task_send_dd_text
        # 执行
        task_send_dd_text.delay(settings.DD_NOTICE_URL, "异步任务调用成功", atMoblies=["18612345678"], atAll="false")
        return HttpResponse("test")
    

      

    启动 celery worker

    # 项目根目录终端执行(项目名称)
    
    centos or mac os:celery -A sqlmanager(项目名称) worker -l info (centos)
    windows: celery -A sqlmanager(项目名称) worker -l info -P eventlet (可能还需要 pip install eventlet)
    
    # 守护进程
    
    /root/.virtualenvs/blog/bin/celery multi start w1 -A sqlmanager(项目名称) -l info --logfile=./celerylog.log
    

      

    centos7 守护 celery worker

    访问调用 异步任务 的视图

    http://127.0.0.1/test

    Django 后台查看 celery 异步任务结果

  • 相关阅读:
    Python 模块 itertools
    Python 字符串的encode与decode
    python 模块 hashlib(提供多个不同的加密算法)
    暴力尝试安卓gesture.key
    hdu 1300 Pearls(DP)
    hdu 1232 畅通工程(并查集)
    hdu 1856 More is better(并查集)
    hdu 1198 Farm Irrigation(并查集)
    hdu 3635 Dragon Balls(并查集)
    hdu 3038 How Many Answers Are Wrong(并查集)
  • 原文地址:https://www.cnblogs.com/lanheader/p/13615772.html
Copyright © 2011-2022 走看看