zoukankan      html  css  js  c++  java
  • Django与celery集成:异步任务原理和过程

    0.原理和架构

    a.客户发送请求到django;

    b.django产生任务(要执行的函数);

    c.django把任务丢给celery的broker

    d.celery的worker从broker拿到任务并且执行;

    e.worker执行后保存结果到后端数据库;

    1.在django里面配置celery的目录结构

    PS D:djangotestmyrecrument> tree
    
    D:.
    ├─.idea
    │  └─inspectionProfiles
    ├─celery
    │  └─__pycache__
    │ - manage.py
    ├─interview
    │  ├─management
    │  │  └─commands
    │  │      └─__pycache__
    │  ├─migrations
    │  │  └─__pycache__
    │  └─__pycache__
    ├- db.sqlite3
    ├─myrecrument
    │  └─settings.py
    └─tmp

    2.关联django和celery

     2. cat D:djangotestmyrecrumentmyrecrumentcelery.py
    import os

    from celery import Celery

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myrecrument.settings')

    app = Celery('myrecrument')

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()


    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')

    3. cat D:djangotestmyrecrumentmyrecrumentsettings.py

    CELERY_BROKER_URL = 'redis://106.45.274.145:30013/0'
    CELERY_RESULT_BACKEND = 'redis://106.45.274.145:30013/1'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERYD_MAX_TASKS_PER_CHILD = 10
    CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
    CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")

     4.配置异步发送钉钉消息D:djangotestmyrecrumentinterview> cat .dingtalk.py

    #coding=utf-8
    from dingtalkchatbot.chatbot import DingtalkChatbot
    
    from django.conf import settings
    
    def send(message, at_mobiles=[]):
        # 引用 settings里面配置的钉钉群消息通知的WebHook地址:
        webhook = settings.DINGTALK_WEB_HOOK
    
        # 初始化机器人小丁, # 方式一:通常初始化方式
        xiaoding = DingtalkChatbot(webhook)
    
        # 方式二:勾选“加签”选项时使用(v1.5以上新功能)
        # xiaoding = DingtalkChatbot(webhook, secret=secret)
    
        # Text消息@所有人
        xiaoding.send_text(msg=('面试通知: %s' %message), at_mobiles = ['14743373423'] )

    5.D:djangotestmyrecrumentinterview> cat . ask.py #配置任务task

    from __future__ import absolute_import, unicode_literals
    
    from celery import shared_task 
    from .dingtalk import send
    
    @shared_task
    def send_dingtalk_message(message):
        send(message)

    6.在interview的admin.py D:djangotestmyrecrumentinterview> cat .admin.py #应用tasks

    from .tasks import send_dingtalk_message
    ...
    def notify_interviewer(modeladmin,request,queryset):
        candidates = ' '
        for obj in queryset:
            candidates = obj.username + ';' + candidates
        send_dingtalk_message.delay(candidates)
       
       class CandidateAdmin(admin.ModelAdmin):
        actions = (export_model_as_csv,notify_interviewer)
     

     7.启动监控flower:D:djangotestmyrecrumentcelery>  celery -A tasks flower  broker='redis://redis:30013/0'

    8.启动celery的worker:D:djangotestmyrecrument> celery -A myrecrument   worker --loglevel=INFO -P eventlet

    9.启动项目:D:djangotestmyrecrument> python .manage.py runserver 0.0.0.0:8000

    10.flower的效果

     参考:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html   First steps with Django

    用一个例子来演示会更加清晰
  • 相关阅读:
    最简单跨平台的日志库
    linux文件锁
    Linux 获取屏幕分辨率与窗口行列数(c/c++)
    linux 信号机制
    记一次函数异常(getopt_long)
    程序单实例运行
    简单地 Makefile 书写
    学习go的一些笔记
    20200930 10. Netty 核心源码剖析
    20200930 9. TCP 粘包和拆包 及解决方案
  • 原文地址:https://www.cnblogs.com/hixiaowei/p/14304638.html
Copyright © 2011-2022 走看看