之前的项目为了实现自动化,写了一个protobuf的解析器,用来生成项目所需的格式。
当然,现在可以按照以下链接的指导,跳过手工分析,直接生成代码。
https://developers.google.com/protocol-buffers/docs/reference/cpp-generated
本文主要描述如何解析protobuf格式,以及如何收集所需的符号。
使用python 2.7脚本进行文本的处理。
程序分成4个模块:
expression: 格式的解析
symbol:在protobuf中定义的message等对象以及它们的层次结构,在这里已经看不见protobuf的样子了。
typecollection:基础类型定义和收集message等对象。
builder:遍历symbol,根据需要创建适合的输出文件。typecollection起到索引的作用。这次就不演示了。
1 测试用protobuf文件。(来源于google示例)
// Sample schema used as the parser's test input (package tutorial).
package tutorial;

message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phone = 4;
}

// AddressBook holds a repeated list of Person entries.
message AddressBook {
  repeated Person person = 1;
}
2 expression实现——采用最简单的扫描方法,逐个分析每一个word。
# -*- coding: UTF-8 -*-
# pb_expression.py
"""Word-by-word scanner for a small subset of the protobuf text syntax.

Only ``package``, ``import``, ``message`` and ``enum`` statements are
recognised.  Everything that is parsed is pushed into a
``pb_symbol.Symbol`` supplied by the caller.
"""
import sys
import os
import string
import shutil
import io

import pb_symbol


class StringBuffer(object):
    """Holds the text of one source file.

    ``Data`` is only available after ``OpenFile()`` has been called.
    """

    def __init__(self, src):
        self.src = src

    def OpenFile(self):
        """Read the whole file named by ``src`` into ``Data``."""
        # The context manager closes the handle; the former one-liner
        # ``open(...).read()`` left that to the garbage collector.
        with open(self.src) as handle:
            self.Data = handle.read()


