zoukankan      html  css  js  c++  java
  • grpc学习

    安装环境

    • 要求
      • Python 2.7, or Python 3.4 or higher
      • pip version 9.0.1 or higher
    • 安装grpcio模块
    python -m pip install grpcio
    • 安装
     pip install grpcio-tools

    下载例子

    grpc-tools使用

    python -m grpc_tools.protoc -I=[proto所在文件目录]--python_out=[输出目录] --grpc_python_out=[输出目录] [.proto文件]

    grpc使用python

    • 定义.proto文件
    //最好使用proto3
    syntax = "proto3";
    //定义一个服务
    service RouteGuide {
        //声明一个方法
       rpc GetFeature(MyMsg) returns (Ret) {}
    }
    message MyMsg {
      string name = 1;
      int32 id = 2;
    }
    message Ret{
      string name = 1;
    }
    • 生成_pb2和_pb2_grpc
    python -m grpc_tools.protoc -I=[proto所在文件目录]--python_out=[输出目录] --grpc_python_out=[输出目录] [.proto文件]
    • 创建服务
    # 实现 proto 文件中定义的 RouteGuideServicer
    class RouteGuideServicer(sever_pb2_grpc.RouteGuideServicer):
        # 实现 proto 文件中定义的 rpc 调用
        def GetFeature(self, request, context):
            return sever_pb2.Ret(name="sssss")

    在_pb2_grpc文件里面可以看到要实现的地方

    实现启动服务

    def serve():
        # 启动 rpc 服务
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        sever_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        try:
            while True:
                time.sleep(60*60*24) # one day in seconds
        except KeyboardInterrupt:
            server.stop(0)
    
    if __name__ == '__main__':
        serve()
    • 创建客户端
      首先创建一个Creating a stub
      channel = grpc.insecure_channel('localhost:50051')
        stub = sever_pb2_grpc.RouteGuideStub(channel)

    通过stub调用方法

     response = stub.GetFeature(sever_pb2.MyMsg(name='czl',id=1))

    所有实现的代码

    import grpc
    import sever_pb2
    import sever_pb2_grpc
    def run():
        channel = grpc.insecure_channel('localhost:50051')
        stub = sever_pb2_grpc.RouteGuideStub(channel)
        response = stub.GetFeature(sever_pb2.MyMsg(name='czl',id=1))
        print (response)
    if __name__ == '__main__':
        run()

    grpc的保活机制

    可以注册回调函数
    channel.subscribe(self.callback)
    下面是官方对于这个函数的解释,大致的意识就是传递一个回调函数,当channel状态改变的时候会调用这个回调函数Channel Connectivity是他的状态
    subscribe

    abstract subscribe(callback, try_to_connect=False)[source]¶
    Subscribe to this Channel’s connectivity state machine.
    
    A Channel may be in any of the states described by ChannelConnectivity. This method allows application to monitor the state transitions. The typical use case is to debug or gain better visibility into gRPC runtime’s state.
    
    Parameters
    callback – A callable to be invoked with ChannelConnectivity argument. ChannelConnectivity describes current state of the channel. The callable will be invoked immediately upon subscription and again for every change to ChannelConnectivity until it is unsubscribed or this Channel object goes out of scope.
    
    try_to_connect – A boolean indicating whether or not this Channel should attempt to connect immediately. If set to False, gRPC runtime decides when to connect.

    Channel Connectivity

    Channel Connectivity
    class grpc.ChannelConnectivity[source]
    Mirrors grpc_connectivity_state in the gRPC Core.
    
    IDLE
    The channel is idle.
    
    CONNECTING
    The channel is connecting.
    
    READY
    The channel is ready to conduct RPCs.
    
    TRANSIENT_FAILURE
    The channel has seen a failure from which it expects to recover.
    
    SHUTDOWN
    The channel has seen a failure from which it cannot recover.

    回调函数编写例子

        channel.subscribe(callback)
        def callback(self, *args, **kwargs):
            """
            回调
            :return:
            """
            state = args[0]
            if self.callback_handler:
                with lock:
                    return self.callback_handler.dispatch(self, state)
    from grpc import ChannelConnectivity
    class DefaultCallBackHandler(object):
        def __init__(self, channel):
            self.channel = channel
    
        def dispatch(self, channel, state):
            if state == ChannelConnectivity.TRANSIENT_FAILURE:
                return self.transient_failure(channel)
            elif state == ChannelConnectivity.SHUTDOWN:
                return self.shut_down(channel)
            elif state == ChannelConnectivity.CONNECTING:
                return self.connecting(channel)
            elif state == ChannelConnectivity.READY:
                return self.ready(channel)
            elif state == ChannelConnectivity.IDLE:
                return self.idle(channel)
    
        def shut_down(self, channel):
            channel.state = "SHUTDOWN"
            channel.reconnect()
            print("[%s] server error with shutdown" % channel.connect_id)
    
        def connecting(self, channel):
            channel.state = "CONNECTING"
            print("[%s] I am trying to connect server" % channel.connect_id)
    
        def ready(self, channel):
            channel.state = "READY"
            print("[%s] I am ready to send a request" % channel.connect_id)
    
        def transient_failure(self, channel):
            channel.state = "TRANSIENT_FAILURE"
            channel.reconnect()
            print("[%s] someting wrong with this channel" % channel.connect_id)
    
        def idle(self, channel):
            channel.state = "IDLE"
            print("[%s] waiting" % channel.connect_id)
  • 相关阅读:
    【高并发】面试官问我:为什么局部变量是线程安全的?
    Java中的String到底占用多大的内存空间?你所了解的可能都是错误的!!
    【高并发】学好并发编程,关键是要理解这三个核心问题
    【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁!!
    12张图带你彻底理解分布式事务产生的场景和解决方案!!
    【高并发】面试官:讲讲高并发场景下如何优化加锁方式?
    【高并发】秒杀系统架构解密,不是所有的秒杀都是秒杀(升级版)!!
    十一长假我肝了这本超硬核PDF,现决定开源!!
    一文搞懂PV、UV、VV、IP及其关系与计算
    我把这个贼好用的Excel导出工具开源了!!
  • 原文地址:https://www.cnblogs.com/fangaojun/p/13100537.html
Copyright © 2011-2022 走看看