from concurrent import futures
import time

import grpc

from example import helloworld_pb2_grpc, helloworld_pb2


# Implements the GreeterServicer defined in the proto file.
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Servicer whose RPC methods answer with a greeting built from the request name."""

    # Implements the rpc calls defined in the proto file.
    def SayHello(self, request, context):
        """Return a HelloReply whose message is 'hello ' followed by request.name."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        """Return the same reply as SayHello for the given request."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))


def serve():
    """Start the rpc service on port 50051 and block until Ctrl+C is pressed."""
    # The thread pool handed to grpc.server is used to process requests.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    # Keep the main thread alive after start(); sleep in long intervals
    # until the user interrupts the process.
    try:
        while True:
            time.sleep(60 * 60 * 24)  # one day in seconds
    except KeyboardInterrupt:
        # Stop the server with a grace value of 0.
        server.stop(0)


if __name__ == '__main__':
    serve()
1️⃣ 创建 server
这里我们传了一个线程池给 grpc 的 server ,这个线程池用来处理请求。
经过重重调用,最后我们得到的 server 是 _Server 的实例
class _Server(grpc.Server):
    """grpc.Server subclass backed by a cygrpc server and completion queue.

    All mutable state lives in a single _ServerState instance created in
    __init__; most methods forward to a module-level helper and pass that
    state object along.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        """Create the cygrpc server and completion queue and bundle them into the state."""
        completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server(_augment_options(options, compression))
        server.register_completion_queue(completion_queue)
        self._state = _ServerState(completion_queue, server, generic_handlers,
                                   _interceptor.service_pipeline(interceptors),
                                   thread_pool, maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the given handlers, then add them to the server state."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind an insecure port at `address` and return the validated binding result."""
        return _common.validate_port_binding_result(
            address, _add_insecure_port(self._state, _common.encode(address)))

    def add_secure_port(self, address, server_credentials):
        """Bind a port at `address` using `server_credentials` and return the validated result."""
        return _common.validate_port_binding_result(
            address,
            _add_secure_port(self._state, _common.encode(address),
                             server_credentials))

    def start(self):
        """Start the server by delegating to _start with this server's state."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the state's termination event, optionally bounded by `timeout`."""
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        return _common.wait(self._state.termination_event.wait,
                            self._state.termination_event.is_set,
                            timeout=timeout)

    def stop(self, grace):
        """Stop the server by delegating to _stop with the given `grace` value."""
        return _stop(self._state, grace)

    def __del__(self):
        # __init__ may have raised before _state was assigned, hence the check.
        if hasattr(self, '_state'):
            # We can not grab a lock in __del__(), so set a flag to signal the
            # serving daemon thread (if it exists) to initiate shutdown.
            self._state.server_deallocated = True
cygrpc.CompletionQueue 和 cygrpc.Server 都是调用底层的 c++ core ,我们不去管它。
再来看看这个 _ServerState 的代码
class _ServerState(object):
    """Plain container for the state shared by the server's helper functions.

    It stores the objects passed to the constructor and initialises the
    bookkeeping fields (stage, events, counters, sets) to their starting
    values; it defines no behaviour of its own.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        # Re-entrant lock guarding this state object.
        self.lock = threading.RLock()
        self.completion_queue = completion_queue
        self.server = server
        # Copied into a new list so later additions do not touch the
        # caller's iterable.
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool
        # A freshly built server starts in the STOPPED stage.
        self.stage = _ServerStage.STOPPED
        self.termination_event = threading.Event()
        # The termination event is always the first shutdown event.
        self.shutdown_events = [self.termination_event]
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False
从这里我们可以看到,Python 的 server 只是对底层的简单封装:网络 IO 的处理完全由底层的 C++ core 负责,Python 层主要负责调用开发者实现的接口来处理请求。
2️⃣ 注册接口方法
这一步负责将我们开发好的接口注册到服务器上,调用的是编译 proto 文件后生成的、以 _pb2_grpc 为后缀的文件中的函数。
def add_GreeterServicer_to_server(servicer, server):
    """Register the Greeter RPC methods of `servicer` on `server`.

    Builds a dict that maps each RPC method name to a unary-unary method
    handler, wraps the dict in a generic handler for the 'Greeter' service,
    and passes it to `server.add_generic_rpc_handlers` as a one-element tuple.
    """
    rpc_method_handlers = {
        'SayHello': grpc.unary_unary_rpc_method_handler(
            servicer.SayHello,  # the servicer method invoked for this RPC
            # deserializer applied to the request
            request_deserializer=helloworld__pb2.HelloRequest.FromString,
            # serializer applied to the response
            response_serializer=helloworld__pb2.HelloReply.SerializeToString,
        ),
        'SayHelloAgain': grpc.unary_unary_rpc_method_handler(
            servicer.SayHelloAgain,
            request_deserializer=helloworld__pb2.HelloRequest.FromString,
            response_serializer=helloworld__pb2.HelloReply.SerializeToString,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
请求的路由分发使用的是字典,key 是我们定义的接口名,value 则是一个命名元组,里面保存着我们的接口方法、序列化方法和反序列化方法。
3️⃣ 绑定监听端口
这一步最终调用的是 C++ core 的代码,这里直接略过。
4️⃣ 服务启动
server 的 start 方法只是调用 _start 函数
def start(self):
    """Start the server by delegating to _start with this server's state."""
    _start(self._state)
def _start(state):
    """Start the underlying server and launch the serving thread.

    Args:
        state: The _ServerState of the server being started.

    Raises:
        ValueError: If the server's stage is anything other than STOPPED.
    """
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        # Run _serve on a daemon thread so this call returns to the caller
        # right after the thread has been started.
        thread = threading.Thread(target=_serve, args=(state,))
        thread.daemon = True
        thread.start()