zoukankan      html  css  js  c++  java
  • python-grpc

    一:安装

    1. 安装 grpc : pip install grpcio
    2. 安装 grpc tools
      1. pip install grpcio-tools
      2. 包含 protocol buffer 编译器,用于从 .proto 文件生成服务端和客户端代码的插件

    二:编写.proto 文件

    介绍:

    grpc默认使用文件,结构化数据存储格式,适合做数据存储或RPC数据交换格式

    基本使用:

    定义接口
    service DynamicMasker{
        // 动态脱敏接口
        rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
        // 数据识别接口
        rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
    }
    
    分配表示符:

    每个字段都有唯一的一个数字标识符

    [1,15] :占用一个字节

    [16,2047] :占用两个字节

    数据标识符如上的1,2,按顺序。

    指定字段规则
    1. sigular

    2. repeated: 表示这个字段可以重复任意多次(包括0次)

      message DataMaskerReq{
          repeated FieldData field_data_list = 1;
      }
      
      message FieldData{
          string field_name  = 1;  // 字段名称
          string data_type = 2;  // 脱敏类型
          repeated FieldValues field_values_list = 3;  // 脱敏数据列表
      }
      message FieldValues {
          string field_value = 1;
      }
      

      与 json 格式相对照:

      {
          field_data_list:[
              {
                  'field_name': 'id',
                  'data_type': 'mobile',
                  'field_values_list': [
                      {'field_value': '1'},
                      {'field_value': '2'}
                  ]
              },
              {
                  'field_name': 'name',
                  'data_type': 'mobile',
                  'field_values_list': [
                      {'field_value': '1'},
                      {'field_value': '2'},
                      {'field_value': '3'}
                  ]
              }
          ]
      }
      
    标量数值类型
    .proto 文件类型 python类型
    double float
    float float
    int32 int
    unit32 int/long
    bool bool
    默认值
    • string 默认值为空string
    • bytes 默认值为空bytes
    • bool 默认值为false
    • 数值类型 默认值为0
    • 枚举类型 默认值为第一个定义的枚举值,必须为0

    注意:当定义了一个接口返回字段为 code,code为0时,接口返回的数据中不会出现 code 这个字段。

    枚举
    enum ImportType{
        IMPOERT_TYPE_UNKNOWN = 0;  // 第一个定义的枚举值,必须为0
        IMPOERT_TYPE_APPEND = 1; // 追加
        IMPOERT_TYPE_FULLCOVER = 2; // 全量覆盖
    }
    
    使用其他消息类型
    message Auth{
        string organization_uuid = 1;
        string project_uuid = 2;
        string user_uuid = 3;
        string task_uuid = 4;
    
    }
    
    message RDBMSSqlExportDataReq{
        Auth auth = 1;//权限相关
        string source_uri = 2; //来源数据源
        string sql = 3; //sql语句
    }
    

    完整的 .proto 文件

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "io.grpc.dynamic_masker";
    
    package dynamic_masker;
    
    service DynamicMasker{
        // 动态脱敏接口
        rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
        // 数据识别接口
        rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
    }
    
    message DataMaskerReq{
        repeated FieldData field_data_list = 1;
    }
    
    message FieldData{
        string field_name  = 1;  // 字段名称
        string masker_algorithm = 2;  // 脱敏类型
        repeated FieldValues field_values_list = 3;  // 脱敏数据列表
    }
    
    message FieldValues {
        string field_value = 1;
    }
    
    message DataMaskerResp{
        int32 code = 1;  // 返回状态码 0 成功 -1失败
        string msg = 2;  // 失败时,返回错误信息
        repeated MaskedData masked_data_list = 4;  // 返回脱敏后的数据
    }
    
    message MaskedResult{
        int32 total_count = 1;  // 脱敏数据总量
        int32 failed_count = 2;  // 脱敏失败数量
    }
    
    message MaskedData{
        string field_name = 1;  //字段名称
        repeated FieldValues field_values_list = 2;  // 脱敏后的数据列表
        MaskedResult masked_result = 3; // 每个字段脱敏数量
    }
    
    message DataIdentityReq{
        repeated DataIdentityData field_data_list = 1;
    }
    
    message DataIdentityResp{
        int32 code = 1;  // 返回状态码 0 成功 -1失败
        string msg = 2;  // 失败时,返回错误信息
        repeated IdentityHitData field_hit_result = 3;
    }
    
    message IdentityHitData{
        string field_name = 1;
        string hit = 2;
        string missed_hit = 3;
    }
    
    message DataIdentityData{
        string field_name  = 1;  // 字段名称
        string data_type = 2;  // 脱敏类型
        repeated FieldValues field_values_list = 3;  // 脱敏数据列表
    }
    

    三、编译 .proto 文件

    编译 .proto 文件时,编译器将生成所选语言的代码

    编译命令

    python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. dynamic_masker.proto
    
    • python -m 把库模块当做脚本运行
    • -I 指定 .proto 文件所在路径,proto 文件目录
    • --python_out=. 编译 .proto 文件所生成的文件路径,这里生成到当前目录
    • --grpc_python_out=. 编译生成处理 grpc 相关的代码的路径,这里生成到当前目录

    编译后生成的文件

    • dynamic_masker_pb2.py 用来和 protobuf 数据进行交互
    • dynamic_masker_pb2_grpc.py 用来和 grpc 进行交互

    四、客户端、服务端代码

    服务端 server.py

    from grpc_lib import dynamic_masker_pb2
    from concurrent import futures
    from grpc_lib import dynamic_masker_pb2_grpc
    from dm_utils.dm_core import masker_text, data_identity
    from dm_utils.dm_algorithm import DMAlg
    import grpc
    import time
    import os
    
    
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    
    
    class DynamicMasker(dynamic_masker_pb2_grpc.DynamicMaskerServicer):
        def DataMasker(self, request, context):
            algorithm = DMAlg()
            masked_data_list = []
            for field_data in request.field_data_list:
                masked_data = {}
                field_values_list = field_data.field_values_list
                masker_algorithm = field_data.masker_algorithm
                masked_data['field_name'] = field_data.field_name
                masked_data['field_values_list'] = []
                total_count = len(field_values_list)
                if masker_algorithm == 'shuffle':
                    # 调用shuffle功能
                    masked_data['field_values_list'] = algorithm.shuffle(list(field_values_list))
                    masked_data['masked_result'] = {'total_count': total_count, 'failed_count': total_count}
                    masked_data_list.append(masked_data)
                else:
                    failed_count = 0
                    for item in field_values_list:
                        try:
                            masked_value = masker_text(item.field_value, masker_algorithm)
                        except Exception as e:
                            return dynamic_masker_pb2.DataMaskerResp(
                                code=-1,
                                msg=str(e),
                                masked_data_list=masked_data_list
                            )
                        if not masked_value:
                            failed_count += 1
                        else:
                            masked_data['field_values_list'].append({'field_value': masked_value})
                    masked_data['masked_result'] = {'total_count': total_count, 'failed_count': failed_count}
                    masked_data_list.append(masked_data)
    
            return dynamic_masker_pb2.DataMaskerResp(
                code=0,
                msg='success',
                masked_data_list=masked_data_list
            )
    
        def DataIdentity(self, request, context):
            field_hit_result = []
            for field_data in request.field_data_list:
                field_name = field_data.field_name
                data_type = field_data.data_type
                field_values_list = field_data.field_values_list
                identity_data_result = {}
                hit = 0
                missed_hit = 0
                for item in field_values_list:
                    try:
                        result = data_identity(item.field_value, data_type)
                    except Exception as e:
                        return dynamic_masker_pb2.DataMaskerResp(
                            code=-1,
                            msg=str(e),
                            masked_data_list=field_hit_result
                        )
                    if not result:
                        missed_hit += 1
                    else:
                        hit += 1
                identity_data_result['field_name'] = field_name
                identity_data_result['hit'] = str(hit)
                identity_data_result['missed_hit'] = str(missed_hit)
                field_hit_result.append(identity_data_result)
            return dynamic_masker_pb2.DataIdentityResp(
                code=0,
                msg='success',
                field_hit_result=field_hit_result
            )
    
    
    if __name__ == '__main__':
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=int(os.environ['MAX_WORKERS'])))
        dynamic_masker_pb2_grpc.add_DynamicMaskerServicer_to_server(DynamicMasker(), server)
        server.add_insecure_port('0.0.0.0:8000')
        server.start()
        print("sever is opening ,waiting for message...")
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            server.stop(0)
    
    

    客户端 client.py

    from grpc_lib import dynamic_masker_pb2_grpc, dynamic_masker_pb2
    import grpc
    
    
    def run():
        channel = grpc.insecure_channel('127.0.0.1:31003')
        stub = dynamic_masker_pb2_grpc.DynamicMaskerStub(channel)
        response = stub.DataMasker(dynamic_masker_pb2.DataMaskerReq(
            field_data_list=[
                {
                    'field_name': 'id',
                    'masker_algorithm': 'mask',
                    'field_values_list': [
                        {'field_value': '1'},
                        {'field_value': '2'}
                    ]
    
                },
                {
                    'field_name': 'name',
                    'masker_algorithm': 'mask',
                    'field_values_list': [
                        {'field_value': '1'},
                        {'field_value': '2'},
                        {'field_value': '3'}
                    ]
    
                }
            ]
        ))
        print("Client received: ", response)
        
    if __name__ == '__main__':
        run()
    
    
  • 相关阅读:
    人名币转大写
    Http协议与TCP协议简单理解
    unity3d常用属性汇总
    ConcurrentHashMap的key value不能为null,map可以?
    一个线程池中的线程异常了,那么线程池会怎么处理这个线程?
    Dubbo负载均衡算法
    [LeetCode] 240. 搜索二维矩阵 II ☆☆☆(二分查找类似)
    [LeetCode] 74. 搜索二维矩阵 ☆☆☆(二分查找)
    Maven中的dependencyManagement 意义
    深入理解maven构建生命周期和各种plugin插件
  • 原文地址:https://www.cnblogs.com/KbMan/p/12030709.html
Copyright © 2011-2022 走看看