目录
一:安装
- 安装
grpc
: pip install grpcio - 安装 grpc tools
- pip install grpcio-tools
- 包含 protocol buffer 编译器,用于从 .proto 文件生成服务端和客户端代码的插件
二:编写.proto
文件
介绍:
grpc默认使用文件,结构化数据存储格式,适合做数据存储或RPC数据交换格式
基本使用:
定义接口
service DynamicMasker{
// 动态脱敏接口
rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
// 数据识别接口
rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
}
分配表示符:
每个字段都有唯一的一个数字标识符
[1,15] :占用一个字节
[16,2047] :占用两个字节
数据标识符如上的1,2,按顺序。
指定字段规则
-
sigular
-
repeated: 表示这个字段可以重复任意多次(包括0次)
message DataMaskerReq{ repeated FieldData field_data_list = 1; } message FieldData{ string field_name = 1; // 字段名称 string data_type = 2; // 脱敏类型 repeated FieldValues field_values_list = 3; // 脱敏数据列表 } message FieldValues { string field_value = 1; }
与 json 格式相对照:
{ field_data_list:[ { 'field_name': 'id', 'data_type': 'mobile', 'field_values_list': [ {'field_value': '1'}, {'field_value': '2'} ] }, { 'field_name': 'name', 'data_type': 'mobile', 'field_values_list': [ {'field_value': '1'}, {'field_value': '2'}, {'field_value': '3'} ] } ] }
标量数值类型
.proto 文件类型 | python类型 |
---|---|
double | float |
float | float |
int32 | int |
unit32 | int/long |
bool | bool |
默认值
- string 默认值为空string
- bytes 默认值为空bytes
- bool 默认值为false
- 数值类型 默认值为0
- 枚举类型 默认值为第一个定义的枚举值,必须为0
注意:当定义了一个接口返回字段为 code,code为0时,接口返回的数据中不会出现 code 这个字段。
枚举
enum ImportType{
IMPOERT_TYPE_UNKNOWN = 0; // 第一个定义的枚举值,必须为0
IMPOERT_TYPE_APPEND = 1; // 追加
IMPOERT_TYPE_FULLCOVER = 2; // 全量覆盖
}
使用其他消息类型
message Auth{
string organization_uuid = 1;
string project_uuid = 2;
string user_uuid = 3;
string task_uuid = 4;
}
message RDBMSSqlExportDataReq{
Auth auth = 1;//权限相关
string source_uri = 2; //来源数据源
string sql = 3; //sql语句
}
完整的 .proto 文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.dynamic_masker";
package dynamic_masker;
service DynamicMasker{
// 动态脱敏接口
rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
// 数据识别接口
rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
}
message DataMaskerReq{
repeated FieldData field_data_list = 1;
}
message FieldData{
string field_name = 1; // 字段名称
string masker_algorithm = 2; // 脱敏类型
repeated FieldValues field_values_list = 3; // 脱敏数据列表
}
message FieldValues {
string field_value = 1;
}
message DataMaskerResp{
int32 code = 1; // 返回状态码 0 成功 -1失败
string msg = 2; // 失败时,返回错误信息
repeated MaskedData masked_data_list = 4; // 返回脱敏后的数据
}
message MaskedResult{
int32 total_count = 1; // 脱敏数据总量
int32 failed_count = 2; // 脱敏失败数量
}
message MaskedData{
string field_name = 1; //字段名称
repeated FieldValues field_values_list = 2; // 脱敏后的数据列表
MaskedResult masked_result = 3; // 每个字段脱敏数量
}
message DataIdentityReq{
repeated DataIdentityData field_data_list = 1;
}
message DataIdentityResp{
int32 code = 1; // 返回状态码 0 成功 -1失败
string msg = 2; // 失败时,返回错误信息
repeated IdentityHitData field_hit_result = 3;
}
message IdentityHitData{
string field_name = 1;
string hit = 2;
string missed_hit = 3;
}
message DataIdentityData{
string field_name = 1; // 字段名称
string data_type = 2; // 脱敏类型
repeated FieldValues field_values_list = 3; // 脱敏数据列表
}
三、编译 .proto
文件
编译 .proto
文件时,编译器将生成所选语言的代码
编译命令
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. dynamic_masker.proto
- python -m 把库模块当做脚本运行
- -I 指定 .proto 文件所在路径,proto 文件目录
- --python_out=. 编译
.proto
文件所生成的文件路径,这里生成到当前目录 - --grpc_python_out=. 编译生成处理 grpc 相关的代码的路径,这里生成到当前目录
编译后生成的文件
- dynamic_masker_pb2.py 用来和 protobuf 数据进行交互
- dynamic_masker_pb2_grpc.py 用来和 grpc 进行交互
四、客户端、服务端代码
服务端 server.py
from grpc_lib import dynamic_masker_pb2
from concurrent import futures
from grpc_lib import dynamic_masker_pb2_grpc
from dm_utils.dm_core import masker_text, data_identity
from dm_utils.dm_algorithm import DMAlg
import grpc
import time
import os
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class DynamicMasker(dynamic_masker_pb2_grpc.DynamicMaskerServicer):
def DataMasker(self, request, context):
algorithm = DMAlg()
masked_data_list = []
for field_data in request.field_data_list:
masked_data = {}
field_values_list = field_data.field_values_list
masker_algorithm = field_data.masker_algorithm
masked_data['field_name'] = field_data.field_name
masked_data['field_values_list'] = []
total_count = len(field_values_list)
if masker_algorithm == 'shuffle':
# 调用shuffle功能
masked_data['field_values_list'] = algorithm.shuffle(list(field_values_list))
masked_data['masked_result'] = {'total_count': total_count, 'failed_count': total_count}
masked_data_list.append(masked_data)
else:
failed_count = 0
for item in field_values_list:
try:
masked_value = masker_text(item.field_value, masker_algorithm)
except Exception as e:
return dynamic_masker_pb2.DataMaskerResp(
code=-1,
msg=str(e),
masked_data_list=masked_data_list
)
if not masked_value:
failed_count += 1
else:
masked_data['field_values_list'].append({'field_value': masked_value})
masked_data['masked_result'] = {'total_count': total_count, 'failed_count': failed_count}
masked_data_list.append(masked_data)
return dynamic_masker_pb2.DataMaskerResp(
code=0,
msg='success',
masked_data_list=masked_data_list
)
def DataIdentity(self, request, context):
field_hit_result = []
for field_data in request.field_data_list:
field_name = field_data.field_name
data_type = field_data.data_type
field_values_list = field_data.field_values_list
identity_data_result = {}
hit = 0
missed_hit = 0
for item in field_values_list:
try:
result = data_identity(item.field_value, data_type)
except Exception as e:
return dynamic_masker_pb2.DataMaskerResp(
code=-1,
msg=str(e),
masked_data_list=field_hit_result
)
if not result:
missed_hit += 1
else:
hit += 1
identity_data_result['field_name'] = field_name
identity_data_result['hit'] = str(hit)
identity_data_result['missed_hit'] = str(missed_hit)
field_hit_result.append(identity_data_result)
return dynamic_masker_pb2.DataIdentityResp(
code=0,
msg='success',
field_hit_result=field_hit_result
)
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=int(os.environ['MAX_WORKERS'])))
dynamic_masker_pb2_grpc.add_DynamicMaskerServicer_to_server(DynamicMasker(), server)
server.add_insecure_port('0.0.0.0:8000')
server.start()
print("sever is opening ,waiting for message...")
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
客户端 client.py
from grpc_lib import dynamic_masker_pb2_grpc, dynamic_masker_pb2
import grpc
def run():
channel = grpc.insecure_channel('127.0.0.1:31003')
stub = dynamic_masker_pb2_grpc.DynamicMaskerStub(channel)
response = stub.DataMasker(dynamic_masker_pb2.DataMaskerReq(
field_data_list=[
{
'field_name': 'id',
'masker_algorithm': 'mask',
'field_values_list': [
{'field_value': '1'},
{'field_value': '2'}
]
},
{
'field_name': 'name',
'masker_algorithm': 'mask',
'field_values_list': [
{'field_value': '1'},
{'field_value': '2'},
{'field_value': '3'}
]
}
]
))
print("Client received: ", response)
if __name__ == '__main__':
run()