zoukankan      html  css  js  c++  java
  • python实现: protobuf解释器

    之前的项目为了实现自动化,写了一个protobuf的解释器,用来生成项目所需的格式。

    当然,现在按照以下链接的指导,已经可以跳过手工分析,直接生成代码了。

    https://developers.google.com/protocol-buffers/docs/reference/cpp-generated

    这次文档主要是描述如何分析protobuf格式,以及如何收集需要的符号。

    使用python 2.7脚本进行文本的处理。

    程序分成4个模块:

    expression: 格式的解析

    symbol:在protobuf中定义的message等对象以及它们的层次结构,在这里已经看不见protobuf的样子了。

    typecollection:基础类型定义和收集message等对象。

    builder:遍历symbol,根据需要创建适合的输出文件。typecollection起到索引的作用。这次就不演示了。

    1 测试用protobuf文件。(来源于google示例)

    // Sample schema used as parser input: one package, two top-level
    // messages, and an enum plus a message nested inside Person.
    package tutorial;

    // Person carries three scalar fields and a repeated list of phone numbers.
    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;

      // Nested enum; referenced by PhoneNumber.type below.
      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }

      // Nested message; `type` falls back to HOME when not set.
      message PhoneNumber {
        required string number = 1;
        optional PhoneType type = 2 [default = HOME];
      }

      repeated PhoneNumber phone = 4;
    }

    // AddressBook is a repeated list of Person entries.
    message AddressBook {
      repeated Person person = 1;
    }

     2 expression实现---最简单的扫描方法,分析每一个word。 

    # -*- coding: UTF-8 -*-
    # pb_expression.py
    import sys
    import os
    import string
    import shutil
    import io

    import pb_symbol


    class StringBuffer(object):
        """Holds the text of one .proto file.

        ``Data`` only exists after ``OpenFile`` has been called.
        """

        def __init__(self, src):
            # Path of the file to load; nothing is read until OpenFile().
            self.src = src

        def OpenFile(self):
            """Read the whole file named by ``self.src`` into ``self.Data``."""
            # The context manager closes the handle even when read() raises.
            with open(self.src) as handle:
                self.Data = handle.read()


    class Expression(object):
        """Character/word scanner that fills a ``pb_symbol.Symbol``.

        The scanner walks ``sBuf.Data`` with a single cursor (``self.index``)
        and dispatches on the keywords listed in ``token_set``.  Nested
        definitions are handled by a second ``Expression`` that shares the
        buffer and takes over the cursor for a while.
        """

        # Field labels that introduce a message member.
        desc_set = set(['required', 'optional', 'repeated'])
        b_char_set = set(string.ascii_uppercase)
        l_char_set = set(string.ascii_lowercase)
        # Digits are stored as characters so they can match text from the buffer.
        digit_set = set(string.digits)
        equals_char = '='
        space_char = ' '
        openbrace_char = '{'
        closebrace_char = '}'
        semicolon_char = ';'
        tab_char = chr(9)
        newline_char = chr(10)
        return_char = chr(13)
        slash_char = chr(47)
        # Characters that terminate a word.  All whitespace characters must be
        # listed, otherwise a word would absorb the line break that follows it.
        ctl_char_set = set([openbrace_char, closebrace_char, semicolon_char,
                            equals_char, space_char, tab_char, newline_char,
                            return_char])
        empty_char_set = set([space_char, tab_char, newline_char, return_char])
        symbol_char_set = b_char_set | l_char_set | digit_set
        all_char_set = symbol_char_set | ctl_char_set

        def __init__(self, sBuf, symbol):
            """Bind the scanner to an opened ``StringBuffer`` and a target symbol."""
            self.Buf = sBuf
            self.index = 0
            self.count = len(self.Buf.Data)
            self.symbol = symbol

        def backup(self):
            """Return the current cursor position so it can be restored later."""
            return self.index

        def restore(self, prevIndex):
            """Move the cursor back to a position obtained from ``backup``."""
            self.index = prevIndex

        def forwardChar(self):
            """Advance the cursor by one character, stopping at end of input."""
            if self.index < self.count:
                self.index = self.index + 1

        def backChar(self):
            """Step the cursor back by one character, stopping at the start."""
            if self.index > 0:
                self.index = self.index - 1

        def getchar(self):
            """Return the character under the cursor and advance, or None at EOF."""
            if self.index < self.count:
                char = self.Buf.Data[self.index]
                self.forwardChar()
                return char
            return None

        def skipComment(self):
            """Skip a ``//`` comment that starts exactly at the cursor.

            The cursor is left on the terminating newline (or at end of input
            when the comment is the last thing in the file).  Keeping the
            newline matters: it is a word terminator, so the words before and
            after a comment are not glued together.
            """
            data = self.Buf.Data
            start = self.index
            if data[start:start + 2] != self.slash_char * 2:
                return
            end = data.find(self.newline_char, start)
            self.index = self.count if end < 0 else end

        def _peekVisibleChar(self):
            """Return the next non-whitespace character without consuming it."""
            mark = self.backup()
            char = self.getVisibleChar()
            self.restore(mark)
            return char

        def getSpecialChar(self, currentchar):
            """Consume input up to and including ``currentchar``.

            Returns ``currentchar``, or None when end of input is reached first.
            """
            while True:
                self.skipComment()
                char = self.getchar()
                if char is None or char == currentchar:
                    return char

        def getVisibleChar(self):
            """Consume and return the next non-whitespace character (None at EOF)."""
            while True:
                self.skipComment()
                char = self.getchar()
                if char is None or char not in self.empty_char_set:
                    return char

        def getNextword(self):
            """Return the next run of non-control characters, or None at EOF.

            Leading control characters are skipped; the control character that
            ends the word is pushed back so the caller can still see it.
            """
            chars = []
            while True:
                self.skipComment()
                char = self.getchar()
                if char is None:
                    break
                if char in self.ctl_char_set:
                    if chars:
                        self.backChar()
                        break
                    continue
                chars.append(char)
            if not chars:
                return None
            return ''.join(chars)

        def do_enum_item(self, pbEnum):
            """Parse one ``NAME = value;`` entry and append it to ``pbEnum``."""
            memText = self.getNextword()
            self.getSpecialChar(self.equals_char)
            memValue = self.getNextword()
            self.getSpecialChar(self.semicolon_char)
            pbEnum.append_Member(memText, memValue)

        def do_enum_proc(self):
            """Parse ``<name> { ... }`` after the ``enum`` keyword."""
            symbol = self.getNextword()
            pbEnum = pb_symbol.PBEnum(symbol)
            self.getSpecialChar(self.openbrace_char)
            while True:
                # Test for the closing brace before reading a word: a word read
                # would silently skip '}' and run into the next definition,
                # which breaks on an empty enum body.
                char = self._peekVisibleChar()
                if char is None:
                    break
                if char == self.closebrace_char:
                    self.getVisibleChar()
                    break
                mark = self.backup()
                word = self.getNextword()
                if word is None:
                    break
                self.restore(mark)
                self.do_enum_item(pbEnum)
            self.symbol.append_enum(pbEnum)

        def do_message_proc(self):
            """Parse ``<name> { ... }`` after the ``message`` keyword.

            Members introduced by a label in ``desc_set`` are recorded on the
            message.  A keyword from ``token_set`` inside the body starts a run
            of nested definitions, which is parsed by a child ``Expression``
            into a child ``Symbol`` named after this message.
            """
            symbol = self.getNextword()
            pbMsg = pb_symbol.PBMessage(symbol)
            self.getSpecialChar(self.openbrace_char)
            while True:
                # Same closing-brace check as in do_enum_proc, so an empty body
                # or a trailing nested definition ends the message correctly.
                char = self._peekVisibleChar()
                if char is None:
                    break
                if char == self.closebrace_char:
                    self.getVisibleChar()
                    break
                currentIndex = self.backup()
                word = self.getNextword()
                if word is None:
                    break
                if word in self.token_set:
                    subSymbol = pb_symbol.Symbol(self.symbol.tpDict,
                                                 self.symbol.entity_full_path,
                                                 False)
                    subSymbol.update_namespace(symbol)
                    # Rewind so the child scanner sees the keyword itself.
                    self.restore(currentIndex)
                    subExp = Expression(self.Buf, subSymbol)
                    subExp.index = self.index
                    subExp.do_expression()
                    self.index = subExp.index
                    self.symbol.append_symbol(subSymbol)
                    pbMsg.enableSymbol = 1
                elif word in self.desc_set:
                    memType = self.getNextword()
                    memText = self.getNextword()
                    pbMsg.append_Member(word, memType, memText)
                    # Skips the field number and any [options] up to ';'.
                    self.getSpecialChar(self.semicolon_char)
            self.symbol.append_message(pbMsg)

        def do_import_proc(self):
            """Ignore an ``import`` statement by skipping to its ';'."""
            self.getSpecialChar(self.semicolon_char)

        def do_package_proc(self):
            """Record the package name as the namespace of the current symbol."""
            word = self.getNextword()
            self.symbol.update_namespace(word)
            self.getSpecialChar(self.semicolon_char)

        # Keyword -> handler.  The values are plain functions, hence the
        # explicit ``handler(self)`` call in do_expression.
        token_set = {
            'message': do_message_proc,
            'enum': do_enum_proc,
            'import': do_import_proc,
            'package': do_package_proc,
        }

        def do_expression(self):
            """Parse consecutive definitions until something else is found.

            Stops, without consuming anything further, at end of input, at a
            word that is not a keyword, or at a '}' (which belongs to the
            enclosing message and must be left for it to consume).
            """
            while True:
                if self._peekVisibleChar() == self.closebrace_char:
                    break
                current_index = self.backup()
                token = self.getNextword()
                if token is None:
                    break
                handler = self.token_set.get(token)
                if handler is None:
                    self.restore(current_index)
                    break
                handler(self)

    3 symbol--定义对象类型以及层次

    # -*- coding: UTF-8 -*-
    # pb_symbol.py
    import os
    import string

    import pb_typecollection


    class PBEntity(object):
        """Base class for everything that can be stored in the type dictionary."""

        def __init__(self, entName, rtname):
            self.entName = entName
            self.orgName = entName
            self.rtname = rtname

        def outputDebug(self, ns=''):
            """Print nothing; ``ns`` is accepted because Symbol.outputDebug passes it."""
            pass

        def create_impl(self, entity_indent, top_ns):
            """Return an empty list in this base class."""
            batch_list = list()
            return batch_list

        def mem_include(self, entName):
            """Return False: a plain entity has no members."""
            return False


    class PBMessageMember(object):
        """One field of a message: label, type name and field name."""

        def __init__(self, option, memType, memText):
            self.option = option
            self.memType = memType
            self.memText = memText

        def outputDebug(self):
            print(self.option, self.memType, self.memText)

        @property
        def mem_option(self):
            return self.option

        @property
        def mem_type(self):
            return self.memType

        @property
        def mem_text(self):
            return self.memText


    class PBMessage(PBEntity):
        """A ``message`` definition and its ordered list of fields."""

        def __init__(self, entName):
            PBEntity.__init__(self, entName, entName)
            self.members = []
            # Set to 1 by the parser when the message has nested definitions.
            self.enableSymbol = 0
            self.rt_ns = ''
            self.tpDict = None

        @property
        def Members(self):
            return self.members

        def attach_tp_dict(self, tpDict):
            """Remember the shared type dictionary."""
            self.tpDict = tpDict

        def append_Member(self, option, memType, memText):
            """Append a field built from label, type name and field name."""
            msgMem = PBMessageMember(option, memType, memText)
            self.members.append(msgMem)

        def enable_Symbol(self, enable):
            self.enableSymbol = enable

        def outputDebug(self, ns):
            print(ns, 'message', self.entName)
            for entMsg in self.members:
                entMsg.outputDebug()
            print('')

        def set_rt_ns(self, rt_entity_full_path):
            self.rt_ns = rt_entity_full_path

        def mem_include(self, entName):
            """Return True when any field has ``entName`` as its type name."""
            for entMsg in self.members:
                if entName == entMsg.memType:
                    return True
            return False

        def detect_request(self):
            """Return True when the message has at least one field."""
            # len() is required here: ``self.members.count`` is a bound method,
            # and comparing it with 0 never reflects the number of members.
            return len(self.members) > 0


    class PBEnumMember(object):
        """One ``NAME = value`` entry of an enum (both kept as text)."""

        def __init__(self, memText, memValue):
            self.memText = memText
            self.memValue = memValue

        def outputDebug(self):
            print(self.memText, self.memValue)


    class PBEnum(PBEntity):
        """An ``enum`` definition and its ordered list of entries."""

        def __init__(self, entName):
            PBEntity.__init__(self, entName, entName)
            self.members = []

        def append_Member(self, memText, memValue):
            msgMem = PBEnumMember(memText, memValue)
            self.members.append(msgMem)

        def outputDebug(self, ns):
            print(ns, 'enum', self.entName)
            for entEnum in self.members:
                entEnum.outputDebug()
            print('')


    class Symbol(object):
        """A scope holding messages, enums and child scopes.

        ``entitylist`` holds everything added to the scope in order;
        ``containerlist`` holds only messages and child symbols.
        """

        def __init__(self, tpDict, fullpath, rooted):
            self.namespace = ''
            self.tpDict = tpDict
            self.rooted = rooted
            self.entity_full_path = fullpath
            self.rt_entity_full_path = fullpath
            self.entitylist = []
            self.containerlist = []

        def update_namespace(self, namespace):
            """Set the scope name and, for non-root scopes, extend both paths."""
            self.namespace = namespace
            if self.rooted:
                return
            if self.entity_full_path == '':
                self.entity_full_path = namespace
                self.rt_entity_full_path = namespace
            else:
                # Each path is extended from its own previous value, so the
                # namespace is appended exactly once to both of them.
                self.rt_entity_full_path = '%s_%s' % (self.rt_entity_full_path,
                                                      namespace)
                self.entity_full_path = '%s_%s' % (self.entity_full_path,
                                                   namespace)

        def append_type_dict(self, entity, isMsg):
            """Register ``entity`` in the type dictionary under its short name.

            The runtime type is qualified with ``rt_entity_full_path`` when the
            scope has a path.  The original-type slot is left empty for
            messages and holds the ``entity_full_path``-qualified name for
            everything else.
            """
            if self.entity_full_path == '':
                rt_type = entity.rtname
                org_type = entity.rtname
            else:
                rt_type = '%s::%s' % (self.rt_entity_full_path, entity.rtname)
                org_type = '%s::%s' % (self.entity_full_path, entity.rtname)
            if isMsg:
                org_type = ''
            self.tpDict.insert_type(entity.entName, rt_type, entity, org_type)

        def append_message(self, msg):
            """Add a message to the scope and register it as a type."""
            self.entitylist.append(msg)
            self.containerlist.append(msg)
            msg.attach_tp_dict(self.tpDict)
            if self.rt_entity_full_path == '':
                msg.set_rt_ns(self.rt_entity_full_path)
            else:
                msg.set_rt_ns(self.rt_entity_full_path + '_')
            self.append_type_dict(msg, True)

        def append_enum(self, enum):
            """Add an enum to the scope and register it as a type."""
            self.entitylist.append(enum)
            self.append_type_dict(enum, False)

        def append_symbol(self, symbol):
            """Add a child scope."""
            self.entitylist.append(symbol)
            self.containerlist.append(symbol)

        def outputDebug(self, ns):
            for entity in self.entitylist:
                entity.outputDebug(ns + '::' + self.namespace)

        def query_entitylist(self):
            return self.entitylist

        def query_containerlist(self):
            return self.containerlist

        def query_pb_ns(self):
            return self.namespace

        def mem_include(self, entName):
            """Return True when any entity in this scope reports ``entName``."""
            for entity in self.entitylist:
                if entity.mem_include(entName) == True:
                    return True
            return False


    class PBProxy(object):
        """Read-only forwarding wrapper around a single entity.

        Every attribute below is looked up on the wrapped object at access
        time, so it is only available when the wrapped object provides it.
        """

        def __init__(self, entity):
            self.entity = entity

        @property
        def enableSymbol(self):
            return self.entity.enableSymbol

        def mem_include(self, entName):
            return self.entity.mem_include(entName)

        def create_impl(self, entity_indent, top_ns):
            return self.entity.create_impl(entity_indent, top_ns)

        @property
        def entName(self):
            return self.entity.entName

        @property
        def rtname(self):
            return self.entity.rtname

        @property
        def orgName(self):
            return self.entity.orgName

        @property
        def members(self):
            return self.entity.members

        @property
        def rt_ns(self):
            return self.entity.rt_ns

        @property
        def namespace(self):
            return self.entity.namespace

        @property
        def rooted(self):
            return self.entity.rooted

        @property
        def entity_full_path(self):
            return self.entity.entity_full_path

        @property
        def rt_entity_full_path(self):
            return self.entity.rt_entity_full_path

        @property
        def entitylist(self):
            return self.entity.entitylist

        @property
        def containerlist(self):
            return self.entity.containerlist

        @property
        def tpDict(self):
            return self.entity.tpDict

        def detect_request(self):
            return self.entity.detect_request()

        @property
        def Members(self):
            return self.entity.members

        @property
        def mem_option(self):
            return self.entity.mem_option

        @property
        def mem_type(self):
            return self.entity.mem_type

        @property
        def mem_text(self):
            return self.entity.mem_text

     4 typecollection

    # -*- coding: UTF-8 -*-
    # pb_typecollection.py
    
    import os
    import pb_symbol
    
    
    class typeDict(object):
        """Index from a protobuf type name to ``(rtType, entity, orgType)``.

        The scalar types listed in ``__init__`` are registered on
        construction; further types are added through ``insert_type``.
        """

        op_req_desc = 'required'
        op_opt_desc = 'optional'
        op_rep_desc = 'repeated'

        def __init__(self):
            self.collection = {}
            # (protobuf name, runtime type) for each built-in scalar; every
            # one is registered with an empty original-type string.
            builtin_types = (
                ('int32', '__int32'),
                ('int64', '__int64'),
                ('uint32', 'unsigned int'),
                ('bool', 'bool'),
                ('float', 'float'),
                ('double', 'double'),
                ('string', 'const char*'),
                ('bytes', 'const char*'),
            )
            for pb_name, rt_type in builtin_types:
                placeholder = pb_symbol.PBEntity(pb_name, pb_name)
                self.insert_type(pb_name, rt_type, placeholder, '')

        def insert_type(self, entName, rtType, entity, orgType):
            """Store (or overwrite) the record kept under ``entName``."""
            record = (rtType, entity, orgType)
            self.collection[entName] = record

        def output_debug(self):
            """Print a header line followed by one ``(name, record)`` pair per type."""
            print('type collection')
            for name in self.collection:
                print((name, self.collection[name]))
           

     5 测试脚本

    # -*- coding: UTF-8 -*-
    
    import pb_symbol
    import pb_expression
    import pb_typecollection
    
    if __name__ == '__main__':
        # Parse the sample schema, then dump the symbol tree and the type
        # dictionary that the parse populated.
        pb_file = 'google_tutorial.proto'
        sBuf = pb_expression.StringBuffer(pb_file)
        tpDict = pb_typecollection.typeDict()
        # Root scope: empty path prefix, and rooted=True so that the package
        # name does not become part of the qualified type paths.
        symbol = pb_symbol.Symbol(tpDict, '', True)
        try:
            sBuf.OpenFile()
            exp = pb_expression.Expression(sBuf, symbol)
            exp.do_expression()
            symbol.outputDebug('')
            tpDict.output_debug()
        except Exception as exc:
            # Apply the format string; passing exc as a second print argument
            # would emit the literal "%s" next to the error instead.
            print("%s" % (exc,))
        print("done")

    6 输出

    命名空间:::tutorial::Person

    类型名称:PhoneType

    ('::tutorial::Person', 'enum', 'PhoneType')   
    ('MOBILE', '0')
    ('HOME', '1')
    ('WORK', '2')

    ('::tutorial::Person', 'message', 'PhoneNumber')
    ('required', 'string', 'number')
    ('optional', 'PhoneType', 'type')

    ('::tutorial', 'message', 'Person')
    ('required', 'string', 'name')
    ('required', 'int32', 'id')
    ('optional', 'string', 'email')
    ('repeated', 'PhoneNumber', 'phone')

    ('::tutorial', 'message', 'AddressBook')
    ('repeated', 'Person', 'person')

    type collection
    ('PhoneNumber', ('Person::PhoneNumber', <pb_symbol.PBMessage object at 0x02B9DED0>, ''))
    ('int32', ('__int32', <pb_symbol.PBEntity object at 0x02BE3F70>, ''))
    ('string', ('const char*', <pb_symbol.PBEntity object at 0x02BEE0F0>, ''))
    ('double', ('double', <pb_symbol.PBEntity object at 0x02BEE0B0>, ''))
    ('float', ('float', <pb_symbol.PBEntity object at 0x02BEE070>, ''))
    ('bytes', ('const char*', <pb_symbol.PBEntity object at 0x02BEE130>, ''))
    ('Person', ('Person', <pb_symbol.PBMessage object at 0x02BEE210>, ''))
    ('bool', ('bool', <pb_symbol.PBEntity object at 0x02BEE050>, ''))
    ('PhoneType', ('Person::PhoneType', <pb_symbol.PBEnum object at 0x02BEE450>, 'Person::PhoneType'))
    ('int64', ('__int64', <pb_symbol.PBEntity object at 0x02BE3FB0>, ''))
    ('uint32', ('unsigned int', <pb_symbol.PBEntity object at 0x02BE3FF0>, ''))
    ('AddressBook', ('AddressBook', <pb_symbol.PBMessage object at 0x02BEE7B0>, ''))

    参考

    protobuf的git地址:https://github.com/google/protobuf

  • 相关阅读:
    Java 面向对象之static,final,匿名对象,内部类,包,修饰符
    用NotePad++如何实现大小写转换
    Java 面向对象之接口、多态
    Jmeter测试API接口,用Jmeter自动化之检查DB数据
    SQLServer 大小写转换
    vmstat 命令详解
    Java 面向对象之构造方法
    Java 面向对象之继承和重写OverWrite,重写和重载的区别,抽象类
    Java 集合、Iterator迭代器、泛型等
    【已解决】面试测试岗位遇到的几个未解决的问题
  • 原文地址:https://www.cnblogs.com/febwave/p/4819186.html
Copyright © 2011-2022 走看看