class Expression(object):
    """Cursor-based scanner over ``StringBuffer.Data``.

    The scanner keeps a single integer cursor (``index``).  Look-ahead is
    done with ``backup()`` / ``restore()``.  ``//`` comments are skipped
    wherever a character is read; quoting is not tracked, so a ``//``
    inside a string literal is also treated as a comment start.
    """

    desc_set = set(['required', 'optional', 'repeated'])
    b_char_set = set(string.ascii_uppercase)
    l_char_set = set(string.ascii_lowercase)
    # Characters, not integers: the buffer yields one-character strings.
    digit_set = set(string.digits)
    equals_char = '='
    space_char = ' '
    openbrace_char = '{'
    closebrace_char = '}'
    semicolon_char = ';'
    tab_char = chr(9)
    newline_char = chr(10)
    return_char = chr(13)
    slash_char = chr(47)
    empty_char_set = set([space_char, tab_char, newline_char, return_char])
    # Word delimiters.  All whitespace must be included, otherwise a word
    # would run across tabs and line breaks.
    ctl_char_set = set([openbrace_char, closebrace_char,
                        semicolon_char, equals_char]) | empty_char_set
    symbol_char_set = b_char_set | l_char_set | digit_set
    all_char_set = symbol_char_set | ctl_char_set

    def __init__(self, sBuf, symbol):
        """Bind the scanner to a loaded ``StringBuffer`` and a target symbol."""
        self.Buf = sBuf
        self.index = 0
        self.count = len(self.Buf.Data)
        self.symbol = symbol

    # ---- cursor primitives -------------------------------------------

    def backup(self):
        """Return the current cursor so it can be handed to ``restore``."""
        return self.index

    def restore(self, prevIndex):
        """Move the cursor back to a position obtained from ``backup``."""
        self.index = prevIndex

    def forwardChar(self):
        """Advance the cursor by one, never past the end of the buffer."""
        if self.index < self.count:
            self.index = self.index + 1

    def backChar(self):
        """Step the cursor back by one, never before the start."""
        if self.index > 0:
            self.index = self.index - 1

    def getchar(self):
        """Return the character under the cursor and advance, or None at EOF."""
        if self.index < self.count:
            char = self.Buf.Data[self.index]
            self.forwardChar()
            return char
        return None

    def skipComment(self):
        """Skip every ``//`` comment that starts exactly at the cursor.

        Consecutive comment lines are all consumed, and a comment that runs
        to the end of the buffer leaves the cursor at the end instead of
        rewinding onto the comment text.

        Returns True when at least one comment was skipped, else False.
        """
        marker = self.slash_char * 2
        data = self.Buf.Data
        skipped = False
        while data.startswith(marker, self.index):
            skipped = True
            eol = data.find(self.newline_char, self.index)
            if eol < 0:
                self.index = self.count
            else:
                self.index = eol + 1
        return skipped

    # ---- token helpers -----------------------------------------------

    def getSpecialChar(self, currentchar):
        """Consume input up to and including ``currentchar``.

        Returns ``currentchar``, or None if the buffer ends first.
        """
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char == currentchar:
                return char

    def getVisibleChar(self):
        """Consume and return the next non-whitespace character (None at EOF)."""
        while True:
            self.skipComment()
            char = self.getchar()
            if char is None or char not in self.empty_char_set:
                return char

    def getNextword(self):
        """Return the next run of non-delimiter characters, or None.

        Leading delimiters are skipped.  The delimiter that ends the word is
        left in the buffer.  A comment in the middle of a word ends the word
        rather than gluing the text on both sides together.
        """
        chars = []
        while True:
            if self.skipComment() and chars:
                break
            char = self.getchar()
            if char is None:
                break
            if char in self.ctl_char_set:
                if chars:
                    self.backChar()
                    break
            else:
                chars.append(char)
        if not chars:
            return None
        return ''.join(chars)

    def _at_block_end(self):
        """Peek at the next visible character of a ``{ ... }`` body.

        Returns True (consuming the brace) on ``}`` and True at EOF;
        otherwise the cursor is restored and False is returned.
        """
        mark = self.backup()
        char = self.getVisibleChar()
        if char is None or char == self.closebrace_char:
            return True
        self.restore(mark)
        return False

    # ---- statement handlers ------------------------------------------

    def do_enum_item(self, pbEnum):
        """Parse one ``NAME = VALUE;`` entry and append it to ``pbEnum``."""
        memText = self.getNextword()
        self.getSpecialChar(self.equals_char)
        memValue = self.getNextword()
        self.getSpecialChar(self.semicolon_char)
        pbEnum.append_Member(memText, memValue)

    def do_enum_proc(self):
        """Parse ``NAME { ... }`` after the ``enum`` keyword."""
        symbol = self.getNextword()
        pbEnum = pb_symbol.PBEnum(symbol)
        # Enter the body explicitly so that an empty ``{}`` is not read past.
        self.getSpecialChar(self.openbrace_char)
        while not self._at_block_end():
            mark = self.backup()
            if self.getNextword() is None:
                break
            self.restore(mark)
            self.do_enum_item(pbEnum)
        self.symbol.append_enum(pbEnum)

    def _parse_nested(self, owner_name):
        """Parse nested message/enum declarations into a child Symbol."""
        subSymbol = pb_symbol.Symbol(self.symbol.tpDict,
                                     self.symbol.entity_full_path,
                                     False)
        subSymbol.update_namespace(owner_name)
        subExp = Expression(self.Buf, subSymbol)
        subExp.index = self.index
        subExp.do_expression()
        # Continue where the nested scanner stopped.
        self.index = subExp.index
        self.symbol.append_symbol(subSymbol)

    def do_message_proc(self):
        """Parse ``NAME { ... }`` after the ``message`` keyword.

        Fields introduced by required/optional/repeated become members.
        Nested declarations are delegated to a child scanner.  Any other
        word is skipped on its own.
        """
        symbol = self.getNextword()
        pbMsg = pb_symbol.PBMessage(symbol)
        # Enter the body explicitly so that an empty ``{}`` is not read past.
        self.getSpecialChar(self.openbrace_char)
        while not self._at_block_end():
            statement_start = self.backup()
            word = self.getNextword()
            if word is None:
                break
            if word in self.token_set:
                # Rewind so the child scanner sees the keyword itself.
                self.restore(statement_start)
                self._parse_nested(symbol)
                pbMsg.enableSymbol = 1
            elif word in self.desc_set:
                memType = self.getNextword()
                memText = self.getNextword()
                pbMsg.append_Member(word, memType, memText)
                # Drop the tag number and any [options] up to the ';'.
                self.getSpecialChar(self.semicolon_char)
        self.symbol.append_message(pbMsg)

    def do_import_proc(self):
        """Ignore an ``import`` statement (skip to its ';')."""
        self.getSpecialChar(self.semicolon_char)

    def do_package_proc(self):
        """Record the package name as the namespace of the target symbol."""
        word = self.getNextword()
        self.symbol.update_namespace(word)
        self.getSpecialChar(self.semicolon_char)

    # Keyword -> handler (plain functions; called as ``proc(self)``).
    token_set = {
        'message': do_message_proc,
        'enum': do_enum_proc,
        'import': do_import_proc,
        'package': do_package_proc,
    }

    def do_expression(self):
        """Handle consecutive top-level statements.

        Stops at EOF or at the first word that is not a known keyword; in the
        latter case the cursor is left in front of that word.
        """
        while True:
            current_index = self.backup()
            token = self.getNextword()
            if token is None:
                break
            proc = self.token_set.get(token)
            if proc is None:
                self.restore(current_index)
                break
            proc(self)
