zoukankan      html  css  js  c++  java
  • kubernetes api

    #!/usr/bin/env python
    # -*- coding:utf8 -*-
    # __author__ = '北方姆Q'
    
    
    import time
    from kubernetes import client, config
    from kubernetes.stream import stream
    
    
    class KubernetesFactory(object):
        def __init__(self, config_name):
            conf = config.load_kube_config(config_name)
            self.CoreV1Api = client.CoreV1Api(api_client=conf)
            self.AppsV1Api = client.AppsV1Api(api_client=conf)
    
        def list_deployment(self, namespace, **kwargs):
            """
            列出某命名空间下所有deploy
            :param namespace: 命名空间名称
            :param kwargs:
            :return:
            """
            return self.AppsV1Api.list_namespaced_deployment(namespace, **kwargs)
    
        def get_deployment(self, name, namespace, **kwargs):
            """
            获取某deploy的配置
            :param name: deploy名称
            :param namespace: 命名空间名称
            :param kwargs:
            :return:
            """
            return self.AppsV1Api.read_namespaced_deployment(name, namespace, **kwargs)
    
        def update_deployment(self, name, namespace, new_body, **kwargs):
            """
            更新某deploy的配置
            :param name: deploy名称
            :param namespace: 命名空间名称
            :param new_body: 新配置
            :param kwargs:
            :return:
            """
            return self.AppsV1Api.replace_namespaced_deployment(name, namespace, new_body, **kwargs)
    
        def check_update_deployment(self, name, namespace):
            """
            验证某deploy是否能滚动更新成功
            :param name: deploy名称
            :param namespace: 命名空间名称
            :return:
            """
            time_out = 0
            while time_out <= 360:
                time.sleep(20)
                time_out += 20
    
                data = self.AppsV1Api.read_namespaced_deployment_status(name, namespace)
                replicas = data.status.replicas if data.status.replicas else 0
                ready_replicas = data.status.ready_replicas if data.status.ready_replicas else 0
                updated_replicas = data.status.updated_replicas if data.status.updated_replicas else 0
                # 总副本数减已经运行的副本数小于已经更新完成的副本数,说明可更新成功
                if replicas - ready_replicas < updated_replicas:
                    return True
            else:
                return False
    
        def set_image(self, name, namespace, update_image, **kwargs):
            """
            更新某deploy镜像
            :param name: deploy名称
            :param namespace: 命名空间名称
            :param update_image: 新镜像名称
            :param kwargs:
            :return:
            """
            data = self.get_deployment(name, namespace, **kwargs)
            data.spec.template.spec.containers[0].image = update_image
            return self.update_deployment(name, namespace, data, **kwargs)
    
        def get_pod(self, namespace, **kwargs):
            """
            列出某命名空间下所有pod
            :param namespace: 命名空间名称
            :param kwargs:
            :return:
            """
            return self.CoreV1Api.list_namespaced_pod(namespace, **kwargs)
    
        def get_pod_log(self, name, namespace, **kwargs):
            """
            获取某pod的日志
            :param name: pod名称
            :param namespace: 命名空间名称
            :param kwargs:
            :return:
            """
            return self.CoreV1Api.read_namespaced_pod_log(name, namespace, **kwargs)
    
        def exec(self, name, namespace, **kwargs):
            return stream(self.CoreV1Api.connect_get_namespaced_pod_exec, name, namespace, **kwargs)
  • 相关阅读:
    UVA 125 Numbering Paths
    UVA 515 King
    UVA 558 Wormholes
    UVA 10801 Lift Hopping
    UVA 10896 Sending Email
    SGU 488 Dales and Hills
    HDU 3397 Sequence operation
    数据库管理工具navicat的使用
    javascript封装animate动画
    关于webpack没有全局安装下的启动命令
  • 原文地址:https://www.cnblogs.com/bfmq/p/14503896.html
Copyright © 2011-2022 走看看