需求与背景
需求背景
政府网约车监管平台,为了对网约车行业进行实时监控,顺利完成对网约车行业信用的考核管理。要求网约车平台向交通部级平台实时传输相关数据。同时数据顺利传输,网约车平台是提交网约车牌照审批的必要条件。
按照交通部规定,网约车仅需要完成【全国性网约车平台】-【部级网约车监管平台】数据交换。但是实际运行过程中,由于各城市要求不一致,网约车平台同时需要对接相关城市(下图红色线条部分数据)。
下图是网约车各级平台直接数据交换过程。
存在问题
1、不同城市对数据需求不同,包括
接口类型,示例,广州需要驾驶员个人合同详细信息。 接口重复度90%左右
同样接口中字段不一致:示例,驾驶员统计接口,广州特殊要求交通部事故次数,其他城市并没有。特殊字段不一致
接口相同字段数据内容:交通部要求文件sftp上传至文件中心,广州要求上传文件base64编码
数据字段属性:示例,性别有些城市要求数字,有些是汉字(男、女)
2、不同城市网络环境不同,交通部使用VPN,南京使用专线,上海、广州等使用公网 ;
针对上面问题,解决方案:不同城市根据个性化情况,独立开发接口,部署项目。增加调度项目dispatch分配不同城市消息数据(北京除外)
整体框架情况如下图所示,内部项目之间通过mafka进行消息数据传输
目前方案存在问题
1、不同类型数据,同时进入一个topic消费。数据区分度小,相同环境,出现问题相互影响。
2、随着不同城市接入,调度系统出现瓶颈
3、不同城市接口相似度90%,新对接一个城市,都需要不断重复复制代码,组装城市接口。该过程耗时,代码重用性小,项目中存在类膨胀的问题
4、缺少数据查询、异常数据预防,过滤,数据预警、数据重发功能。导致数据过程黑盒化。
措施
基于需求差异以及方案存在问题、进行平台化改造,本期主要是数据处理过程改造。整体框架
补充说明
逻辑架构图
重构主要内容
1、消息主题重构,根据数据类型不同划分司机、订单、乘客等相关消息类型。 消息主题和消息类型--2019改版后
2、城市内容组装过程重构,province-core中完成城市数据防腐、接口数据组装过程、异常消息存储重试等核心过程。并且提供接口配置模板、常用的数据请求过程、例如http请求
3、城市发送过程重构。根据模板配置接口、提供个性化需求(特殊数据过滤)
UML图
MsgFilter
DataManageService
DataManageService
CityRepository
TemplateMethod
CityClient
City.properties
1、系统核心内容均在province-core,所用城市公用该部分代码。主要功能包括,消息初级防腐、消息组装、消息发送的公共方法、以及日志、消息异常处理等
2、城市jar主要实现:城市接口配置、消息mafka配置、接口请求的特殊配置(http url、 http header等)、特殊模板方法
数据库设计(DML SQL)
hive表设计
1、hive表用途:记录接收消息、接口发送接口。数据用于支持数据统计;实时数据查询ES
2、所有城市通用一张表
3、不包含LBS数据,LBS数据可以logcenter中查询,数据保留4天
qcs-regulation-jiaotongbu | ||
---|---|---|
log_type |
string |
1、api 接口发送结果 2、message 消息接收和消费结果 |
city_id |
int |
城市id,消费消息的城市,例如发送到的交通部的数据 city_id=-99 |
msg_city_id |
int |
消息城市id,示例南京的订单信息 msg_city_id=55 |
msg_type |
string |
原始消息类型 |
msg_time |
string |
消息时间 格式:yyyy-MM-dd HH:mm:ss |
c_time |
string |
写入时间 格式:yyyy-MM-dd HH:mm:ss |
message |
string |
原始消息 json串 |
request_body |
string |
接口请求实体 json串 示例:{"alias":"operateArrive","body":{"DestTime":20190409113512,"DriveTime":162,"CompanyId":"3100SHLTNA99","DestLongitude":118805336,"OrderId":"111545602453977916319970000055","DestLatitude":32100502,"Encrypt":1,"DriveMile":0.845},"fieldMap":{"DestTime":20190409113512,"DriveTime":162,"IPCType":"operateArrive","CompanyId":"3100SHLTNA99","DestLongitude":118805336,"OrderId":"111545602453977916319970000055","URL":"/operate/arrive","DestLatitude":32100502,"Encrypt":1,"DriveMile":0.845},"name":"营运到达","retryCount":0} |
api_msg_type |
string |
接口消息类型 接口唯一标记 对应request_body中的【alias】 示例:baseInfoDriver、orderCancle |
api_name |
string |
接口名称 中文标记 log_type=message 时为空 示例:车辆基本信息、订单创建等 |
order_id |
string |
订单id |
driver_id |
string |
司机id |
vehicle_no |
string |
车牌 |
license_id |
string |
驾驶员号 |
drive_mile |
string |
载客里程 |
fact_price |
string |
运价 |
flag |
string |
操作标识 1:新增 2:更新 3:删除 |
status |
String |
log_type=message时 status=receive、success、fail log_type=api时 status=success、fail |
发送rabbit消息结构
数据
政府数据对接中数据处理流程,可以更好的帮助理解管理平台需要保存处理的数据源。清晰记录数据发送的过程。主要保存数据
1、第1步,接收到的原始消息messageDTO【status=receive、type=message、unionId=msgId】
2、第6、7步,向政府接口发送的数据、以及发送结果cityRequest【status=success/fail、type=api、unionId=msgId】
组装request数据
基于上面的问题,需要保存数据包括
-
原始消息数据【检索字段、原始消息体MessageDTO】
-
向政府接口发送的接口数据【请求结果、请求体 cityRequest】
-
政府对接---->rabbit管理平台发送的消息格式 原始消息
-
代码块
{ "msgCityId": "消息属于城市,类型long", "cityId": "消息处理城市,类型long", "timestamp": "消息时间戳", "msgType": "消息类型", "originalMsgType": "数据组提供的消息类型", "type": "message", "data": "数据组提供的messageDTO中data数据", "source": "原始消息 type=message时是接收MQ的原始消息(messageDTO)" "traceId":"消息唯一标记" }
-
api消息格式
-
代码块
{ "cityId": "消息处理城市,类型long", "timestamp": "消息时间戳", "msgCityId": "消息属于城市,类型long", "msgType": "消息类型", "originalMsgType": "数据组提供的消息类型", "type": "api", ”apiName“:”接口名称,type=message 为空“, ”apiMsgType“:”接口名称标记,示例baseInfoDriver“, "result": "success 接口请求成功,fail 接口请求失败 type=message 为空", "cause":"接口请求失败原因,result=fail时", "data": "数据组提供的messageDTO中data数据", "source": "type=api时是发送给政府的原始消息(cityRequest)" "traceId":"消息唯一标记" }