3 symbol--定义对象类型以及层次
# -*- coding: UTF-8 -*- # pb_symbol.py
"""Object model produced by the parser: entities, members and scopes."""
import os
import string

import pb_typecollection


class PBEntity(object):
    """Base class for anything that can be registered as a type."""

    def __init__(self, entName, rtname):
        self.entName = entName
        self.orgName = entName
        self.rtname = rtname

    def outputDebug(self, ns=''):
        """Print nothing; ``ns`` is accepted to match the subclass signature."""
        pass

    def create_impl(self, entity_indent, top_ns):
        """Return an empty list."""
        return []

    def mem_include(self, entName):
        """Return False: a plain entity has no members."""
        return False


class PBMessageMember(object):
    """One message field: label (``option``), type name and field name."""

    def __init__(self, option, memType, memText):
        self.option = option
        self.memType = memType
        self.memText = memText

    def outputDebug(self):
        print(self.option, self.memType, self.memText)

    @property
    def mem_option(self):
        return self.option

    @property
    def mem_type(self):
        return self.memType

    @property
    def mem_text(self):
        return self.memText


class PBMessage(PBEntity):
    """A ``message`` declaration and its fields."""

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []
        self.enableSymbol = 0
        self.rt_ns = ''
        self.tpDict = None

    @property
    def Members(self):
        return self.members

    def attach_tp_dict(self, tpDict):
        """Remember the type dictionary this message is registered in."""
        self.tpDict = tpDict

    def append_Member(self, option, memType, memText):
        """Add a field described by label, type name and field name."""
        self.members.append(PBMessageMember(option, memType, memText))

    def enable_Symbol(self, enable):
        self.enableSymbol = enable

    def outputDebug(self, ns):
        """Print the message header, its fields, then an empty line."""
        print(ns, 'message', self.entName)
        for member in self.members:
            member.outputDebug()
        print('')

    def set_rt_ns(self, rt_entity_full_path):
        self.rt_ns = rt_entity_full_path

    def mem_include(self, entName):
        """Return True if any field has the type name ``entName``."""
        return any(entName == member.memType for member in self.members)

    def detect_request(self):
        """Return True if the message has at least one field."""
        # ``len(...)``, not ``.count``: the latter is a bound method, so the
        # old comparison with 0 never reflected the number of members.
        return len(self.members) > 0


