zoukankan      html  css  js  c++  java
  • Django + Channels + Celery 实时更新日志

    Django + Channels + Celery 实时更新日志

    1.准备工作:

    • 系统为windows系统。技术实现:

      python 3.6.8
      django 2.2
      celery 3.1.26
      redis  2.10.6
      django-celery 3.3.1
      channels 2.4.0
      channels-redis 2.4.2
      
    • settings.py配置。

      • 注册APP
      INSTALLED_APPS = [
          ...
          # 注册django-celery
          "djcelery",
          # 注册 channel
          "channels",
      ]
      
      • celery一些参数配置

        • 一张图简单看一下celery

      import djcelery
      # 加载djcelery
      djcelery.setup_loader() 
      # 数据库调度
      BROKER_TRANSPORT='redis' #指定redis
      # CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' # celey处理器,固定
      CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作为消息中间件   消息队列
      CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BACKEND配置,这里使用redis    存储结果
      # 指定任务路径。为api应用下的tasks.py文件
      CELERY_IMPORTS = ('api.tasks')
      # CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
      #允许的内容类型,
      CELERY_ACCEPT_CONTENT=['pickle','json']
      #任务的序列化方式
      CELERY_TASK_SERIALIZER = 'json'
      #celery时区,定时任务使用
      CELERY_TIMEZONE = 'Asia/Shanghai'
      
      #  每个worker最多执行100个任务被销毁,可以防止内存泄漏
      CELERYD_MAX_TASKS_PER_CHILD = 100
      #  有些情况下可以防止死锁
      CELERYD_FORCE_EXECV = True
      #  设置并发的worker数量
      CELERYD_CONCURRENCY = 4
      # 允许重试
      CELERY_ACKS_LATE = True
      #  单个任务的最大运行时间,超过就杀死
      CELERYD_TASK_TIME_LEMIT = 12 * 30
      
      • channels一些配置,注册channels需要指定ASGI路由地址
      # 指定ASGI的路由地址
      # 指定api 应用下routing.py
      ASGI_APPLICATION = 'api.routing.application'#
      CHANNEL_LAYERS = {
          'default': {
              'BACKEND': 'channels_redis.core.RedisChannelLayer',
              'CONFIG': {
                  "hosts": [('127.0.0.1', 6379)],
              },
          },
      }
      
      • 假数据
      # 这里字典形式封装路经代表日志文件路径
      TAILF = {
          1: r'J:djangoSerializersapilog_recoredsheet1',
          2: r'J:djangoSerializersapilog_recoredsheet2',
      }
      

    1.5文件目录如下:

    djangoSerializers
    	|__ api
    	    |___ __init__.py 
    		|___ taksk.py # 为创建
    		|___ views.py
    		|___ ...
    	|__ djangoSerializers
    		|___ __init__.py
    		|___ settings.py
             |___ urls.py
             |___ wsgi.py
             |___ celery.py   #为创建
    

    2.celery

    • 在项目目录下二级目录下创建celery.py

      from django.conf import setting
      from __future__ import absolute_import, unicode_literals
      from celery import Celery, platforms
      from django.conf import settings
      import os
      # 设置当前django环境
      os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoSerializers.settings")
      # 实例化Celery对象
      app = Celery("djangoSerializers")
      # 加载配置文件,并使用CELERY前缀
      app.config_from_object("django.conf:settings", namespace='CELERY')
      # celery不能root用户启动解决
      platforms.C_FORCE_ROOT = True
      # 去寻找每个app下的tasks.py文件
      app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
      
    • 在该目录__init___.py 添加

      # 这是为了确保在django启动时启动 celery
      from __future__ import absolute_import
      from .celery import app as celery_app
      
    • 在app应用下创建tasks.py,用于celery异步任务处理。

      from celery import shared_task
      from asgiref.sync import async_to_sync
      from channels.layers import get_channel_layer
      @shared_task
      def tailf(id,channel_name):
      	# 暂时先空出来
      

    3.channel

    • 因我们之前在settings.py指定ASGI_APPLICATION路径,于是在api下创建routing.py

      
      from channels.auth import AuthMiddlewareStack
      from channels.routing import ProtocolTypeRouter, URLRouter
      from django.urls import re_path
      
      from tailf.consumers import TailfConsumer
      class TokenAuthMiddle:
          def __init__(self,inner):
              self.inner = inner
          def __call__(self,scope):
              return self.inner(scope)
      # 这里为了简单验证直接返回当前对象,也可以自定义或者使用内置 AuthMiddlewareStack
      TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddle(AuthMiddlewareStack(inner))
      # 指向处理websocket的类视图函数 TailfConsumer
      application = ProtocolTypeRouter({
          "websocket": TokenAuthMiddlewareStack(URLRouter([
                  re_path(r'^ws/tailf/(?P<id>d+)/$', TailfConsumer),
              ])
          )
      })
      
      
    • 在当前项目目录下新建consumers.py用于websocket的连接和断开。

      import json
      
      from channels.generic.websocket import WebsocketConsumer
      # 这里tailf是要执行异步任务
      from api.tasks import tailf
      
      class TailfConsumer(WebsocketConsumer):
          def connect(self):
              # 通过获取id来执行异步任务
              self.file_id = self.scope["url_route"]["kwargs"]["id"]
              self.result = tailf.delay(self.file_id, self.channel_name)
              self.accept()
          def disconnect(self, code):
              # 终止执行中task
              self.result.revoke(terminate=True)
              print("disconnect:",self.file_id,self.channel_name)
          def send_message(self,event):
              # 发送给客户端消息
              self.send(text_data=json.dumps({
                  "message":event["message"]
              }))
      
      

    4.其他逻辑配置

    • url.py

      urlpatterns = [
      	...
      	url(r'tailf', views.tailf_view, name='tailf-url'),
      	]
      
    • 视图函数

      def tailf_view(request):
          logDict = settings.TAILF
          return render(request,"index.html",{"logDict": logDict})
      
    • templates

      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8">
          <title>Title</title>
      </head>
      <body>
      <div class="col-sm-8">
          <select class="form-control" id="file">
              <option value="">选择要监听的日志</option>
              {% for k,v in logDict.items %}
                  <option value="{{ k }}">{{ v }}</option>
              {% endfor %}
          </select>
      </div>
      <div class="col-sm-2">
          <input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/>
      </div>
      <div class="col-sm-2">
          <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/>
      </div>
      <div class="col-sm-12">
          <textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
      </div>
      <script src="https://cdn.bootcss.com/jquery/3.5.0/jquery.min.js"></script>
      </body>
      <script>
          function connect() {
              if ( $('#file').val() ) {
                  var url = 'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/';
                  window.chatSocket = new WebSocket(url);
                  // 当浏览器接收到websocket服务器发送过来的数据时,就会触发onmessage消息,参数e包含了服务端发送过来的数据
                  chatSocket.onmessage = function(e) {
                      var data = JSON.parse(e.data);
                      var message = data['message'];
                      document.querySelector('#chat-log').value += (message);
                      // 跳转到页面底部
                      $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
                };
      		 // 如果连接失败,或者发送、接收数据失败,或者数据处理出错都会触发onerror消息
                chatSocket.onerror = function(e) {
                  console.error('服务端连接异常!')
                };
      		// 当浏览器接收到websocket服务器发送过来的关闭连接请求时,会触发onclose消息
                chatSocket.onclose = function(e) {
                  console.error('websocket已关闭!')
                };
              } else {
                console.log('请选择要监听的日志文件')
              }
            };
          function goclose() {
              // 用于关闭连接
              window.chatSocket.close();
              window.chatSocket.onclose = function(e) {
              console.log('已终止日志监听!')
          };
        }
      </script>
      </html>
      
      
      • 当然还有onopen:当浏览器和websocket服务端连接成功后会触发onopen消息。

    5.启动worker节点:

    • 终端执行:
    python3 manage.py celery worker -l INFO
    
    • 启动项目测试一下吧。

    6.Django项目敏感信息保存

    • django项目做完后,向生产环境部署时,为了避免一些敏感信息被其他人利用,我们需要进行一定保护,比如settings配置中的一些密码等内容。
    • 通过os.environ模块实现,这里以SECRET_KEY为例:
    在linux系统中 /etc/profile 中写入SECRET_KEY
    e.g.:
    	export SECRET_KEY = "..."
    settings.py
    	import os
    	SECRET_KEY = os.environ["SECRET_KEY"]
    
    • 注意:更改完/etc/profile后执行source /etc/profile,以使更新后的内容生效。

    参考链接:

  • 相关阅读:
    解析三种常见分布式锁的实现
    RabbitMQ基础概念详解
    数据库事务概念
    ECIF与CRM
    MQ(消息队列)学习
    数据粒度的设计
    链表之 头节点与尾指针 区别
    牛客之错题(2016.1.15) && 带头节点与不带头的区别
    数据结构之递归回溯算法
    LeetCode--Single Number
  • 原文地址:https://www.cnblogs.com/xujunkai/p/12738319.html
Copyright © 2011-2022 走看看