zoukankan      html  css  js  c++  java
  • gRPC源码分析2-Server的建立

    gRPC中,Server、Client共享的Class不是很多,所以我们可以单独的分别讲解Server和Client的源码。

    通过第一篇,我们知道对于gRPC来说,建立Server是非常简单的,还记得怎么写的?还是以example里 HelloWorldServer 例子来看

    server = ServerBuilder.forPort(port)
    .addService(new GreeterImpl())
    .build()
    .start();

    你没有看错,就是这么几行搞定。

    如果需要看懂gRPC的源码,首先有几点需要明白

    • Builder模式生成Entity

    • Provider(SPI)模式解耦,动态选择服务提供方

    • abstract class用于扩展

    0. 流程图

    1. Builder

    ServerBuilder是一个抽象类,不同的服务提供方(Provider),将继承实现它。如何找到这些继承者呢?ServerProvider就是用来找到不同的provider的。

    2. Provider

    如上图,ServerProvider也是一个抽象类,实现者都有哪些呢?我们通过SPI模式找到他们。

    通过搜索文件知道gRPC中 io.grpc.ServerProvider 的实现方只有:Netty

    io.grpc.netty.NettyServerProvider,
    这个类就是ServerProvider的实现者,它的builderForPort返回ServerBuilder

    3. NettyServer

    最后,我们来看下当链接建立时是如何创建handle的。

    public void initChannel(Channel ch) throws Exception {
      eventLoopReferenceCounter.retain();
      ch.closeFuture().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
          eventLoopReferenceCounter.release();
        }
      });
      NettyServerTransport transport = new NettyServerTransport(ch, protocolNegotiator,
          maxStreamsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize);
      ServerTransportListener transportListener;
      // This is to order callbacks on the listener, not to guard access to channel.
      synchronized (NettyServer.this) {
        if (channel != null && !channel.isOpen()) {
          // Server already shutdown.
          ch.close();
          return;
        }
    
        transportListener = listener.transportCreated(transport);
      }
      transport.start(transportListener);
    }

    看code可知,当一个链接建立时,会生成一个NettyServerTransport,所有的数据处理都将在这里实现。

    4. NettyServerTransport

    public void start(ServerTransportListener listener) {
      Preconditions.checkState(this.listener == null, "Handler already registered");
      this.listener = listener;
    
      // Create the Netty handler for the pipeline.
      final NettyServerHandler grpcHandler = createHandler(listener);
      HandlerSettings.setAutoWindow(grpcHandler);
    
      // Notify when the channel closes.
      channel.closeFuture().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          notifyTerminated(grpcHandler.connectionError());
        }
      });
    
      ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
      channel.pipeline().addLast(negotiationHandler);
    }

    我们看到当调用start方法是,最重要的就是createHandle,在这个方法里将看到如何绑定HTTP/2的处理器的。

    5. NettyServerHandle

    static NettyServerHandler newHandler(ServerTransportListener transportListener,
                                         int maxStreams,
                                         int flowControlWindow,
                                         int maxHeaderListSize,
                                         int maxMessageSize) {
      Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
      // 就是一个log
      Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
      Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
      // reader
      Http2FrameReader frameReader = new Http2InboundFrameLogger(
          new DefaultHttp2FrameReader(headersDecoder), frameLogger);
      // writer
      Http2FrameWriter frameWriter =
          new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
      return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
          maxMessageSize);
    }
    
    @VisibleForTesting
    static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
                                         ServerTransportListener transportListener,
                                         int maxStreams,
                                         int flowControlWindow,
                                         int maxMessageSize) {
      Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive");
      Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
      Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive");
    // 一个channel一个connection
      Http2Connection connection = new DefaultHttp2Connection(true);
    
      // Create the local flow controller configured to auto-refill the connection window.
      connection.local().flowController(
          new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
    
    
      Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
      Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
          frameReader);
    
      Http2Settings settings = new Http2Settings();
      settings.initialWindowSize(flowControlWindow);
      settings.maxConcurrentStreams(maxStreams);
    
      return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
    }

  • 相关阅读:
    POJ2888 Magic Bracelet [矩阵快速幂+Burnside+欧拉函数]
    数列的 GCD [计数问题]
    com组件的注册
    WCF 传输和接受大数据
    数据库中已存在名为 'View_Business' 的对象。
    windows 两个用户,默认其中一个用户登录
    用C#读取,写入ini文件
    小心得,关于串口
    未能加载文件或程序集"Microsoft.Web.Infrastructure, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad
    无法激活服务,因为它不支持 ASP.NET 兼容性
  • 原文地址:https://www.cnblogs.com/parse-code/p/6197992.html
Copyright © 2011-2022 走看看