class PBEnumMember(object):
    """One enum entry: name and value text."""

    def __init__(self, memText, memValue):
        self.memText = memText
        self.memValue = memValue

    def outputDebug(self):
        print(self.memText, self.memValue)


class PBEnum(PBEntity):
    """An ``enum`` declaration and its entries."""

    def __init__(self, entName):
        PBEntity.__init__(self, entName, entName)
        self.members = []

    def append_Member(self, memText, memValue):
        """Add an entry described by name and value text."""
        self.members.append(PBEnumMember(memText, memValue))

    def outputDebug(self, ns):
        """Print the enum header, its entries, then an empty line."""
        print(ns, 'enum', self.entName)
        for member in self.members:
            member.outputDebug()
        print('')


class Symbol(object):
    """A scope holding entities (messages, enums, nested scopes).

    ``entitylist`` receives everything appended; ``containerlist`` only
    receives messages and nested symbols.
    """

    def __init__(self, tpDict, fullpath, rooted):
        self.namespace = ''
        self.tpDict = tpDict
        self.rooted = rooted
        self.entity_full_path = fullpath
        self.rt_entity_full_path = fullpath
        self.entitylist = []
        self.containerlist = []

    def update_namespace(self, namespace):
        """Set the scope name; non-rooted scopes also extend both paths."""
        self.namespace = namespace
        if self.rooted:
            return
        if self.entity_full_path == '':
            self.entity_full_path = namespace
            self.rt_entity_full_path = namespace
        else:
            # Extend each path from its own previous value.  Deriving the
            # second from the already-extended first appended the namespace
            # twice.
            self.entity_full_path = '%s_%s' % (self.entity_full_path,
                                                namespace)
            self.rt_entity_full_path = '%s_%s' % (self.rt_entity_full_path,
                                                   namespace)

    def append_type_dict(self, entity, isMsg):
        """Register ``entity`` in the type dictionary.

        Names are qualified with ``::`` when this scope has a path.  The
        fourth value passed to ``insert_type`` is empty for messages.
        """
        if self.entity_full_path == '':
            rt_type = entity.rtname
            org_type = entity.rtname
        else:
            rt_type = '%s::%s' % (self.rt_entity_full_path, entity.rtname)
            org_type = '%s::%s' % (self.entity_full_path, entity.rtname)
        if isMsg:
            org_type = ''
        self.tpDict.insert_type(entity.entName, rt_type, entity, org_type)

    def append_message(self, msg):
        """Add a message to this scope and register its type."""
        self.entitylist.append(msg)
        self.containerlist.append(msg)
        msg.attach_tp_dict(self.tpDict)
        if self.rt_entity_full_path == '':
            msg.set_rt_ns(self.rt_entity_full_path)
        else:
            msg.set_rt_ns(self.rt_entity_full_path + '_')
        self.append_type_dict(msg, True)

    def append_enum(self, enum):
        """Add an enum to this scope and register its type."""
        self.entitylist.append(enum)
        self.append_type_dict(enum, False)

    def append_symbol(self, symbol):
        """Add a nested scope."""
        self.entitylist.append(symbol)
        self.containerlist.append(symbol)

    def outputDebug(self, ns):
        """Print every entity, prefixing ``ns`` with this scope's name."""
        for entity in self.entitylist:
            entity.outputDebug(ns + '::' + self.namespace)

    def query_entitylist(self):
        return self.entitylist

    def query_containerlist(self):
        return self.containerlist

    def query_pb_ns(self):
        return self.namespace

    def mem_include(self, entName):
        """Return True if any contained entity reports ``entName``."""
        return any(entity.mem_include(entName) for entity in self.entitylist)


def _delegate(attr_name):
    """Build a read-only property that forwards to ``self.entity``."""
    def getter(self):
        return getattr(self.entity, attr_name)
    return property(getter)


