zoukankan      html  css  js  c++  java
  • common

    import datetime
    import json
    import logging
    import logging.config
    import math
    import sys
    from functools import wraps
    from logging.handlers import RotatingFileHandler
    
    import flask
    import kubernetes
    from aliyunsdkcore.client import AcsClient
    from flask import current_app, g, got_request_exception
    from flask_restful import reqparse, Resource, Api, http_status_message
    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
    from kubernetes.client import AppsV1Api, ExtensionsV1beta1Api
    from kubernetes.client import V1Pod
    from kubernetes.client.rest import ApiException
    from sqlalchemy import text
    from werkzeug.datastructures import Headers
    from werkzeug.exceptions import BadRequest, HTTPException
    
    from utils.email_tool import send_email
    from utils.exceptions import ParameterError
    from utils.response import unauthorized_response
    from utils.short_message_tool import send_sms
    
    
    def generate_auth_token(data, expiration=3600 * 24 * 15):
        s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps(data).decode('utf-8')
    
    
    def authorization(operation_set: set = None, role_set: set = None, menu_set: set = None,
                      certification_type: str = "competence", without_env: bool = True):
        """
        权限检测
        :param operation_set: 该接口所需要的操作权限集合,多个操作之间是 且
        :param role_set: 该接口所需要的角色要求,多个角色之间是 或
        :param menu_set: 该接口所需要的菜单权限集合
        :param certification_type: 验证类型 competence:操作权限控制   role基于角色控制
        :param without_env: 是否需要与环境关联, 默认不关联,个别操作需要判断环境
        :return:
        """
    
        def login_required(function):
            @wraps(function)
            def wrap(self, *args, **kwargs):
                if 'X-Token' not in flask.request.headers:
                    return unauthorized_response()
                else:
                    # token验证
                    token = flask.request.headers.get('X-Token')
                    s = Serializer(current_app.config['SECRET_KEY'])
                    try:
                        data = s.loads(token.encode("utf-8"))
                    except SignatureExpired:
                        return unauthorized_response(message="认证过期")
                    except BadSignature:
                        return unauthorized_response(message="认证失败")
                    except Exception as e:
                        current_app.logger.error(str(e))
                        return unauthorized_response(message="认证异常")
                    results = self.get_query_results(
                        "select user_name, status from user where user_id=:user_id limit 1;",
                        user_id=data["user_id"],
                        fetch_result=True
                    )
                    if not results:
                        return unauthorized_response(message="用户不存在")
                    if not results[0][1]:
                        return unauthorized_response(message="用户已被禁用")
                    g.user_id = data["user_id"]
                    g.user_name = results[0][0]
                    if not check_competence(
                            self, operation_set=operation_set, role_set=role_set, menu_set=menu_set,
                            certification_type=certification_type, without_env=without_env
                    ):
                        raise ParameterError("权限不足")
                return function(self, *args, **kwargs)
    
            return wrap
    
        return login_required
    
    
    def check_competence(
            current_handler, operation_set: set = None, role_set: set = None, menu_set: set = None,
            certification_type: str = "competence", without_env: bool = True
    ):
        """
        权限检测
        :param current_handler: 当前请求视图实例
        :param operation_set: 该接口所需要的操作权限集合
        :param menu_set: 该接口所需要的菜单权限集合
        :param role_set: 该接口所需要的角色要求
        :param certification_type: 验证类型 competence:操作权限控制   role基于角色控制, menu 菜单控制
        :param without_env: 是否需要与环境关联, 默认不关联,个别操作需要判断环境
        :return:
        """
        # 菜单权限由前端实现,后端无从校验。后端主要做操作权限校验
        if certification_type == "competence":
            if not operation_set:
                return True
            request_env = None
            if not without_env:
                request_env = current_handler.args["env"]
            role_ids = get_user_role_ids(g.user_id)
            operation_keys = get_user_operation_keys(role_ids, request_env)
            return not bool(operation_set.difference(operation_keys))
    
        # 基于角色的权限控制
        elif certification_type == "role":
            if not role_set:
                return True
            role_keys = get_user_role_keys(g.user_id)
            return bool(role_keys & role_set)
        elif certification_type == "menu":
            if not menu_set:
                return True
            role_ids = get_user_role_ids(g.user_id)
            menu_keys = get_user_menu_keys(role_ids)
            return not bool(menu_set.difference(menu_keys))
        else:
            raise ValueError("暂不支持该类型权限认证")
    
    
    def get_user_role_ids(user_id):
        query = f"select role_id from role_user where user_id=:user_id;"
        return [_id[0] for _id in TransactionResource.get_query_results(query, user_id=user_id)]
    
    
    def get_user_role_keys(user_id):
        role_ids = get_user_role_ids(user_id)
        if not role_ids:
            return set()
        sql = """
            select role_key from role where role_id in :role_ids;
        """
        results = TransactionResource.get_query_results(sql, role_ids=role_ids)
        return set(r[0] for r in results)
    
    
    def get_user_operation_keys(role_ids, env=None):
        """获取用户的可操作列表"""
        if not role_ids:
            return set()
        sql = f"""
            select gb_operation.operation_key from role_operation 
            inner join gb_operation on role_operation.operation_id = gb_operation.operation_id 
            where role_operation.role_id in :role_ids
        """
        if env:
            sql += " and gb_operation.env = :env;"
        results = TransactionResource.get_query_results(sql, role_ids=role_ids, env=env)
        operation_keys = {r[0] for r in results}
        return operation_keys
    
    
    def get_user_menu_keys(role_ids):
        if not role_ids:
            return set()
        sql = f"""
                select gb_menu.menu_key from role_menu 
                inner join gb_menu on role_menu.menu_id = gb_menu.menu_id 
                where role_menu.role_id in :role_ids
            """
        results = TransactionResource.get_query_results(sql, role_ids=role_ids)
        menu_keys = {r[0] for r in results}
        return menu_keys
    
    
    def transaction(sql, bindparams=None, commit=True, fetch_result=False, **kwargs):
        """
        @param sql: sql text
        @param bindparams: List 对kwargs中的key进行类型绑定及限制等 like :
            [bindparam('alarm_time_param', type_=DateTime, required=True)])
        @param fetch_result: 是否直接返回结果,如果返回,为列表,否则是一个result对象
        @param kwargs: sql中绑定的参数
        @return:
        """
        session = current_app.db.session
        try:
            results = session.execute(text(sql, bindparams), kwargs)
            if commit:
                session.commit()
        except Exception as e:
            current_app.logger.error(str(e))
            session.rollback()  # 出现异常则 rollback
            raise e
        if fetch_result:
            if results.rowcount:
                return [r for r in results]
            else:
                return None
        else:
            return results
    
    
    def with_transaction(function):
        @wraps(function)
        def wrap(self, *args, **kwargs):
            with current_app.db.engine.connect() as conn:
                trans = conn.begin()  # 开启一个 transaction
                g.trans = trans
                g.conn = conn
                self.trans = trans
                self.conn = conn
                try:
                    resp = function(self, *args, **kwargs)
                    trans.commit()
                    return resp
                except Exception as e:
                    current_app.logger.error(str(e))
                    trans.rollback()  # 出现异常则 rollback
                    raise e
    
        return wrap
    
    
    def do_nothing(value):
        return value
    
    
    def check_args(**kwargs):
        def my_decorator(func):
            @wraps(func)
            def wrapper(self, *_args, **_kwargs):
                parser = reqparse.RequestParser()
                # 添加请求体或查询参数
                for key, value in kwargs.items():
                    if key not in _kwargs:
                        required = False if value.startswith("?") else True
                        ignore = ~required
                        # assert ":" in value, "若字符串以?开始,代表非必须; ':'前为字段类型,后为缺失时的错误提示"
                        # 问号开头代表非必须,第零项为变量名
                        items = value.lstrip("?").split(":")
                        items_count = len(items)
                        _type_str = items[0]
                        # 第一项为错误提示语
                        if items_count >= 2 and items[1]:
                            _help = items[1]
                        else:
                            _help = f"{key}为必须参数"
                        # 第二项为参数所在位置
                        if items_count >= 3 and items[2]:
                            location = tuple(items[2].split(","))
                        else:
                            location = ('json', 'values',)
                        # 第三项为参数实际名称(请求参数的名称)
                        if items_count >= 4 and items[3]:
                            name = items[3]
                            dest = key
                        else:
                            name = key
                            dest = None
                        if _type_str == "list":
                            _type = do_nothing
                            action = "append"
                            default = lambda: []
                        else:
                            _type = eval(_type_str)
                            action = "store"
                            default = None
                        parser.add_argument(
                            name, dest=dest, type=_type, help=_help, required=required, action=action, default=default,
                            ignore=ignore, location=location
                        )
                try:
                    args = parser.parse_args()
                except BadRequest as e:
                    if hasattr(e, "data"):
                        values = e.data.get("message", {}).values()
                        msg = ",".join(values)
                        if not msg:
                            keys = e.data.get("message", {}).keys()
                            msg = ",".join(keys)
                        raise ParameterError(msg)
                    else:
                        raise e
                # 添加url解析出来的参数
                args.update(_kwargs)
                self.args = args
                current_app.logger.debug('请求参数:{}'.format(json.dumps(args, ensure_ascii=False, indent=4)))
                result = func(self, *_args, **_kwargs)
                current_app.logger.debug(
                    '响应参数:{}'.format(json.dumps(result, ensure_ascii=False, indent=4)))
                return result
    
            return wrapper
    
        return my_decorator
    
    
    class QueryMixin:
        select_fields = None
        table = None
    
        @classmethod
        def get_query_results(cls, sql, bindparams=None, fetch_result=False, **kwargs):
            results = current_app.db.session.execute(text(sql, bindparams), kwargs)
            current_app.logger.debug(results.cursor._executed.decode())
            if fetch_result:
                if results.rowcount:
                    return [r for r in results]
                else:
                    return []
            return results
    
        @classmethod
        def get_single_field_query_result(cls, sql, bindparams=None, **kwargs):
            results = current_app.db.session.execute(text(sql, bindparams), kwargs)
            current_app.logger.debug(results.cursor._executed.decode())
            result = None
            for r in results:
                result = r[0]
                break
            return result
    
        @classmethod
        def get_single_field_query_results(cls, sql, bindparams=None, **kwargs):
            results = current_app.db.session.execute(text(sql, bindparams), kwargs)
            current_app.logger.debug(results.cursor._executed.decode())
            return [r[0] for r in results]
    
        @classmethod
        def update_or_insert(cls, sql, bindparams=None, **kwargs):
            results = current_app.db.session.execute(text(sql, bindparams), kwargs)
            cursor = results._saved_cursor or results.cursor
            if cursor:
                current_app.logger.debug(results._saved_cursor._executed.decode())
            return results
    
        @classmethod
        def serialize_results(cls, results, select_fields=None, is_multi=False, rename_map=None):
            if is_multi:
                return cls.serialize_multi_results(results, select_fields=select_fields, rename_map=rename_map)
            else:
                return cls.serialize_single_results(results, select_fields=select_fields, rename_map=rename_map)
    
        @classmethod
        def serialize_single_results(cls, results, select_fields=None, rename_map=None):
            select_fields = select_fields or cls.select_fields
            assert select_fields, "请检查需要序列化的字段"
            data = dict()
            if not rename_map:
                rename_map = dict()
            for result in results:
                for index, field in enumerate(select_fields):
                    if a := rename_map.get(field):
                        field = a
                    else:
                        field = field.split(".")[-1]
                    temp_result = result[index]
                    if isinstance(temp_result, datetime.datetime):
                        data[field] = result[index].strftime('%Y-%m-%d %H:%M:%S')
                    elif isinstance(temp_result, datetime.date):
                        data[field] = result[index].strftime('%Y-%m-%d')
                    else:
                        data[field] = result[index]
            return data
    
        @classmethod
        def serialize_multi_results(cls, results, select_fields=None, rename_map=None):
            select_fields = select_fields or cls.select_fields
            data_list = list()
            if not rename_map:
                rename_map = dict()
            for result in results:
                data = dict()
                for index, field in enumerate(select_fields):
                    if a := rename_map.get(field):
                        field = a
                    else:
                        field = field.split(".")[-1]
                    temp_result = result[index]
                    if isinstance(temp_result, datetime.datetime):
                        data[field] = result[index].strftime('%Y-%m-%d %H:%M:%S')
                    elif isinstance(temp_result, datetime.date):
                        data[field] = result[index].strftime('%Y-%m-%d')
                    else:
                        data[field] = result[index]
                data_list.append(data)
            return data_list
    
        @classmethod
        def get_query_single_result(
                cls, sql, bindparams=None, fetch_result=False, select_fields=None, serialize=True, rename_map=None, **kwargs
        ):
            if serialize:
                fetch_result = False
            results = cls.get_query_results(sql=sql, bindparams=bindparams, fetch_result=fetch_result, **kwargs)
            if serialize:
                return cls.serialize_single_results(results, select_fields=select_fields, rename_map=rename_map)
            else:
                return results
    
        @classmethod
        def get_last_insert_id(cls):
            results = cls.get_query_results("SELECT LAST_INSERT_ID();", fetch_result=True)
            if results:
                return results[0][0]
            else:
                return None
    
        @classmethod
        def group_data_list(cls, data_list: list, key) -> list:
            value_set = {d[key] for d in data_list}
            value_list = sorted(value_set)
            index_map = dict()
            results = list()
            for index, v in enumerate(value_list):
                results.append({key: v, "values": list()})
                index_map[v] = index
            for data in data_list:
                results[index_map[data[key]]]["values"].append(data)
            return results
    
        @classmethod
        def get_limit_sql_suffix(cls, page_size, current_page):
            return f" limit {(current_page - 1) * page_size}, {page_size};"
    
        @classmethod
        def build_query_sql(cls, sql_template: str, select_fields: list = None, page_size: int = 0, current_page=None):
            if current_page is None:
                current_page = 1
            select_fields = select_fields or cls.select_fields
            sql = sql_template.format(fields=",".join(select_fields))
            count_sql = sql_template.format(fields="count(1)") + ";"
            if page_size:
                sql += cls.get_limit_sql_suffix(page_size, current_page)
            else:
                sql += ";"
            return sql, count_sql
    
        @classmethod
        def get_page_info(cls, page_size, current_page, total_count):
            if current_page is None:
                current_page = 1
            if page_size:
                pages = math.ceil(total_count / page_size)
                return {
                    "total": total_count,
                    "size": page_size,
                    "pages": pages,
                    "current": current_page
                }
            else:
                return {
                    "total": total_count,
                    "size": total_count,
                    "pages": 1,
                    "current": 1
                }
    
        @classmethod
        def get_config_value_by_key(cls, config_key):
            sql = "select config_value from gb_config where config_key=:config_key;"
            config_value = cls.get_single_field_query_result(sql, config_key=config_key)
            return config_value
    
        @staticmethod
        def paginate(collections, page=None, per_page=None) -> tuple:
            """返回第page页的per_page个结果
            返回(items, total)
            """
            if page is None:
                page = 1
            else:
                page = int(page)
            if per_page is None:
                per_page = 20
            else:
                per_page = int(per_page)
            if (total := len(collections)) < (end := page * per_page):
                items = collections[(page - 1) * per_page:]
            else:
                items = collections[(page - 1) * per_page: end]
            return items, total
    
    
    class TransactionResource(Resource, QueryMixin):
    
        def dispatch_request(self, *args, **kwargs):
            try:
                return super(TransactionResource, self).dispatch_request(*args, **kwargs)
            except Exception as e:
                current_app.db.session.rollback()
                current_app.logger.error(str(e))
                raise e
    
    
    # 重写API类,目的是做到正确返回错误码,否则默认是500
    class CustomizeApi(Api):
        def handle_error(self, e):
            """Error handler for the API transforms a raised exception into a Flask
            response, with the appropriate HTTP status code and body.
    
            :param e: the raised Exception object
            :type e: Exception
    
            """
            got_request_exception.send(current_app._get_current_object(), exception=e)
    
            if not isinstance(e, HTTPException) and current_app.propagate_exceptions:
                exc_type, exc_value, tb = sys.exc_info()
                if exc_value is e:
                    raise
                else:
                    raise e
    
            headers = Headers()
            if isinstance(e, HTTPException):
                code = e.code
                default_data = {
                    'message': getattr(e, 'description', http_status_message(code))
                }
                headers = e.get_response().headers
            else:
                code = 500
                default_data = {
                    'message': http_status_message(code),
                }
    
            # Werkzeug exceptions generate a content-length header which is added
            # to the response in addition to the actual content-length header
            # https://github.com/flask-restful/flask-restful/issues/534
            remove_headers = ('Content-Length',)
    
            for header in remove_headers:
                headers.pop(header, None)
    
            data = getattr(e, 'data', default_data)
    
            if code and code >= 500:
                exc_info = sys.exc_info()
                if exc_info[1] is None:
                    exc_info = None
                current_app.log_exception(exc_info)
    
            error_cls_name = type(e).__name__
            if error_cls_name in self.errors:
                custom_data = self.errors.get(error_cls_name, {})
                code = custom_data.get('status', code)
                if e.description:
                    custom_data["message"] = e.description
                data.update(custom_data)
    
            if code == 406 and self.default_mediatype is None:
                # if we are handling NotAcceptable (406), make sure that
                # make_response uses a representation we support as the
                # default mediatype (so that make_response doesn't throw
                # another NotAcceptable error).
                supported_mediatypes = list(self.representations.keys())
                fallback_mediatype = supported_mediatypes[0] if supported_mediatypes else "text/plain"
                resp = self.make_response(
                    data,
                    code,
                    headers,
                    fallback_mediatype=fallback_mediatype
                )
            else:
                resp = self.make_response(data, code, headers)
    
            if code == 401:
                resp = self.unauthorized(resp)
            return resp
    
    
    class K8SBase:
        def __init__(self, app_name, group_name, k8s_url, k8s_token, namespace, zone_code):
            self.k8s_url = k8s_url
            self.k8s_token = k8s_token
            self.config = self.get_config()
            self.core_v1 = self.get_core_v1()
            self.apps_v1 = self.get_apps_v1()
            self.extensions_v1 = self.get_extensions_v1_beta1()
            self.namespace = namespace
            self.zone_code = zone_code
            self.app_name = app_name
            self.group_name = group_name
    
        def get_config(self):
            try:
                configuration = kubernetes.client.Configuration()
                configuration.host = self.k8s_url
                configuration.verify_ssl = False
                configuration.api_key = {"authorization": "Bearer " + self.k8s_token}  # 获取异常并提示用户
                client_config = kubernetes.client.api_client.ApiClient(configuration=configuration)
                return client_config
            except TypeError as e:
                current_app.logger.error(str(e))
                return
    
        def get_core_v1(self):
            core_v1 = kubernetes.client.apis.core_v1_api.CoreV1Api(self.config)
            return core_v1
    
        def get_extensions_v1_beta1(self):
            extensions_v1 = ExtensionsV1beta1Api(self.config)
            return extensions_v1
    
        def get_apps_v1(self):
            apps_v1 = AppsV1Api(self.config)
            return apps_v1
    
    
    class K8SPodInfoTool(K8SBase):
        @classmethod
        def get_pod_restart_count(cls, pod: V1Pod):
            if pod.status.container_statuses:
                for item in pod.status.container_statuses:
                    return item.restart_count
            return 0
    
        @classmethod
        def get_image_version(cls, pod: V1Pod):
            if pod.status.container_statuses:
                for item in pod.status.container_statuses:
                    return item.image.split(":")[1] if item.image and ":" in item.image else None
            return ""
    
        @classmethod
        def get_pod_info(cls, pod):
            conditions = pod.status.conditions
            return {
                "pod_name": pod.metadata.name,
                "pod_ip": pod.status.pod_ip,
                "app_version": cls.get_image_version(pod),
                "phase": pod.status.phase,
                "conditions": ";".join(map(lambda x: f"{x.type}: {x.status}", conditions)) if conditions else "",
                "restart_count": cls.get_pod_restart_count(pod)
            }
    
        def sync_deployment_pods_info(self, match_labels):
            """
            同步正在发布中的 pod 的信息
            :return:
            """
            items = []
            for key, value in match_labels.items():
                items.append(f"{key}={value}")
            labels = ",".join(items)
            results = self.core_v1.list_namespaced_pod(namespace=self.namespace, label_selector=labels, watch=False)
            pod_list = list()
            for pod in results.items:
                pod_info = self.get_pod_info(pod)
                pod_list.append(pod_info)
            return pod_list
    
    
    class K8SCDTool(K8SBase):
    
        def get_deploy_info(self, deploy_name):
            try:
                result = self.apps_v1.read_namespaced_deployment(deploy_name, self.namespace)
            except ApiException as e:
                current_app.logger.error('获取deployment信息失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return False, message
            else:
                return True, result
    
        def get_deploy_info_status(self, deploy_name):
            try:
                result = self.apps_v1.read_namespaced_deployment_status(deploy_name, self.namespace)
            except ApiException as e:
                current_app.logger.error('获取deployment信息失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return False, message
            else:
                return True, result
    
        def get_pod_status(self, k8s_pod_tool, match_label):
            try:
                pod_list = k8s_pod_tool.sync_deployment_pods_info(match_label)
                pods_status = {}
                for pod_info in pod_list:
                    pod_name = pod_info.get('pod_name')
                    result = self.core_v1.read_namespaced_pod_status(pod_name, self.namespace)
                    pods_status[pod_name] = result
            except ApiException as e:
                current_app.logger.error('获取Pod信息失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return False, message
            else:
                return True, pods_status
    
        def get_namespace_pod(self, deploy_name):
            try:
                result = self.core_v1.read_namespaced_pod(deploy_name, self.namespace)
            except ApiException as e:
                current_app.logger.error('获取deployment信息失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return False, message
            else:
                return True, result
    
        def update_deploy(self, name, body):
            try:
                result = self.apps_v1.replace_namespaced_deployment(name, self.namespace, body)
                return 3, result  # 3 为发布中
            except ApiException as e:
                current_app.logger.error('更新Deployment失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", "")
                if e.status == 422:
                    if "MatchLabels" in message and "field is immutable" in message:
                        return 2, "检测到应用名称发生改变,需删除Deployment后重新发布"
                    else:
                        return 2, message
                else:
                    return 2, e  # 2 为发布失败
    
        def patch_deploy(self, name, patch_body):
            try:
                result = self.apps_v1.patch_namespaced_deployment(name, self.namespace, patch_body)
                return 3, result  # 3 为发布中
            except ApiException as e:
                current_app.logger.error('更新Deployment失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return 2, message  # 2 为发布失败
    
        def delete_deploy(self, name):
            try:
                result = self.apps_v1.delete_namespaced_deployment(name, self.namespace)
                return 3, result  # 3 为发布中
            except ApiException as e:
                current_app.logger.error('更新Deployment失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return 2, message  # 2 为发布失败
    
        def create_deploy(self, body):
            """创建deploy"""
            try:
                result = self.apps_v1.create_namespaced_deployment(self.namespace, body)
                return 3, result  # 3 为发布中
            except ApiException as e:
                current_app.logger.error('发布失败:{}'.format(e))
                error = json.loads(e.body)
                message = error.get("message", e)
                return 2, message  # 2 为发布失败
    
        def add_pod_label(self, label, pod_name_list):
            """为pod添加标签"""
            try:
                result = self.core_v1.list_pod_for_all_namespaces(label_selector=label)
                index = 0
                for pod_info in result.items:
                    pod_name = pod_info.metadata.name
                    pod_res = self.core_v1.read_namespaced_pod(pod_name, self.namespace)
                    pod_res.metadata.labels['name'] = pod_name_list[index][0]
                    self.core_v1.patch_namespaced_pod(pod_name, self.namespace, pod_res)
                    index += 1
            except ApiException as e:
                current_app.logger.error('添加名称标签异常:{}'.format(e))
    
        def get_service_info(self, service_name):
            try:
                result = self.core_v1.read_namespaced_service(service_name, self.namespace)
            except ApiException as e:
                return 2, e
            else:
                return 1, result
    
        def create_service(self, body):
            """创建 service"""
            try:
                result = self.core_v1.create_namespaced_service(self.namespace, body)
                return 1, result
            except Exception as e:
                current_app.logger.error('创建 service 失败:{}'.format(e))
                return 2, e
    
        def update_service(self, service_name, body):
            """ 更新 service, patch为补丁方式更新,默认为merge,需要指定replace, 参照:http://jsonpatch.com/ """
            try:
                result = self.core_v1.patch_namespaced_service(service_name, self.namespace, body)
                return 1, result
            except Exception as e:
                current_app.logger.error('更新 service 失败:{}'.format(e))
                return 2, e
    
        def delete_service(self, service_name):
            """ 删除 service """
            try:
                result = self.core_v1.delete_namespaced_service(service_name, self.namespace)
                return 1, result
            except Exception as e:
                current_app.logger.error('删除 service 失败:{}'.format(e))
                return 2, e
    
        def create_ingress(self, body):
            """ 创建 ingress 规则"""
            try:
                result = self.extensions_v1.create_namespaced_ingress(self.namespace, body)
                return 1, result
            except Exception as e:
                current_app.logger.error('创建 service 失败:{}'.format(e))
                return 2, e
    
        def update_ingress(self, ingress_name, body):
            """ 更新 ingress 规则"""
            try:
                result = self.extensions_v1.patch_namespaced_ingress(ingress_name, self.namespace, body)
                return 1, result
            except Exception as e:
                current_app.logger.error('更新 ingress 失败:{}'.format(e))
                return 2, e
    
        def delete_ingress(self, ingress_name):
            """ 删除 ingress 规则"""
            try:
                result = self.extensions_v1.delete_namespaced_ingress(ingress_name, self.namespace)
                return 1, result
            except Exception as e:
                current_app.logger.error('删除 ingress 失败:{}'.format(e))
                return 2, e
    
        def check_service_exists(self, service_name):
            try:
                result = self.core_v1.read_namespaced_service(service_name, self.namespace)
            except ApiException as e:
                if e.status == 404:
                    return False
            else:
                return result
    
        def check_deployment_exists(self, deployment_name):
            try:
                self.apps_v1.read_namespaced_deployment(deployment_name, self.namespace)
            except ApiException as e:
                if e.status == 404:
                    return False
            else:
                return True
    
        def check_ingress_exists(self, ingress_name):
            try:
                self.extensions_v1.read_namespaced_ingress(ingress_name, self.namespace)
            except ApiException as e:
                if e.status == 404:
                    return False
            else:
                return True
    
        def list_namespaced_persistent_volume_claim(self):
            try:
                result = self.core_v1.list_namespaced_persistent_volume_claim(self.namespace)
                return True, result
            except ApiException as e:
                error = json.loads(e.body)
                message = error.get("message", e)
                current_app.logger.error('获取存储声明失败:{}'.format(message))
                return False, message
    
        def list_namespaced_config_map(self):
            try:
                result = self.core_v1.list_namespaced_config_map(self.namespace)
                return True, result
            except ApiException as e:
                error = json.loads(e.body)
                message = error.get("message", e)
                current_app.logger.error('获取 config map 失败:{}'.format(message))
                return False, message
    
        def list_namespaced_secret_map(self):
            try:
                result = self.core_v1.list_namespaced_secret(self.namespace)
                return True, result
            except ApiException as e:
                error = json.loads(e.body)
                message = error.get("message", e)
                current_app.logger.error('获取 config map 失败:{}'.format(message))
                return False, message
    
    
    def get_k8s_cd_tool(app_name, group_name, system_code, env, namespace, zone_code) -> K8SCDTool:
        return get_k8s_tool(app_name, group_name, system_code, env, namespace, zone_code, K8SCDTool)
    
    
    def get_k8s_info_tool(app_name, group_name, system_code, env, namespace, zone_code) -> K8SPodInfoTool:
        return get_k8s_tool(app_name, group_name, system_code, env, namespace, zone_code, K8SPodInfoTool)
    
    
    def get_k8s_tool(app_name, group_name, system_code, env, namespace, zone_code, k8s_class):
        url_key = f"K8S_{system_code}_{env}_{zone_code}_URL"
        try:
            k8s_url = TransactionResource.get_config_value_by_key(url_key)
            if not k8s_url:
                raise ValueError(f'{url_key} 未配置')
            token_key = f"K8S_{system_code}_{env}_{zone_code}_TOKEN"
            k8s_token = TransactionResource.get_config_value_by_key(token_key)
            cd_obj = k8s_class(app_name, group_name, k8s_url, k8s_token, namespace, zone_code)
            return cd_obj
        except ApiException as e:
            current_app.logger.error(str(e))
            raise ParameterError(f'{url_key} 未配置')
    
    
    def setup_log(config_obj, app):
        # 设置日志的记录等级
        logging.basicConfig(level=config_obj.LOG_LEVEL)
        # 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
        file_log_handler = RotatingFileHandler(config_obj.LOG_FILE, maxBytes=1024 * 1024 * 100, backupCount=10)
        # 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
        formatter = logging.Formatter(
            "[%(asctime)s][%(pathname)s][%(filename)s][%(lineno)d][%(levelname)s][%(thread)d][%(module)s][%(funcName)s][%(message)s]")
        # 为刚创建的日志记录器设置日志记录格式
        file_log_handler.setFormatter(formatter)
        # 为全局的日志工具对象(flask app使用的)添加日志记录器
        logging.getLogger().addHandler(file_log_handler)
        # 设置flask自带的app.logger级别
        app.logger.setLevel(config_obj.LOG_LEVEL)
    
    
    # 用户操作日志
    class OperationLog(object):
    
        def __init__(self, user_id, operation_key, page, result, env, created_time, content
                     ):
            self.operation_key = operation_key
            self.created_time = created_time
            self.user_id = user_id
            self.page = page
            self.result = result
            self.env = env
            self.content = content
    
        def set_log(self, bindparams=None):
            params = {
                "operation_key": self.operation_key,
                "created_time": self.created_time,
                "user_id": self.user_id,
                "page": self.page,
                "result": self.result,
                'env': self.env,
                'content': self.content,
            }
            values_str = ','.join(map(lambda x: f':{x}', params.keys()))
            sql = f"insert into gb_operation_log ({','.join(params.keys())}) values ({values_str});"
            results = current_app.db.session.execute(text(sql, bindparams), params)
            return results
    
    
    class OperationNotify(object):
    
        def __init__(self, operation_key, content, template_code=None, params=None):
            self.operation_key = operation_key
            self.text = content
            self.template_code = template_code
            self.params = params
    
        def get_notify_user_list(self):
            sql = f'select user_id from gb_notify where operation_key=:operation_key'
            results = QueryMixin.get_single_field_query_results(sql, operation_key=self.operation_key)
            return results
    
        def get_recipients_email(self):
            user_list = self.get_notify_user_list()
            query_email_sql = "select email from user where user_id in :user_list; "
            results_email = QueryMixin.get_single_field_query_results(query_email_sql, user_list=user_list,
                                                                      fetch_result=True)
            return results_email
    
        def get_recipients_phone(self):
            user_list = self.get_notify_user_list()
            query_sms_sql = "select phone from user where user_id in :user_list; "
            results_sms = QueryMixin.get_single_field_query_results(query_sms_sql, user_list=user_list, fetch_result=True)
            return results_sms
    
        def send_email(self):
            sql = f'select distinct is_notify_email from `gb_notify` where operation_key=:operation_key'
            is_notify = QueryMixin.get_single_field_query_result(sql, operation_key=self.operation_key)
            if not is_notify:
                return
            mail_body = self.text
            recipients = self.get_recipients_email()
            send_email("Genbu通知", recipients, mail_body)
    
        def send_sms(self):
            sql = f'select distinct is_notify_sms from `gb_notify` where operation_key=:operation_key'
            is_notify = QueryMixin.get_single_field_query_result(sql, operation_key=self.operation_key)
            if not is_notify:
                return
            if not self.template_code or not self.params:
                return
            recipients = self.get_recipients_phone()
            acs_client = AcsClient(current_app.config["ACCESS_KEY_ID"], current_app.config["ACCESS_KEY_SECRET"])
            for phone in recipients:
                result = send_sms(acs_client, phone, self.template_code, template_param=self.params)
                result_obj = json.loads(result)
                if result_obj['Code'] != 'OK':
                    current_app.logger.error(result.decode('utf8'))
                    raise ParameterError("短信发送失败")
                current_app.logger.info(result.decode('utf8'))
    

      

  • 相关阅读:
    Go语言踩过的坑---记录GOPATH在GOLAND中的坑
    反射小例子
    制作pip包
    mac常用软件安装链接
    YCSB压测elasticsearch
    SSO和Auth2.0
    JAVA内部类的四大作用
    修改fastadmin,添加模糊查询
    AH00558: apache2: Could not reliably determine the server's fully qualified domain name, using 172.18.0.2. Set the 'ServerName' directive globally to suppress this message
    taro3.x: 封装实现chat emit on
  • 原文地址:https://www.cnblogs.com/hude/p/13743812.html
Copyright © 2011-2022 走看看