zoukankan      html  css  js  c++  java
  • 代码发布5 发布流程, 基于channel-layers实现群发功能, 节点图标的创建, 节点状态动态改变, consumers.py/deploy.html代码示例

    发布流程

    给任务单的展示页面,添加一个去发布的按钮,点击进入发布界面(项目基本介绍,gojs渲染图标)

    关于静态文件夹static既可以在全局创建,也可以在每一个应用创建

    """静态文件配置"""
    # 1.配置文件中直接指定查找路径(从上往下查找)      方式一
    STATICFILES_DIRS = [
      os.path.join(BASE_DIR,'static1'),
      os.path.join(BASE_DIR,'static2'),
      os.path.join(BASE_DIR,'static3'),
      os.path.join(BASE_DIR,'static4'),
    ]
    # 2.直接利用模版语法     方式二
    {% load staticfiles %}
    <script src="{% static 'js/go.js' %}"></script>

    # 配置文件中配置
    STATIC_URL = '/static/'

    初始化图标数据应该来自于后端(gojs数据绑定)

    当多个用户打开相同的发布页面的时候,只要有一个人在操作,其他人的界面都应该看到效果(channle-layers)

     

    channle-layers基本使用

    channnel-layers常用的方法

    # 1 如何获取url中无名有名分组的参数
    self.scope['url_route']['kwargs'].get('kwargs')
    self.scope['url_route']['args'].get('args')
    
    # 2 请求链接成功之后加入对应的群聊中
    from asgiref.sync import async_to_sync
    async_to_sync(self.channel_layer.group_add)('群号',self.channel_name)
    
    # 3 给客户端发送消息(群发)
    async_to_sync(self.channel_layer.group_send)('群号',{'type':'xx.oo','message':'{"code":True,"data":"jason"}'})
    
    # type后面定义的是方法 你需要在当前类下面定义一个方法
    def xx_oo(self,event):
      """类似于循环当前群聊里面的所有成员 依次发送数据"""
      data = event.get('message')
      self.send(json.dumps(data))
    
    # 4 断开链接应该将对应群聊里面的用户剔除
    async_to_sync(self.channel_layer.group_discard)('群号',self.channel_name)
    • 配置文件中配置参数

    CHANNEL_LAYERS = {
        'default':{
            'BACKEND':'channels.layers.InMemoryChannelLayer'
        }
    }
    • 如何获取无名有名分组中url携带的参数 (因为此处用类,无法像方法获得有名分组参数)
    task_id = self.scope['url_route']['kwargs'].get('task_id')

    代码:

    # routing.py
    application = ProtocolTypeRouter({
        'websocket':URLRouter([
            re_path(r'^publish/(?P<task_id>d+)',consumers.PublishConsumer)
        ])
    })
    
    # consumers.py
    class PublishConsumer(WebsocketConsumer):
        def websocket_connect(self, message):
            self.accept()
            # 获取url中无名有名分组的参数   self.scope是一个大字典  里面包含了前端所有的数据
            task_id = self.scope['url_route']['kwargs'].get('task_id')  # scope['url_route']为固定参数,获取有名分组参数
            # task_id = self.scope['url_route']['args'].get('task_id')    # 无名分组获取参数
    • 链接对象自动加入对应的群聊
    from asgiref.sync import async_to_sync
    async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)
    • 给特定的群中发消息
    async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})
    """
                后面字典的键是固定的 就叫type和message
                type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中)
                针对type后面的方法名 有一个固定的变化格式
                my.send     >>>    my_send
                xxx.ooo     >>>    xxx_ooo
    """
    • 在类中需要定义一个专门发送消息的方法
    def my_send(self,event):   # event为侯敏的消息对象
            message = event.get('message')  # {'code':'init','data':node_list}
            # 发送数据
            self.send(json.dumps(message))
            """
            内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法
            for self in self_list:
                self.send
            """
    • 断开链接之后去对应的群聊中剔除群成员
    async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)

    如何区分不同的任务直接群发消息混乱的情况,针对群号应该做区分

    其实可以直接使用任务的主键值作为群号

    节点数据展示

    节点数据无论是初始化的还是动态修改的,都应该是动态生成的而不是直接写死的

    其次,当一个任务单以及发布之后,应该保存它的发布节点数据

    也就意味着我们需要开设一个模型表用开存储节点相关的所有数据

    class Node(models.Model):
        """存储节点数据"""
        text = models.CharField(verbose_name='节点文字',max_length=32)
        # 一个任务单有多个节点  一对多的关系
        task = models.ForeignKey(verbose_name='发布任务单',to='DeployTask')
    
        status_choices = (
            ('lightgray','待发布'),
            ('green','成功'),
            ('red','失败'),
        )
        status = models.CharField(verbose_name='状态',max_length=32,choices=status_choices,default='lightgray')
    
        # 子节点 父节点  bbs子评论与根评论
        parent = models.ForeignKey(verbose_name='父节点',to='self',null=True,blank=True)
        # 一个服务器可以有多个节点 一对多的关系
        server = models.ForeignKey(verbose_name='服务器',to='Server',null=True,blank=True)

    当用户点击初始化图标的时候,需要操作上述的表查询或者新增记录

    # 先去模型表中创建数据 之后再构造gojs所需要的数据类型返回即可
    # 群发
                node_object_list = []
                # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
                start_node = models.Node.objects.create(text='开始',
                                                        task_id=task_id
                                                        )
                node_object_list.append(start_node)
    
                download_node = models.Node.objects.create(text='下载',
                                                           task_id=task_id,
                                                           parent=start_node
                                                           )
                node_object_list.append(download_node)
    
                upload_node = models.Node.objects.create(text='上传',
                                                         task_id=task_id,
                                                         parent=download_node
                                                         )
                node_object_list.append(upload_node)
                # 1.1  服务器相关的节点  一个项目可以有多个服务器
                task_obj = models.DeployTask.objects.filter(pk=task_id).first()
                for server_obj in task_obj.project.servers.all():
                    server_node = models.Node.objects.create(text=server_obj.hostname,
                                                             task_id=task_id,
                                                             parent=upload_node,
                                                             server=server_obj
                                                             )
                    node_object_list.append(server_node)
    
                # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
                node_list = []  # [{},{},{},{}]
                for node_obj in node_object_list:
                    temp_dic = {
                        'key':str(node_obj.pk),
                        'text':node_obj.text,
                        'color':node_obj.status,
                    }
                    # 针对parant字段 需要做判断 再考虑是否创建键值对
                    if node_obj.parent:
                        # 我们用数据的主键值作为key
                        temp_dic['parent'] = str(node_obj.parent_id)
                    node_list.append(temp_dic)
    
                async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})

    小bug优化

    # 1 当一个任务以及初始化过了 再次点击初始化按钮 数据库不应该再写入数据
    # 先判断当前任务单是否已经初始化过图标数据了
    node_queryset = models.Node.objects.filter(task_id=task_id)
    if not node_queryset:
      # 创建和构造数据的操作
    else:
      node_object_list = node_queryset
    
    # 2 当用户以及给一个任务单初始化过图标之后。打开页面不应该在此点击按钮 而是直接展示出来  在后端建立连接的方法内 直接查询并返回数据
    # 查询当前任务单是否已经初始化过图标  如果有直接查询出来展示到前端  减少用户操作
            node_queryset = models.Node.objects.filter(task_id=task_id)
            if node_queryset:
                node_list = []  # [{},{},{},{}]
                for node_obj in node_queryset:
                    temp_dic = {
                        'key': str(node_obj.pk),
                        'text': node_obj.text,
                        'color': node_obj.status,
                    }
                    # 针对parant字段 需要做判断 再考虑是否创建键值对
                    if node_obj.parent:
                        # 我们用数据的主键值作为key
                        temp_dic['parent'] = str(node_obj.parent_id)
                    node_list.append(temp_dic)
                # 发送数据  单发/群发???   单发
                self.send(text_data=json.dumps({'code':'init','data':node_list}))

    钩子节点展示

    钩子脚本内容思路

    • 直接自己规定死只能写shell脚本或者python脚本

    • 兼容各个类型的脚本(脚本表中再开设一个用来标示脚本类型的字段)

    • 通过文件头来指定

    判断用户是否书写的钩子脚本

    # 判断是否有下载前的钩子脚本
                    if task_obj.before_download_script:
                        # 再创建一个下载前的节点
                        start_node = models.Node.objects.create(text='下载前',
                                                                task_id=task_id,
                                                                parent=start_node
                                                                )
                        node_object_list.append(start_node)
    
    """利用变量名只想的问题来实现箭头指向"""

    代码优化

    对代码进行封装和优化

    # 1 wesocket_recevie方法内的代码过于冗杂  根据功能的不同做优化
    """
    将创建节点数据的代码 和 构造gojs所需要的代码封装成两个函数 
    """
    def create_node(task_id,task_obj):
        """创建节点数据"""
        node_object_list = []
        # 先判断当前任务单是否已经初始化过图标数据了
        node_queryset = models.Node.objects.filter(task_id=task_id)
        if not node_queryset:
            # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
            start_node = models.Node.objects.create(text='开始',
                                                    task_id=task_id
                                                    )
            node_object_list.append(start_node)
    
            # 判断是否有下载前的钩子脚本
            if task_obj.before_download_script:
                # 再创建一个下载前的节点
                start_node = models.Node.objects.create(text='下载前',
                                                        task_id=task_id,
                                                        parent=start_node
                                                        )
                node_object_list.append(start_node)
    
            download_node = models.Node.objects.create(text='下载',
                                                       task_id=task_id,
                                                       parent=start_node
                                                       )
            node_object_list.append(download_node)
    
            # 判断是否有下载后的钩子脚本
            if task_obj.after_download_script:
                # 再创建一个下载后的节点
                download_node = models.Node.objects.create(text='下载后',
                                                           task_id=task_id,
                                                           parent=download_node
                                                           )
                node_object_list.append(download_node)
    
            upload_node = models.Node.objects.create(text='上传',
                                                     task_id=task_id,
                                                     parent=download_node
                                                     )
            node_object_list.append(upload_node)
            # 1.1  服务器相关的节点  一个项目可以有多个服务器
            task_obj = models.DeployTask.objects.filter(pk=task_id).first()
            for server_obj in task_obj.project.servers.all():
                server_node = models.Node.objects.create(text=server_obj.hostname,
                                                         task_id=task_id,
                                                         parent=upload_node,
                                                         server=server_obj
                                                         )
                node_object_list.append(server_node)
    
                # 判断是否有发布前的钩子
                if task_obj.before_deploy_script:
                    # 再创建一个下载后的节点
                    server_node = models.Node.objects.create(text='发布前',
                                                             task_id=task_id,
                                                             parent=server_node,
                                                             server=server_obj
                                                             )
                    node_object_list.append(server_node)
    
                # 先再创建一个发布节点
                deploy_node = models.Node.objects.create(text='发布',
                                                         task_id=task_id,
                                                         parent=server_node,
                                                         server=server_obj
                                                         )
                node_object_list.append(deploy_node)
    
                # 判断是否有发布后的钩子
                if task_obj.after_deploy_script:
                    # 再创建一个下载后的节点
                    after_deploy_node = models.Node.objects.create(text='发布后',
                                                                   task_id=task_id,
                                                                   parent=deploy_node,
                                                                   server=server_obj
                                                                   )
                    node_object_list.append(after_deploy_node)
        else:
            node_object_list = node_queryset
        return node_object_list
    
    def convert_data_to_gojs(node_object_list):
        """将数据转化成gojs所需要的类型"""
        # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
        node_list = []  # [{},{},{},{}]
        for node_obj in node_object_list:
            temp_dic = {
                'key': str(node_obj.pk),
                'text': node_obj.text,
                'color': node_obj.status,
            }
            # 针对parant字段 需要做判断 再考虑是否创建键值对
            if node_obj.parent:
                # 我们用数据的主键值作为key
                temp_dic['parent'] = str(node_obj.parent_id)
            node_list.append(temp_dic)
        return node_list

    强调:有点一定要是先写好,之后再去优化,不要同步进行

    节点动态变化

    节点背后对应的操作,无外乎就是下载代码(gitpython模块的运用)

    其次就是将下载好的代码上传到远程服务器上(paramiko模块的运用)

    # 给发布按钮绑定事件 点击触发节点背后对应的所有的操作

    consumers.py代码示例

    from channels.generic.websocket import WebsocketConsumer
    from channels.exceptions import StopConsumer
    import json
    from asgiref.sync import async_to_sync
    from app01 import models
    
    
    def create_node(task_id,task_obj):
        """创建节点数据"""
        node_object_list = []
        # 先判断当前任务单是否已经初始化过图标数据了
        node_queryset = models.Node.objects.filter(task_id=task_id)
        if not node_queryset:
            # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
            start_node = models.Node.objects.create(text='开始',task_id=task_id)
            node_object_list.append(start_node)
    
            # 判断是否有下载前的钩子脚本
            if task_obj.before_download_script:
                # 再创建一个下载前的节点
                start_node = models.Node.objects.create(text='下载前',task_id=task_id,parent=start_node)
                node_object_list.append(start_node)
    
            download_node = models.Node.objects.create(text='下载',task_id=task_id,parent=start_node)
            node_object_list.append(download_node)
    
            # 判断是否有下载后的钩子脚本
            if task_obj.after_download_script:
                # 再创建一个下载后的节点
                download_node = models.Node.objects.create(text='下载后',task_id=task_id,parent=download_node)
                node_object_list.append(download_node)
    
            upload_node = models.Node.objects.create(text='上传',task_id=task_id,parent=download_node)
            node_object_list.append(upload_node)
            # 1.1  服务器相关的节点  一个项目可以有多个服务器
            task_obj = models.DeployTask.objects.filter(pk=task_id).first()
            for server_obj in task_obj.project.servers.all():
                server_node = models.Node.objects.create(text=server_obj.hostname,task_id=task_id,
                                                         parent=upload_node,server=server_obj)
                node_object_list.append(server_node)
    
                # 判断是否有发布前的钩子
                if task_obj.before_deploy_script:
                    # 再创建一个下载后的节点
                    server_node = models.Node.objects.create(text='发布前',task_id=task_id,
                                                             parent=server_node,server=server_obj)
                    node_object_list.append(server_node)
    
                # 先再创建一个发布节点
                deploy_node = models.Node.objects.create(text='发布',task_id=task_id,
                                                         parent=server_node,server=server_obj)
                node_object_list.append(deploy_node)
    
                # 判断是否有发布后的钩子
                if task_obj.after_deploy_script:
                    # 再创建一个下载后的节点
                    after_deploy_node = models.Node.objects.create(text='发布后',task_id=task_id,
                                                                   parent=deploy_node,server=server_obj)
                    node_object_list.append(after_deploy_node)
        else:
            node_object_list = node_queryset
        return node_object_list
    
    
    def convert_data_to_gojs(node_object_list):
        """将数据转化成gojs所需要的类型"""
        # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
        node_list = []  # [{},{},{},{}]
        for node_obj in node_object_list:
            temp_dic = {
                'key': str(node_obj.pk),
                'text': node_obj.text,
                'color': node_obj.status,
            }
            # 针对parant字段 需要做判断 再考虑是否创建键值对
            if node_obj.parent:
                # 我们用数据的主键值作为key
                temp_dic['parent'] = str(node_obj.parent_id)
            node_list.append(temp_dic)
        return node_list
    
    
    class PublishConsumer(WebsocketConsumer):
        def websocket_connect(self, message):
            self.accept()
            # 获取url中有名分组的参数  self.scope是一个大字典 里面包含了前端所有的数据
            task_id = self.scope['url_route']['kwargs'].get('task_id')
            # 应该将该用户添加到对应的群聊中  固定写法
            async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)
            # 第一个参数是群号 必须是字符串格式
            # 第二个参数类似于群成员的唯一标示
    
            # 查询当前任务单是否已经初始化过图标  如果有直接查询出来展示到前端  减少用户操作
            node_queryset = models.Node.objects.filter(task_id=task_id)
            if node_queryset:
                node_list = convert_data_to_gojs(node_queryset)
                # 发送数据  单发/群发???   单发
                self.send(text_data=json.dumps({'code':'init','data':node_list}))
    
    
        def websocket_receive(self, message):
            task_id = self.scope['url_route']['kwargs'].get('task_id')
            task_obj = models.DeployTask.objects.filter(pk=task_id).first()
            text = message.get('text')
            # 由于请求图标的数据 可以分为两部分 第一部分是初始化 第二部分是动态变化
            # 我们对text做判断 来区分到底需要哪部分数据
            if text == 'init':
                # node_list = [
                #         {"key": "start", "text": '开始', "figure": 'Ellipse', "color": "lightgreen"},
                #         {"key": "download", "parent": 'start', "text": '下载代码', "color": "lightgreen", "link_text": '执行中...'},
                #         {"key": "compile", "parent": 'download', "text": '本地编译', "color": "lightgreen"},
                #         ]
                # 要对数据进行序列化处理  单发
                # self.send(text_data=json.dumps({"code":'init',"data":node_list}))
                # 一个操作节点模型表完成数据展示
                # 群发
                # 1.创建节点数据
                node_object_list = create_node(task_id,task_obj)
                # 2.构造gojs所需要的数据类型
                node_list = convert_data_to_gojs(node_object_list)
                async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})
                """
                后面字典的键是固定的 就叫type和message
                type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中)
                针对type后面的方法名 有一个固定的变化格式
                my.send     >>>    my_send
                xxx.ooo     >>>    xxx_ooo
                """
            if text == 'deploy':
                """
                1 先默认让所有的节点执行成功         
                2 再真正的执行命令操作
                
                第一步
                    开始节点 开始节点无需任何操作 直接成功即可 
                第二步
                    下载前 执行本地脚本 执行成功或者失败
                第三步
                    下载 利用gitpython做操作
                第四步
                    下载后 执行本地脚本 执行成功或者失败
                第五步
                    上传  paramiko模块
                第六步
                    链接每天服务器
                    上传代码
                    发布前钩子
                    发布
                    发布后钩子
                """
    
        def my_send(self,event):
            message = event.get('message')  # {'code':'init','data':node_list}
            # 发送数据
            self.send(json.dumps(message))
            """
            内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法
            for self in self_list:
                self.send
            """
    
        def websocket_disconnect(self, message):
            task_id = self.scope['url_route']['kwargs'].get('task_id')
            async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)
            raise StopConsumer()

    deploy.html代码示例

    {% extends 'base.html' %}
    {% load staticfiles %}
    
    {% block content %}
        <h1>发布任务</h1>
    {#    1 操作按钮区域#}
        <div style="margin: 10px 0;">
        <button class="btn btn-primary" onclick="initDiagram()">初始化图标</button>
        <button  class="btn btn-primary" onclick="releaseTask()">发布任务</button>
        </div>
    {#    2 基本信息展示区#}
        <table class="table table-hover table-striped">
            <tbody>
            <tr>
                <td>项目名称:{{ project_obj.title }}</td>
                <td>环境:{{ project_obj.get_env_display }}</td>
            </tr>
            <tr>
                <td>版本:{{ task_obj.tag }}</td>
                <td>状态:{{ task_obj.get_status_display }}</td>
            </tr>
            <tr>
                <td colspan="2">仓库地址:{{ project_obj.repo }}</td>
            </tr>
            <tr>
                <td colspan="2">线上路径:{{ project_obj.path }}</td>
            </tr>
            <tr>
                <td colspan="2">
                    <div>关联服务器</div>
                    <ul>
                        {% for server_obj in project_obj.servers.all %}
                            <li>{{ server_obj.hostname }}</li>
                        {% endfor %}
                    </ul>
                </td>
            </tr>
            </tbody>
        </table>
    {#    3 图形动态展示区#}
        <div id="diagramDiv" style="100%; min-height:450px; background-color: #DAE4E4;"></div>
    {% endblock %}
    
    
    {% block js %}
        <script src="{% static 'js/go.js' %}"></script>
        <script>
            var ws;
            var diagram;
            function initWebSocket() {
                ws = new WebSocket('ws://127.0.0.1:8000/publish/{{ task_obj.pk }}/');
    
                ws.onmessage = function (event) {
                    {#console.log(typeof  event.data)  // {'code':'','data':''}#}
                    // 容易出错
                    var res = JSON.parse(event.data)
                    if (res.code==='init'){
                        diagram.model = new go.TreeModel(res.data);
                    }
                }
            }
            function initTable() {
            var $ = go.GraphObject.make;
            diagram = $(go.Diagram, "diagramDiv",{
                layout: $(go.TreeLayout, {
                    angle: 0,
                    nodeSpacing: 20,
                    layerSpacing: 70
                })
            });
            // 创建一个节点模版
            diagram.nodeTemplate = $(go.Node, "Auto",
                $(go.Shape, {
                    figure: "RoundedRectangle",
                    fill: 'yellow',
                    stroke: 'yellow'
                }, new go.Binding("figure", "figure"), new go.Binding("fill", "color"), new go.Binding("stroke", "color")),
                $(go.TextBlock, {margin: 8}, new go.Binding("text", "text"))
            );
            // 创建一个箭头模版
            diagram.linkTemplate = $(go.Link,
                {routing: go.Link.Orthogonal},
                $(go.Shape, {stroke: 'yellow'}, new go.Binding('stroke', 'link_color')),
                $(go.Shape, {toArrow: "OpenTriangle", stroke: 'yellow'}, new go.Binding('stroke', 'link_color'))
            );
            // 这里的数据后期就可以通过后端来获取
            {#var nodeDataArray = [#}
            {#    {key: "start", text: '开始', figure: 'Ellipse', color: "lightgreen"},#}
            {#    {key: "download", parent: 'start', text: '下载代码', color: "lightgreen", link_text: '执行中...'},#}
            {#    {key: "compile", parent: 'download', text: '本地编译', color: "lightgreen"},#}
            {#    {key: "zip", parent: 'compile', text: '打包', color: "red", link_color: 'red'},#}
            {#    {key: "c1", text: '服务器1', parent: "zip"},#}
            {#    {key: "c11", text: '服务重启', parent: "c1"},#}
            {#    {key: "c2", text: '服务器2', parent: "zip"},#}
            {#    {key: "c21", text: '服务重启', parent: "c2"},#}
            {#    {key: "c3", text: '服务器3', parent: "zip"},#}
            {#    {key: "c31", text: '服务重启', parent: "c3"}#}
            {#];#}
            {#diagram.model = new go.TreeModel(nodeDataArray);#}
    
            // 动态控制节点颜色变化
            //var node = diagram.model.findNodeDataForKey("zip");
            // diagram.model.setDataProperty(node, "color", "lightgreen");
        }
            $(function () {
                //  页面加载完毕 直接先初始化websocket对象和图标对象
                initWebSocket()
                initTable()
            })
    
            function initDiagram() {
                // 朝后端要初始化图标的数据
                ws.send('init')
            }
            function releaseTask() {
                // 朝后端发送执行任务的命令
                ws.send('deploy')
            }
        </script>
    {% endblock %}
  • 相关阅读:
    验证foreach 能否操做更改原表
    asp.net post/get 公共方法
    C# json日期转换
    学数学
    2742: [HEOI2012]Akai的数学作业
    BZOJ2208
    树状数组求逆序对
    网络流复习计划
    SG函数学(hua)习(shui)记录
    SPLAY板子
  • 原文地址:https://www.cnblogs.com/ludingchao/p/12723000.html
Copyright © 2011-2022 走看看