class PBProxy(object):
    """Read-only wrapper forwarding attribute access to a wrapped entity."""

    def __init__(self, entity):
        self.entity = entity

    enableSymbol = _delegate('enableSymbol')
    entName = _delegate('entName')
    rtname = _delegate('rtname')
    orgName = _delegate('orgName')
    members = _delegate('members')
    rt_ns = _delegate('rt_ns')
    namespace = _delegate('namespace')
    rooted = _delegate('rooted')
    entity_full_path = _delegate('entity_full_path')
    rt_entity_full_path = _delegate('rt_entity_full_path')
    entitylist = _delegate('entitylist')
    containerlist = _delegate('containerlist')
    tpDict = _delegate('tpDict')
    # ``Members`` reads the wrapped object's ``members`` attribute directly.
    Members = _delegate('members')
    mem_option = _delegate('mem_option')
    mem_type = _delegate('mem_type')
    mem_text = _delegate('mem_text')

    def mem_include(self, entName):
        return self.entity.mem_include(entName)

    def create_impl(self, entity_indent, top_ns):
        return self.entity.create_impl(entity_indent, top_ns)

    def detect_request(self):
        return self.entity.detect_request()
4 typecollection
# -*- coding: UTF-8 -*-
# pb_typecollection.py
import os

import pb_symbol


class typeDict(object):
    """Lookup table from a protobuf type name to its registered record.

    Each record is the tuple ``(rtType, entity, orgType)`` exactly as it
    was handed to ``insert_type``.
    """

    op_req_desc = 'required'
    op_opt_desc = 'optional'
    op_rep_desc = 'repeated'

    def __init__(self):
        """Create the table and pre-register the scalar types."""
        self.collection = dict()
        scalar_types = (
            ('int32', '__int32'),
            ('int64', '__int64'),
            ('uint32', 'unsigned int'),
            ('bool', 'bool'),
            ('float', 'float'),
            ('double', 'double'),
            ('string', 'const char*'),
            ('bytes', 'const char*'),
        )
        for pb_name, rt_type in scalar_types:
            placeholder = pb_symbol.PBEntity(pb_name, pb_name)
            self.insert_type(pb_name, rt_type, placeholder, '')

    def insert_type(self, entName, rtType, entity, orgType):
        """Store (or overwrite) the record kept under ``entName``."""
        record = (rtType, entity, orgType)
        self.collection[entName] = record

    def output_debug(self):
        """Print a header line followed by every ``(name, record)`` pair."""
        print('type collection')
        for name in self.collection:
            pair = (name, self.collection[name])
            print(pair)
5 测试脚本
# -*- coding: UTF-8 -*-
"""Demo driver: parse one .proto file and dump what was collected."""
import pb_symbol
import pb_expression
import pb_typecollection

if __name__ == '__main__':
    pb_file = 'google_tutorial.proto'
    sBuf = pb_expression.StringBuffer(pb_file)
    tpDict = pb_typecollection.typeDict()
    # Root scope: empty path, rooted.
    symbol = pb_symbol.Symbol(tpDict, '', True)
    try:
        sBuf.OpenFile()
        exp = pb_expression.Expression(sBuf, symbol)
        exp.do_expression()
        symbol.outputDebug('')
        tpDict.output_debug()
    except Exception as exc:
        # Apply the format; passing the exception as a second print
        # argument never substituted it into "%s".
        print("%s" % (exc,))
    print("done")
6 输出
命名空间:::tutorial::Person
类型名称:PhoneType
('::tutorial::Person', 'enum', 'PhoneType')
('MOBILE', '0')
('HOME', '1')
('WORK', '2')
('::tutorial::Person', 'message', 'PhoneNumber')
('required', 'string', 'number')
('optional', 'PhoneType', 'type')
('::tutorial', 'message', 'Person')
('required', 'string', 'name')
('required', 'int32', 'id')
('optional', 'string', 'email')
('repeated', 'PhoneNumber', 'phone')
('::tutorial', 'message', 'AddressBook')
('repeated', 'Person', 'person')
type collection
('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))
('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))
('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))
('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))
('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))
('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))
('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))
('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))
('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))
('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))
('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))
('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))
参考
protobuf的git地址:https://github.com/google/protobuf