zoukankan      html  css  js  c++  java
  • Python Django撸个WebSSH操作Kubernetes Pod

    优秀的系统都是根据反馈逐渐完善出来的

    上篇文章介绍了我们为了应对安全和多分支频繁测试的问题而开发了一套Alodi系统,Alodi可以通过一个按钮快速构建一套测试环境,生成一个临时访问地址,详细信息可以看这一篇文章:Alodi:为了保密我开发了一个系统

    系统上线后,SSH登陆控制台成了一个迫切的需求,Kubernetes的Dashboard控制台虽然有WebSSH的功能,但却没办法跟Alodi系统相结合,决定在Alodi中集成WebSSH的功能,先来看看最后实现的效果吧

    涉及技术

    • Kubernetes Stream:接收数据执行,提供实时返回数据流
    • Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端
    • xterm.js:一个前端终端组件,用于模拟Terminal的界面显示

    基本的数据流向是:用户 --> xterm.js --> django channels --> kubernetes stream,接下来看看具体的代码实现

    Kubernetes Stream

    Kubernetes本身提供了stream方法来实现exec的功能,返回的就是一个WebSocket可以使用的数据流,使用起来也非常方便,代码如下:

    from kubernetes import client, config
    from kubernetes.stream import stream
    
    class KubeApi:
        def __init__(self, namespace='alodi'):
            config.load_kube_config("/ops/coffee/kubeconfig.yaml")
    
            self.namespace = namespace
    
        def pod_exec(self, pod, container=""):
            api_instance = client.CoreV1Api()
    
            exec_command = [
                "/bin/sh",
                "-c",
                'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
                '&& ([ -x /usr/bin/script ] '
                '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
                '|| exec /bin/sh']
    
            cont_stream = stream(api_instance.connect_get_namespaced_pod_exec,
                                 name=pod,
                                 namespace=self.namespace,
                                 container=container,
                                 command=exec_command,
                                 stderr=True, stdin=True,
                                 stdout=True, tty=True,
                                 _preload_content=False
                                 )
    
            return cont_stream
    

    这里的pod name可以通过list_namespaced_pod方法获取,代码如下:

    def get_deployment_pod(self, RAND):
        api_instance = client.CoreV1Api()
    
        try:
            r = api_instance.list_namespaced_pod(
                namespace=self.namespace,
                label_selector="app=%s" % RAND
            )
    
            return True, r
        except Exception as e:
            return False, 'Get Deployment: ' + str(e)
            
    state, data = self.get_deployment_pod(RAND)
    pod_name = data.items[0].metadata.name
    

    list_namespaced_pod会列出namespace下所有pod的详细信息,这里传了两个参数,第一个namespace是必须的,表示我们要列出pod的namespace,第二个label_selector非必须,表示可以通过设置的标签过滤namespace下的pod,由于我们在创建的时候给每个deployment都添加了唯一的app=RAND的标签,所以这里可以过滤出来我们项目所对应的pod

    一个deployment可能对应多个pod,获取到的data.items包含了所有的pod信息,为一个list列表,可根据需要取到对应pod的name

    Django Channels

    之前有两篇文章详细介绍过Django Channels,不了解的可以先查看:Django使用Channels实现WebSocket--上篇Django使用Channels实现WebSocket--下篇,最重要的两部分代码如下

    routing代码:

    from channels.auth import AuthMiddlewareStack
    from channels.routing import ProtocolTypeRouter, URLRouter
    
    from django.urls import path, re_path
    from medivh.consumers import SSHConsumer
    
    application = ProtocolTypeRouter({
        'websocket': AuthMiddlewareStack(
            URLRouter([
                re_path(r'^pod/(?P<name>w+)', SSHConsumer),
            ])
        ),
    })
    

    正则匹配所有以pod开头的websocket连接,都交由名为SSHConsumer的Consumer处理,Consumer代码如下:

    from channels.generic.websocket import WebsocketConsumer
    from medivh.backends.kube import KubeApi
    from threading import Thread
    
    class K8SStreamThread(Thread):
        def __init__(self, websocket, container_stream):
            Thread.__init__(self)
            self.websocket = websocket
            self.stream = container_stream
    
        def run(self):
            while self.stream.is_open():
                if self.stream.peek_stdout():
                    stdout = self.stream.read_stdout()
                    self.websocket.send(stdout)
    
                if self.stream.peek_stderr():
                    stderr = self.stream.read_stderr()
                    self.websocket.send(stderr)
            else:
                self.websocket.close()
    
    
    class SSHConsumer(WebsocketConsumer):
        def connect(self):
            self.name = self.scope["url_route"]["kwargs"]["name"]
    
            # kube exec
            self.stream = KubeApi().pod_exec(self.name)
            kub_stream = K8SStreamThread(self, self.stream)
            kub_stream.start()
    
            self.accept()
    
        def disconnect(self, close_code):
            self.stream.write_stdin('exit
    ')
    
        def receive(self, text_data):
            self.stream.write_stdin(text_data)
    

    WebSSH可以看作是一个最简单的websocket长连接,每个连接建立后都是独立的,不会跟其他连接共享数据,所以这里不需要用到Group

    当连接建立时通过self.scope获取到url中的name,传给Kubernetes API,同时会新起一个线程不断循环是否有新数据产生,如果有则发送给websocket

    当websocket接收到数据就直接写入Kubernetes API,当websocket关闭则会发送个exit命令给Kubernetes

    前端页面

    前端主要用到了xterm.js,整体代码也比较简单

    <!DOCTYPE html>
    <html>
    <head>
      <meta charset="utf-8">
      <title>Alodi | Pod Web SSH</title>
      <link rel="Shortcut Icon" href="/static/img/favicon.ico">
      
      <link href="/static/plugins/xterm/xterm.css" rel="stylesheet" type="text/css"/>
      <link href="/static/plugins/xterm/addons/fullscreen/fullscreen.css" rel="stylesheet" type="text/css"/>
    </head>
    
    <body>
      <div id="terminal"></div>
    </body>
    
    <script src="/static/plugins/xterm/xterm.js"></script>
    <script src="/static/plugins/xterm/addons/fullscreen/fullscreen.js"></script>
    <script>
      var term = new Terminal({cursorBlink: true});
      term.open(document.getElementById('terminal'));
    
      // xterm fullscreen config
      Terminal.applyAddon(fullscreen);
      term.toggleFullScreen(true);
    
      var socket = new WebSocket(
        'ws://' + window.location.host + '/pod/{{ name }}');
    
      socket.onopen = function () {
        term.on('data', function (data) {
            socket.send(data);
        });
    
        socket.onerror = function (event) {
          console.log('error:' + e);
        };
    
        socket.onmessage = function (event) {
          term.write(event.data);
        };
    
        socket.onclose = function (event) {
          term.write('
    
    x1B[1;3;31msocket is already closed.x1B[0m');
          // term.destroy();
        };
      };
    </script>
    </html>
    

    term.open初始化一个Terminal

    term.on会将输入的内容全部实时的传递给后端

    xterm.js有一个fullscreen的插件,引入之后可以配置fullscreen,否则可能页面只有一部分terminal窗口

    目前仍然遇到一个窗口大小无法调整的问题没有解决,初步判断是后端Kubernetes传回的数据决定的,查询了相关资料,找到kubectl命令可以通过添加COLUMNSLINES的env来设置

    #!/bin/sh
    if [ "$1" = "" ]; then
      echo "Usage: kshell <pod>"
      exit 1
    fi
    COLUMNS=`tput cols`
    LINES=`tput lines`
    TERM=xterm
    kubectl exec -i -t $1 env COLUMNS=$COLUMNS LINES=$LINES TERM=$TERM bash
    

    但Kubernetes Python API的Stream没有找到配置的地方,如果你知道,麻烦告诉我


    扫码关注公众号查看更多实用文章

    相关文章推荐阅读:

  • 相关阅读:
    AFN 控制字符报错(无法解析)
    HTTP POST请求报文格式分析与Java实现文件上传
    xcrun
    Makefile选项CFLAGS,LDFLAGS,LIBS
    Xcode 编译错误
    iOS 11 适配
    机器学习算法之决策树
    Python extend 和 append 的区别
    Python warning_function name should be lowercase
    使用matplotlib绘制导数图像
  • 原文地址:https://www.cnblogs.com/37Y37/p/12564534.html
Copyright © 2011-2022 走看看