from concurrent import futures
import time

import grpc

from example import helloworld_pb2_grpc, helloworld_pb2

# Length of one sleep in the keep-alive loop of serve(): one day in seconds.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24


def _build_reply(request):
    """Return a HelloReply whose message greets ``request.name``."""
    greeting = 'hello {msg}'.format(msg=request.name)
    return helloworld_pb2.HelloReply(message=greeting)


class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Implementation of the GreeterServicer defined in the proto file."""

    def SayHello(self, request, context):
        """Handle the SayHello rpc declared in the proto file."""
        return _build_reply(request)

    def SayHelloAgain(self, request, context):
        """Handle the SayHelloAgain rpc; the reply is built like SayHello's."""
        return _build_reply(request)


def serve():
    """Start the rpc service on port 50051 and keep it up until Ctrl+C."""
    # This thread pool is handed to the grpc server when it is created.
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        # Keep the main thread alive by sleeping in a loop until interrupted.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        # A grace of 0 is passed straight to stop().
        server.stop(0)


if __name__ == '__main__':
    serve()
1️⃣ 创建 server
这里我们传了一个线程池给 grpc 的 server ,这个线程池用来处理请求。
经过重重调用,最后我们得到的 server 是 `_Server` 的实例:
class _Server(grpc.Server):
    """grpc.Server implementation that keeps everything in a _ServerState.

    Every public method forwards to a module-level helper, passing along
    ``self._state``.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        """Create the cygrpc queue and server, then bundle them into state."""
        cq = cygrpc.CompletionQueue()
        core_server = cygrpc.Server(_augment_options(options, compression))
        core_server.register_completion_queue(cq)
        pipeline = _interceptor.service_pipeline(interceptors)
        self._state = _ServerState(cq, core_server, generic_handlers,
                                   pipeline, thread_pool,
                                   maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the given handlers, then add them to the server state."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind ``address`` without credentials; return the validated result."""
        encoded_address = _common.encode(address)
        binding_result = _add_insecure_port(self._state, encoded_address)
        return _common.validate_port_binding_result(address, binding_result)

    def add_secure_port(self, address, server_credentials):
        """Bind ``address`` with credentials; return the validated result."""
        encoded_address = _common.encode(address)
        binding_result = _add_secure_port(self._state, encoded_address,
                                          server_credentials)
        return _common.validate_port_binding_result(address, binding_result)

    def start(self):
        """Start the server by delegating to ``_start``."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the termination event; return what ``_common.wait`` returns."""
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        termination_event = self._state.termination_event
        return _common.wait(termination_event.wait,
                            termination_event.is_set,
                            timeout=timeout)

    def stop(self, grace):
        """Delegate to ``_stop`` with ``grace`` and return its result."""
        return _stop(self._state, grace)

    def __del__(self):
        """Flag the state as deallocated if it was ever created."""
        if not hasattr(self, '_state'):
            return
        # We can not grab a lock in __del__(), so set a flag to signal the
        # serving daemon thread (if it exists) to initiate shutdown.
        self._state.server_deallocated = True
`cygrpc.CompletionQueue` 和 `cygrpc.Server` 都是调用底层的 C++ core,我们不去管它。
再来看看这个 `_ServerState` 的代码:
class _ServerState(object):
    """Plain container for everything a running server needs to track."""

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        """Store the collaborators and reset all bookkeeping to initial values."""
        # Re-entrant lock; _start() holds it while changing the stage.
        self.lock = threading.RLock()

        # Collaborators supplied by the caller.
        self.completion_queue = completion_queue
        self.server = server
        # Copied into a fresh list, so the caller's iterable is not aliased.
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool

        # A new state always begins in the STOPPED stage.
        self.stage = _ServerStage.STOPPED

        # The termination event is also the first entry of shutdown_events.
        terminated = threading.Event()
        self.termination_event = terminated
        self.shutdown_events = [terminated]

        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False
从这里我们可以看到,Python 的 `server` 只是对底层的简单封装,网络 IO 的处理完全由底层的 C++ core 负责,Python 主要负责调用开发者的接口来处理请求。
2️⃣ 注册接口方法
这步负责将我们开发好的接口注册到服务器上,调用的是编译 proto 文件后生成的、以 `_pb2_grpc` 为后缀的文件中的函数。
def add_GreeterServicer_to_server(servicer, server):
    """Register the Greeter rpc methods of ``servicer`` on ``server``.

    Builds one unary-unary method handler per rpc, wraps them in a generic
    handler named 'Greeter', and adds that handler to ``server``.
    """
    # Both rpcs use the same request/reply message types, so one
    # deserializer/serializer pair serves every handler.
    parse_request = helloworld__pb2.HelloRequest.FromString  # deserializer
    dump_reply = helloworld__pb2.HelloReply.SerializeToString  # serializer

    rpc_method_handlers = {}
    for method_name in ('SayHello', 'SayHelloAgain'):
        rpc_method_handlers[method_name] = grpc.unary_unary_rpc_method_handler(
            getattr(servicer, method_name),  # the servicer method to invoke
            request_deserializer=parse_request,
            response_serializer=dump_reply,
        )

    generic_handler = grpc.method_handlers_generic_handler(
        'Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
请求的路由分发使用的是字典:key 是我们定义的接口名,value 则是一个命名元组,里面保存了我们的接口方法、序列化方法和反序列化方法。
3️⃣ 绑定监听端口
这一步最后调用的是 C++ core 的代码,直接忽略。
4️⃣ 服务启动
`server` 的 `start` 方法只是调用 `_start` 函数:
def start(self):
    """Start the server by handing this instance's state to ``_start``."""
    state = self._state
    _start(state)
def _start(state):
    """Move ``state`` from STOPPED to STARTED and launch the serving thread.

    Everything runs while holding ``state.lock``.

    Raises:
        ValueError: if the state is not in the STOPPED stage.
    """
    with state.lock:
        # Only a server that is still STOPPED may be started.
        not_stopped = state.stage is not _ServerStage.STOPPED
        if not_stopped:
            raise ValueError('Cannot start already-started server!')

        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        # _serve runs on a daemon thread, so this call does not wait for it.
        serving_thread = threading.Thread(target=_serve, args=(state,))
        serving_thread.daemon = True
        serving_thread.start()
![](https://img2020.cnblogs.com/blog/90107/202012/90107-20201221180330579-678774715.png)