zoukankan      html  css  js  c++  java
  • Hadoop IPC的代码结构分析

    与IPC相关的代码在org.apache.hadoop.ipc包下。共七个文件,其中4个辅助类:

    RemoteException

    Status

    VersionedProtocol

    ConnectionHeader

    主要实现类3个:

    Client

    Server

    RPC

    客户端Client:

    image

    如上图:

    与IPC连接相关的

    • Client.Connection
    • Client.ConnectionId
    • ConnectionHeader

    与远程调用Call相关的

    • Client.Call
    • Client.ParallelCall
    • Client.ParallelResults

    服务器端Server:

    image

    与IPC连接相关的

    • Server.Connection
    • ConnectionHeader

    与远程调用Call相关的

    • Server.Call
    • Server.Responder
    • Server.Listener
    • Server.Handler

    RPC

    RPC是在Server及Client的基础上实现了Hadoop IPC。

    image

    与客户端相关的功能:

    • RPC.ClientCache
    • RPC.Invoker(继承java.lang.reflect.InvocationHandler)
    • RPC.Invocation

    与服务端相关的功能:

    • RPC.Server

    Connection

    客户端与服务器端对连接的抽象不一样,所以有Server.Connection和Client.Connection。Hadoop远程调用采用TCP协议通信。

    1)客户端Client.ConnectionId

    连接复用:当多个IPC客户端的ConnectionId相同时,他们共享一个IPC连接。连接复用可以减少Hadoop Server、Client的资源占用,同时节省IPC连接时间。

    2)ConnectionHeader

    Server与Client间TCP连接建立后交换的第一条信息,包含ConnectionId.ticket(UserGroupInformation)用户信息和IPC接口信息,检验是否实现了IPC接口,以及该用户是否有权使用接口。

    Call

    建立连接后,即可以进行远程过程调用服务,即对IPC接口方法的调用,源码抽象为Call。

    远程调用Client.Call对象和Server.Call对象,是一个IPC调用产生的,存在于IPC客户端(存根)和IPC服务端(骨架)中的实体。

    Client.Call对象通过IPC连接到服务器后,自然会构成相应的Server.Call对象。

    Client.Call何时产生以及如何产生?

    image

    如上图所示流程:

    1. 用户发起远程接口调用
    2. 动态代理,RPC.Invoker调用句柄捕获远程调用
    3. 根据invoke的输入参数method、args生成RPC.Invocation对象
    4. 并调用Client.call,call会将上一步的Invocation对象序列化并通过IPC连接发送到服务器。Client.call会等待服务端返回的结果。
    5. 服务器端Listener监听Client发来的连接请求和数据请求,并调用Server端的连接对象。
    6. 连接对象接收远程调用请求帧,反序列化,并将请求放于阻塞队列中,由Handler处理。
    7. Handler调用对应的IPC接口实现类,完成过程调用,将结果序列化。
    8. 如果此时连接的应答队列为空,返回给客户端。
    9. 否则,客户端比较忙,应答队列不为空,Handler将结果放入响应队列,由Responser通过IPC发送给客户端。

    Client.Connection对象,需要通过setupIOstreams方法和服务器建立连接,该方法首先通过Java套接字与server建立Socket连接,如果失败,则进行一定次数的重试,如下代码,是setupIOstreams调用的setupConnection:

    private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket();
          this.socket.setTcpNoDelay(tcpNoDelay);
          //禁用tcp的Nagle算法,关闭socket底层缓冲
          // 配置项 ${ipc.client.tcpnodelay}
          
          /*
           * Bind the socket to the host specified in the principal name of the
           * client, to ensure Server matching address of the client connection
           * to host name in principal passed.
           */
          if (UserGroupInformation.isSecurityEnabled()) {
            KerberosInfo krbInfo = 
              remoteId.getProtocol().getAnnotation(KerberosInfo.class);
            if (krbInfo != null && krbInfo.clientPrincipal() != null) {
              String host = 
                SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());
              
              // If host name is a valid local address then bind socket to it
              InetAddress localAddr = NetUtils.getLocalInetAddress(host);
              if (localAddr != null) {
                this.socket.bind(new InetSocketAddress(localAddr, 0));
              }
            }
          }
          
          // connection time out is 20s
          NetUtils.connect(this.socket, server, 20000);
          if (rpcTimeout > 0) {
            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
          }
    
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* Check for an address change and update the local reference.
           * Reset the failure counter if the address was changed
           */
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          /* The max number of retries is 45,
           * which amounts to 20s*45 = 15 minutes retries.
           */
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

    IPC连接

    • 连接建立
    • 连接上的数据读写
    • 连接维护
    • 连接关闭

    在接下来的几篇内分别介绍以上内容。

  • 相关阅读:
    一句话解释数字签名。一句话解释数字证书
    KVC、KVO实现过程
    SSH基本概念和用途
    UICollectionView左对齐流水布局、右对齐流水布局
    Git命令学习笔记
    Xcode8插件安装
    Xcode日常使用
    dispatch_group_t 日常使用注意事项
    二分查找变种
    maven插件调试方法
  • 原文地址:https://www.cnblogs.com/dorothychai/p/4180738.html
Copyright © 2011-2022 走看看