zoukankan      html  css  js  c++  java
  • Python Django +Celery +flower

    准备工作

    1.创建django项目,添加应用到setting文件

    2.pip安装celery + eventlet + flower

    3.文件目录如下:

     4.文件配置如下

    celery_app目录下:

    # -*- coding: utf-8 -*-
    from celery import Celery
    app = Celery('demo')# 创建 Celery 实例
    app.config_from_object('celery_app.celeryconfig')# 通过 Celery 实例加载配置模块
    __init__.py文件
    BROKER_URL = 'redis://xxx.xxx.xxx.xxx:6379'# 指定 Broker
    CELERY_RESULT_BACKEND = 'redis://xxx.xxx.xxx.xxx:6379/0'# 指定 Backend
    CELERY_TIMEZONE='Asia/Shanghai'# 指定时区,默认是 UTC
    # CELERY_TIMEZONE='UTC'                            
    CELERY_IMPORTS = (
        # 指定导入的任务模块
        'celery_app.task1'
    )
    celeryconfig.py
    from celery_app import app
    @app.task()
    def add(a, b):
        return 'hello world: %i' % (a + b)
    task1.py

    Django_Celery目录下:

    from django.contrib import admin
    from django.urls import path
    from djocelery.views import test,getresult
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('test/',test.as_view()),
        path('getresult/',getresult.as_view())
    ]
    urls.py文件

    djocelery目录下:

    from django.shortcuts import HttpResponse
    from django.views import View
    from celery_app.task1 import add
    # Create your views here.
    
    res = {}
    class test(View):
        def get(self,request,*args,**kwargs):
            a,b = 1,2
            result = add.apply_async((a, b))
            res[str(result.id)] = result
            return HttpResponse(result.id)
        def post(self,request,*args,**kwargs):
            return HttpResponse("POST")
        def put(self,request,*args,**kwargs):
    
            return HttpResponse("PUT")
        def delete(self,request,*args,**kwargs):
            return HttpResponse("DELETE")
    
    
    class getresult(View):
        def get(self,request,*args,**kwargs):
            id = str(request.GET.get('id'))
            print(id)
            t1 = res[id].get()
            print(t1)
            return HttpResponse(t1)
    views.py文件

    快速开始

    1.python manage.py runserver 8080 (启动django)

    2.celery -A celery_app worker --loglevel=info -P eventlet (启动eventlet) 注意此条命令的执行位置,在celery_app上一级目录下执行

    其他常用参数

    celery -A proj worker --loglevel=INFO -P prefork  --concurrency=10  -n worker1
    celery -A proj worker --loglevel=INFO --P eventlet -c 100  -n worker2
    -P 指定运行模式 建议使用eventlet和prefork
    --concurrency或者-C  指定eventlet池或者进程池大小(默认进程数为cpu核心数)
    -n 指定worker名字

    3.celery flower --broker=redis://xxx.xxx.xxx.xxx:6379/0  (启动可视化监控)

    celery flower --broker=amqp://guest:guest@localhost:5672//  或
    celery flower --broker=redis://guest:guest@localhost:6379/0

    4.在浏览器访问http://127.0.0.1:8080/test/,提交任务。

    5.查看celery shell窗口,如下:

    6.访问http://127.0.0.1:5555/tasks

    多任务,多对列配置

    from kombu import Queue
    BROKER_URL = 'redis://123.207.251.121:6379'# 指定 Broker
    CELERY_RESULT_BACKEND = 'redis://123.207.251.121:6379/0'# 指定 Backend
    CELERY_TIMEZONE='Asia/Shanghai'# 指定时区,默认是 UTC
    # CELERY_TIMEZONE='UTC'                            
    CELERY_IMPORTS = (
        # 指定导入的任务模块
        'celery_app.task1'
    )
    
    # 配置队列
    CELERY_QUEUES = (
        Queue('default', routing_key='default'),
        Queue('q1', routing_key='testtask1'),
        Queue('q2',  routing_key='testtask2'),
    )
    
    # 路由(哪个任务放入哪个队列)
    CELERY_ROUTES = {
        'celery_app.task1.test_add': {'queue': 'q1', 'routing_key': 'testtask1'},
        'celery_app.task1.test_mul': {'queue': 'q2', 'routing_key': 'testtask2'},
    }
    celeryconfig.py
    from celery_app import app
    @app.task()
    def test_add(a, b):
        return 'hello world: %i' % (a + b)
    
    
    @app.task()
    def test_mul(a, b):
        return 'hello world: %i' % (a * b)
    task1.py
    """Django_Celery URL Configuration
    
    The `urlpatterns` list routes URLs to views. For more information please see:
        https://docs.djangoproject.com/en/3.0/topics/http/urls/
    Examples:
    Function views
        1. Add an import:  from my_app import views
        2. Add a URL to urlpatterns:  path('', views.home, name='home')
    Class-based views
        1. Add an import:  from other_app.views import Home
        2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
    Including another URLconf
        1. Import the include() function: from django.urls import include, path
        2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
    """
    from django.contrib import admin
    from django.urls import path
    from djocelery.views import test_Add,getresult,test_Mul
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('test_add/',test_Add.as_view()),
        path('test_mul/',test_Mul.as_view()),
        path('getresult/',getresult.as_view())
    ]
    views.py文件

    重新启动flower:显示设置队列

    Flower使用介绍

    原文链接:https://www.jianshu.com/p/4a408657ef76

    官方文档:https://flower-docs-cn.readthedocs.io/zh/latest/

    Dashboard

    Dashboard 页面是展示异步任务队列的主要情况,该页面包括如下几种状态的任务:

    • Active: 表示worker从队列中获取到任务,且正在执行的任务数
    • Processed: 表示worker从队列中获取到任务的总数据量
    • Failed: 表示worker从队列中获取到任务,且执行失败了的(异常退出)
    • Successed: 表示worker从队列中获取到任务,且执行成功了的
    • Retried: 表示worker从队列中获取到任务,因为一些其他原因重新执行的数量

    所以,上述这些数量的关系如下:
    Processed = Active + Received + Failed + Successed + Retried
    其中 Received 表示该任务分配到了worker中,但是还没有被处理的任务数量

     
    Dashboard页面

    Worker Name 表示的是执行celery任务的worker名称;
    Status 表示的是该worker的状态,包括 Online (在线) 、 Offline(离线),重启flower进程,即可将Offline状态的worker剔除掉;
    Active / Processed / Failed / Successed / Retried 分别表示该worker正在执行的任务数、该worker处理的总任务数、处理失败的任务数、处理成功的任务数、重试的任务数;
    Load Average 表示系统在 1min / 5min / 15min 内的CPU平均负载(百分比)

    Tasks

    Tasks 页面是展示所有worker接收到的任务的处理情况。下面对该表格中的做一些介绍

     
    Tasks页面
    • Name: 表示该任务的名称,默认规则为该函数的路径规则,例如 {模块名}.{文件名}.{函数名}
    • UUID: 表示一个唯一字符串ID用于表示该任务
    • State: 表示该任务的状态,包括: SUCCESS / FAILURE / STARTED / RECEIVED
      • SUCCESS 表示该任务执行完毕且成功
      • FAILURE 表示该任务执行失败
      • STARTED 表示该任务正在执行
      • RECEIVED 表示该任务在worker中,只是被接收而已
    • args: 表示该任务的列表参数
    • kwargs: 表示该任务的字典参数
    • Result: 表示该任务函数的返回结果
    • Received: 表示该任务在worker接收到的时间
    • Started: 表示该任务在worker开始执行的时间
    • Runtime: 表示该任务在worker真正执行的耗时(单位:秒)
    • Worker: 表示该任务所在的worker名称

    Broker

    Broker 页面展示的是celery连接消息队列的信息,包括消息队列的访问URL,下面的截图展示的是链接的RabbitMQ,当然也可以链接Redis等。

     
    Broker页面
    • Name: 表示队列的名称
    • Messages: 表示该队列的消息数量
    • Unacked: 表示该队列中还没收到ack确认的消息数量(该消息可能处于 RECEIVED 或是 STARTED
    • Ready: 表示该队列中还未分配到worker的消息数量
    • Consumers: 表示消费者数量(即worker数量)
    • Idle since: 表示该队列空闲的最初时间,否则为 N/A

    Monitor

    Monitor 页面展示的是celery后台任务的曲线展示状况。

     


     
     
     
     
  • 相关阅读:
    01分数规划基本类型
    hdu3976
    hdu1430 康托展开+bfs预处理
    模板
    模板
    redis初始
    mybatis12
    mybatis13
    mybatis10
    springmvc10
  • 原文地址:https://www.cnblogs.com/-wenli/p/13409670.html
Copyright © 2011-2022 走看看