zoukankan      html  css  js  c++  java
  • 一、Python Celery 4.4.7 与 Django 2.X的集成【实现异步调用】

    0、使用的技术

    Django+Celery+RabbitMQ

    1、安装Django和Celery

    pip3 install celery==4.4.7
    pip3 install django==2.2.2
    pip3 install eventlet==0.26.1 【Linux操作系统可以不安装】

    Clery 4.x开始无需安装django-celery

    Celery官方文档

    Django与Celery结合的文档 https://docs.celeryproject.org/en/stable/django/index.html

    Celery AP调用的文档 https://docs.celeryproject.org/en/v4.4.7/reference/index.html

    2、创建一个Django项目

    命令行创建Django项目,推荐【使用PyCharm创建Django项目】
    #
    创建一个Django项目,项目名:django_celery_project django-admin startproject django_celery_project
    # 进入项目
    cd django_celery_project
    # 创建一个app, app名字:app01 django-admin startapp app01

    3、配置app01与项目关联

    settings.py
    ...
    INSTALLED_APPS
    = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'app01.apps.App01Config' ]
    ...

    注意:

      如果是使用命令行创建,并且用PyCharm打开需要做如下配置

    这个是针对PyCharm生效

    还需要配置针对命令行启动Django服务生效的环境变量

    manage.py
    #!/usr/bin/env python
    """Django's command-line utility for administrative tasks."""
    import os
    import sys
    
    
    def main():
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_pro.settings')
    ...

    4、在项目django_celery_project,创建配置文件celery_settings.py

    from __future__ import absolute_import, unicode_literals
    # Celery settings
    from kombu import Queue, Exchange
    
    # 中间件的地址,这里使用RabbitMQ
    CELERY_BROKER_URL = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 处理结果存储,可使用redisMQMySQL,这里使用RabbitMQ
    CELERY_RESULT_BACKEND = 'rpc://development:root@192.168.2.129:5672//development_host'
    # CELERY_RESULT_BACKEND = 'django-db' # 这个是django-celery-results,会创建表,保存我们执行的结果,生产不推荐,必须自己获取结果存储,比较灵活些
    
    # 设置时区
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    # UTC时区换算关闭
    CELERY_ENABLE_UTC = False
    
    # 任务序列化
    CELERY_TASK_SERIALIZER = 'json'
    
    # 结果序列化
    CELERY_RESULT_SERIALIZER = 'json'
    
    # 接收的数据类型
    CELERY_ACCEPT_CONTENT = ['json']
    
    # 设置默认的队列default
    CELERY_TASK_DEFAULT_QUEU = "celery"
    
    # 定义队列
    CELERY_TASK_QUEUES = {
        Queue("celery", Exchange("celery"), routing_key="celery"),
        Queue("add_queue", Exchange("compute_node"), routing_key="add_task"),  # 定义队列:add_queue,绑定交换机:compute_node
        Queue("mul_queue", Exchange("compute_node"), routing_key="mul_task"),  # 定义队列:mul_queue,绑定交换机:compute_node
        Queue("xsum_queue", Exchange("compute_node"), routing_key="xsum_task")  # 定义队列:xsum_queue,绑定交换机:compute_node
    }
    celery_settings.py

    5、创建异步处理的任务函数,在app01的应用里面创建tasks.py

    from __future__ import absolute_import, unicode_literals
    
    from celery import shared_task
    
    @shared_task
    def add(x, y):
        """
            求和函数
        :param x: int
        :param y: int
        :return: Number int
        """
        return x + y
    
    @shared_task
    def mul(x, y):
        """
            相乘函数
        :param x: int
        :param y: int
        :return: Number int
        """
        return x * y
    
    @shared_task
    def xsum(numbers):
        """
            列表求和函数
        :param numbers: list
        :return: Number int
        """
        return sum(numbers)
    tasks.py

    6、在项目django_celery_project,创建celery程序的入口 celery.py,【注意:该文件名字必须是这个,利用项目启动时,会默认加载这个名字新建celery对象】

    from __future__ import absolute_import, unicode_literals
    
    import os
    
    from celery import Celery
    
    # 设置Django项目的配置文件
    from django_celery_project import celery_settings
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_project.settings')
    
    app = Celery('django_celery_project')
    
    # 配置celery配置文件的位置
    # 官网设置方法,直接集成在settings.py里面,并且前缀以CELERY_*,开头进行设置,不推荐,使settings.py变得更复杂
    # app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # 使用新建一个celery_settings.py,专门配置celery所需要的参数
    app.config_from_object(celery_settings, namespace='CELERY')
    
    # 自动发现项目所有app中包含文件名为tasks.py,加载所有任务到内存中
    app.autodiscover_tasks()
    
    @app.task(bind=True)  # bind=True,可以调用本身self类的对象
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    celery.py

    7、在项目django_celery_project==>__init__.py,编写如下内容

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django启动,shared_task将使用这个应用程序。
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)

    8、根据celery调用任务有两种方式分别是:

    delay : delay是apply_async抽象封装的简易调用方式,默认会发往exchage:celery,queue:celery,只能传调用任务的参数
    apply_async : 调用任务的时候是可以指定哪个exchage、queue、Routing key和回调函数等功能,具体请查看官方的API

    8.1、Django路由配置,项目里面打开django_celery_project ==> urls.py 编辑如下:

    from django.urls import path
    
    from app01 import views
    
    urlpatterns = [
        path('async_add/', views.async_add_task),
        path('async_mul/', views.async_mul_task),
        path('async_xsum/', views.async_xsum_task),
        path('sync_add/', views.sync_add_task),
        path('sync_mul/', views.sync_mul_task), 
        path('sync_xsum/', views.sync_xsum_task),
    ]
    urls.py

    8.2、Django视图层的编辑,项目打开 app01 ==> views.py 编辑如下:

    from __future__ import absolute_import, unicode_literals
    from django.http import JsonResponse
    from celery.result import AsyncResult
    
    # Create your views here.
    from app01.tasks import add, mul, xsum
    
    
    def async_mul_task(request):
        """
          使用调用apply_async,相乘的任务
        :param request:
        :return:
        """
        arg1 = 1
        arg2 = 2
        result = mul.apply_async(args=(arg1, arg2,),
                                 queue='mul_queue',
                                 routing_key='mul_task',
                                 priority=0,
                                 exchange='compute_node')
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def sync_mul_task(request):
        """
            使用delay,调用相乘的任务
        :param request:
        :return:
        """
        arg1 = 1
        arg2 = 2
        result = mul.delay(arg1, arg2)
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def async_add_task(request):
        """
            使用调用apply_async,求和的任务
        :param request:
        :return:
        """
        arg1 = 2
        arg2 = 2
        result = add.apply_async(args=(arg1, arg2,),
                                 queue='add_queue',
                                 routing_key='add_task',
                                 priority=0,
                                 exchange='compute_node')
    
        task_status = AsyncResult(result.task_id, app=result.app)
    
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def sync_add_task(request):
        """
            使用调用delay,求和的任务
        :param request:
        :return:
        """
        arg1 = 2
        arg2 = 2
        result = add.delay(arg1, arg2)
        print(result)
        task_status = AsyncResult(result.task_id, app=result.app)
    
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def async_xsum_task(request):
        """
            使用apply_async,调用列表求和
        :param request:
        :return:
        """
        number_list = [1, 1, 1, 1, 6]
        result = xsum.apply_async(args=(number_list,),
                                  queue='xsum_queue',
                                  routing_key='xsum_task',
                                  priority=0,
                                  exchange='compute_node')
    
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': number_list, 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def sync_xsum_task(request):
        """
            使用delay,调用列表求和
        :param request:
        :return:
        """
        number_list = [1, 1, 1, 1, 6]
        result = xsum.delay(number_list)
    
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': number_list, 'task_id': result.task_id, 'result': task_status.get()})
    views.py

    9、启动worker程序和Django程序

    # 启动 Celery worker程序
    celery -A django_celery_project worker -l info -P eventlet
    
    # 进入项目里面,启动 Django 程序
    python manage.py runserver 8000

    9.1、启动worker程序

    9.2、利用浏览器访问我们路由定义的url,远程调用任务运算

     

    9.3、观察MQ的绑定关系

    10、演示分布式运算效果,这里准备两个主机,把编写好的项目拷贝到那两个主机上,并且启动起来

    10.1、Windows 主机

    celery -A django_celery_project worker -l info -P eventlet

    10.2、Linux主机

    celery -A django_celery_project worker -l info

     10.3、启动Django测试,测试任务是否轮询分发运算

    10.4、出现上面情况,说明任务已经是分布式运算,可以实现运算主机水平扩展

    11、整个演示项目的下载地址

    https://github.com/ygbh/celery_django_project

    12、总结

    把该环境搭建起来,其它的定时任务和路由匹配调度这块。
    请参考官方文档:https://docs.celeryproject.org/en/stable/index.html
  • 相关阅读:
    JS 删除web sql 数据表
    JS 新建web sql 数据表
    JS 更新web sql 数据表的数据
    JS 删除一行web sql 数据表的数据
    JS 向web sql数据表插入数据
    JS 打开or连接web sql数据库
    JS 获取表单数据存入数组
    JS 限制小数点位数
    JS 通过id获取DOM对象--减少代码
    mvc “System.NullReferenceException”类型的异常在 App_Web_zo44wdaq.dll 中发生,但未在用户代码中进行处理 其他信息: 未将对象引用设置到对象的实例。
  • 原文地址:https://www.cnblogs.com/ygbh/p/13618556.html
Copyright © 2011-2022 走看看