  • Hadoop RPC

    Remote Procedure Call (RPC): a protocol that lets a program request a service from a program on another machine on the network without having to understand the underlying network details. It follows a Client/Server structure: the side providing the service is the Server, and the side consuming the service is the Client.

    Hadoop's low-level interactions all go through RPC. For example, the communication between datanode and namenode, between tasktracker and jobtracker, and between secondary namenode and namenode is all implemented over RPC.

    TODO: this article is not finished. It clearly needs four diagrams: an RPC schematic, a Hadoop RPC sequence diagram, a client flowchart, and a server flowchart. A few package diagrams and class (component) diagrams would also help. To be completed.

    Implementing a remote procedure call requires three elements: (1) the server must publish a service; (2) both the client and the server need modules that handle the protocol and the connection; (3) the interface of the service published by the server must be made available to the client.

    Hadoop RPC

    1. Serialization layer: the messages exchanged between Client and Server are types that implement Writable.
    2. Function-call layer: Hadoop RPC implements function calls through dynamic proxies and Java reflection.
    3. Network transport layer: Hadoop RPC uses the TCP/IP socket mechanism.
    4. Server framework layer: Hadoop RPC uses Java NIO's event-driven model to raise the RPC Server's throughput.
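    As a concrete illustration of the serialization layer, here is a minimal, self-contained sketch of the two-method Writable contract (write to a DataOutput, read the fields back from a DataInput). The Writable interface below mirrors the shape of org.apache.hadoop.io.Writable, but the IntPair class is invented for illustration and is not part of Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableSketch {
  // Same two-method shape as org.apache.hadoop.io.Writable.
  interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
  }

  // Hypothetical Writable type: two ints, written and read in a fixed order.
  static class IntPair implements Writable {
    int first, second;
    public void write(DataOutput out) throws IOException {
      out.writeInt(first);
      out.writeInt(second);
    }
    public void readFields(DataInput in) throws IOException {
      first = in.readInt();
      second = in.readInt();
    }
  }

  // Serialize an IntPair to bytes and deserialize it into a fresh object:
  // the same round trip an RPC request/response payload makes.
  public static int[] roundTrip(int a, int b) {
    try {
      IntPair original = new IntPair();
      original.first = a;
      original.second = b;
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));
      IntPair copy = new IntPair();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return new int[] { copy.first, copy.second };
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    int[] r = roundTrip(3, 7);
    System.out.println(r[0] + "," + r[1]); // prints 3,7
  }
}
```

    Because each type knows how to write and read itself in a fixed field order, the receiving side only needs a default-constructed instance to rebuild the value from the stream.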

    TODO: an RPC diagram is missing here.

    The Hadoop RPC source code lives mainly under the org.apache.hadoop.ipc package. org.apache.hadoop.ipc.RPC contains five inner classes.

    • Invocation: wraps the method name and parameters as the data-transfer payload; essentially a VO (Value Object).
    • ClientCache: caches Client objects in a HashMap, keyed by socket factory.
    • Invoker: the invocation-handler class behind the dynamic proxy; it implements java.lang.reflect.InvocationHandler.
    • Server: the concrete implementation of ipc.Server.
    • VersionMismatch: signals a protocol version mismatch.
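    The Invoker's role can be sketched with plain JDK dynamic proxies. In this hypothetical example (none of these class names are Hadoop's), the handler captures the invoked method and its arguments in an Invocation-style object, which is what RPC.Invoker does before handing the Invocation to Client.call(); here the call is answered locally so the sketch stays runnable.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
  // A toy protocol interface; Hadoop's real protocols extend VersionedProtocol.
  interface GreetingProtocol {
    String greet(String name);
  }

  // Stands in for RPC.Invocation: just the method name plus arguments.
  static class Invocation {
    final String methodName;
    final Object[] args;
    Invocation(Method method, Object[] args) {
      this.methodName = method.getName();
      this.args = args;
    }
  }

  // Stands in for RPC.Invoker: every call on the proxy lands here.
  static class Invoker implements InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args) {
      Invocation inv = new Invocation(method, args);
      // A real Invoker would serialize inv and send it over the wire;
      // we answer locally to keep the sketch self-contained.
      return "hello " + inv.args[0] + " (via " + inv.methodName + ")";
    }
  }

  public static GreetingProtocol getProxy() {
    return (GreetingProtocol) Proxy.newProxyInstance(
        ProxySketch.class.getClassLoader(),
        new Class<?>[] { GreetingProtocol.class },
        new Invoker());
  }

  public static void main(String[] args) {
    System.out.println(getProxy().greet("world")); // prints hello world (via greet)
  }
}
```

    The caller only ever sees the interface; reflection turns each method call into a uniform (name, args) record that can be shipped anywhere.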

    Source-code walkthrough of the communication, starting from the client

    org.apache.hadoop.ipc.Client has five inner classes:

    • Call: A call waiting for a value.
    • Connection: Thread that reads responses and notifies callers. Each connection owns a socket connected to a remote address. Calls are multiplexed through this socket: responses may be delivered out of order.
    • ConnectionId: This class holds the address and the user ticket. The client connections to servers are uniquely identified by <remoteAddress, protocol, ticket>.
    • ParallelCall: Call implementation used for parallel calls.
    • ParallelResults: Result collector for parallel calls.

    The rough sequence for establishing a connection between client and server:

    1. Inside the Object org.apache.hadoop.ipc.RPC.Invoker.invoke(Object proxy, Method method, Object[] args) method, the following is called:
      client.call(new Invocation(method, args), remoteId);

    2. The new Invocation(method, args) above is an inner class of org.apache.hadoop.ipc.RPC; it holds the name of the invoked method and its parameters, so this step simply records the method and arguments. client is an instance of org.apache.hadoop.ipc.Client.

    3. The source of the org.apache.hadoop.ipc.Client.call() method. Inside call(), getConnection() obtains an org.apache.hadoop.ipc.Client.Connection object and starts its I/O streams via setupIOstreams().

      Writable org.apache.hadoop.ipc.Client.call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
      Call call = new Call(param); // A call waiting for a value.
      // Get a connection from the pool, or create a new one and add it to the
      // pool.  Connections to a given ConnectionId are reused.
      Connection connection = getConnection(remoteId, call); // the socket plumbing lives mainly under org.apache.hadoop.net
      connection.sendParam(call); // the client sends the request data
      boolean interrupted = false;
      synchronized (call) {
         while (!call.done) {
          try {
            call.wait();                           // wait for the result
          } catch (InterruptedException ie) {
            // save the fact that we were interrupted
            interrupted = true;
          }
        }
      … …
      }
      }
      // Get a connection from the pool, or create a new one and add it to the
      // pool.  Connections to a given ConnectionId are reused.
      private Connection getConnection(ConnectionId remoteId,
                                     Call call)
                                     throws IOException, InterruptedException {
      if (!running.get()) {
        // the client is stopped
        throw new IOException("The client is stopped");
      }
      Connection connection;
      // we could avoid this allocation for each RPC by having a 
      // connectionsId object and with set() method. We need to manage the
      // refs for keys in HashMap properly. For now its ok.
      do {
        synchronized (connections) {
          connection = connections.get(remoteId);
          if (connection == null) {
            connection = new Connection(remoteId);
            connections.put(remoteId, connection);
          }
        }
      } while (!connection.addCall(call));
      //we don't invoke the method below inside "synchronized (connections)"
      //block above. The reason for that is if the server happens to be slow,
      //it will take longer to establish a connection and that will slow the
      //entire system down.
      connection.setupIOstreams(); // send a header to the server and wait for the result
      return connection;
      }
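    The synchronized/wait loop in call() can be boiled down to the following hypothetical sketch (names are illustrative, not Hadoop's): the caller parks on the Call object until a reader thread stores the value and calls notify(), which is exactly the handshake between Client.call() and the Connection thread.

```java
public class CallSketch {
  // Minimal stand-in for Client.Call: a slot for the result plus a done flag.
  static class Call {
    private Object value;
    private boolean done;

    synchronized void setValue(Object v) {
      value = v;
      done = true;
      notify(); // wake the thread blocked in waitForValue()
    }

    synchronized Object waitForValue() throws InterruptedException {
      while (!done) {
        wait(); // the lock is released while waiting, reacquired on wake-up
      }
      return value;
    }
  }

  // Simulate one RPC round trip: a background thread plays the part of
  // the Connection reader thread delivering the server's response.
  public static Object roundTrip(Object response) {
    try {
      Call call = new Call();
      Thread reader = new Thread(() -> call.setValue(response));
      reader.start();
      Object result = call.waitForValue();
      reader.join();
      return result;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("pong")); // prints pong
  }
}
```

    The while (!done) guard matters: wait() can return spuriously, so the condition must be rechecked under the lock before the result is trusted.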
    4. The setupIOstreams() method.

      void org.apache.hadoop.ipc.Client.Connection.setupIOstreams() throws InterruptedException {
      // Connect to the server and set up the I/O streams. It then sends
      // a header to the server and starts
      // the connection thread that waits for responses.
      while (true) {
            setupConnection(); // establish the connection
            InputStream inStream = NetUtils.getInputStream(socket); // input stream
            OutputStream outStream = NetUtils.getOutputStream(socket); // output stream
            writeRpcHeader(outStream);
            }
      … …
      // update last activity time
        touch();
      // start the receiver thread after the socket connection has been set up
        start();
      }
    5. Starting org.apache.hadoop.ipc.Client.Connection: how the client receives the data returned by the server

      void org.apache.hadoop.ipc.Client.Connection.run()
      while (waitForWork()) {//wait here for work - read or close connection
          receiveResponse();
        }

    ipc.Server source-code analysis

    ipc.Server has six inner classes:

    • Call: stores a request sent by a client.
    • Listener: the listener class, which waits for incoming client requests; it also contains a static inner class, Listener.Reader, which reads a request once the listener has accepted it.
    • ExceptionsHandler: exception management.
    • Responder: sends the response back to the requesting client once a request has been processed.
    • Connection: the connection class; the actual logic for reading a client request lives here.
    • Handler: the request-processing class; it loops, blocking on callQueue to take Call objects, and processes them.

    The rough sequence:

    1. When the Namenode is initialized, the RPC server object is obtained through the getServer() method of the ipc.RPC class.

      void org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(Configuration conf) throws IOException
      // create rpc server
      InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
      if (dnSocketAddr != null) {
        int serviceHandlerCount =
          conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                      DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
        this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
            dnSocketAddr.getPort(), serviceHandlerCount,
            false, conf, namesystem.getDelegationTokenSecretManager());
        this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
        setRpcServiceServerAddress(conf);
      }
      … …
      this.server.start();  //start RPC server 
    2. Starting the server

      void org.apache.hadoop.ipc.Server.start()
      // Starts the service.  Must be called before any calls will be handled.
      public synchronized void start() {
      responder.start();
      listener.start();
      handlers = new Handler[handlerCount];
      for (int i = 0; i < handlerCount; i++) {
        handlers[i] = new Handler(i);
        handlers[i].start(); // process calls
      }
      }
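    The handler array started above is a plain worker-pool pattern: a fixed number of threads blocking on a shared queue. Here is a hypothetical, self-contained sketch (the names are illustrative; Hadoop's callQueue holds Call objects rather than Runnables):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerSketch {
  // Start handlerCount worker threads, feed callCount calls through the
  // queue, and return how many calls were processed.
  public static int drain(int handlerCount, int callCount) {
    try {
      BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
      AtomicInteger processed = new AtomicInteger();
      Thread[] handlers = new Thread[handlerCount];
      for (int i = 0; i < handlerCount; i++) {
        handlers[i] = new Thread(() -> {
          try {
            while (true) {
              Runnable call = callQueue.take(); // blocks until a call arrives
              call.run();
            }
          } catch (InterruptedException e) {
            // interruption doubles as the shutdown signal
          }
        });
        handlers[i].start();
      }
      for (int i = 0; i < callCount; i++) {
        callQueue.put(processed::incrementAndGet);
      }
      while (processed.get() < callCount) {
        Thread.sleep(5); // wait for the pool to drain the queue
      }
      for (Thread h : handlers) h.interrupt();
      for (Thread h : handlers) h.join();
      return processed.get();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(drain(3, 100)); // prints 100
  }
}
```

    Because take() blocks, idle handlers cost nothing, and the queue decouples the single listener thread from however many handler threads the configuration asks for.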
    3. The server handles requests; the server likewise uses non-blocking NIO to raise throughput.

      org.apache.hadoop.ipc.Server.Listener.Listener(Server) throws IOException
      public Listener() throws IOException {
        address = new InetSocketAddress(bindAddress, port);
        // Create a new server socket and set to non blocking mode
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
      … … }    
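    The configureBlocking(false) call above is the heart of the NIO design. Here is a hypothetical, self-contained sketch of the accept path (names invented; Hadoop's Listener additionally hands accepted channels to Reader threads): register the server channel for OP_ACCEPT, then poll a Selector instead of blocking per connection.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ListenerSketch {
  // Bind a non-blocking server socket, let one client connect, and accept
  // it through the selector. Returns the number of accepted connections.
  public static int acceptOne() {
    try (Selector selector = Selector.open();
         ServerSocketChannel acceptChannel = ServerSocketChannel.open()) {
      acceptChannel.configureBlocking(false); // same call as in Listener
      acceptChannel.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      int port = ((InetSocketAddress) acceptChannel.getLocalAddress()).getPort();

      // One client connects; in Hadoop this would be an RPC Client.Connection.
      try (Socket client = new Socket("127.0.0.1", port)) {
        int accepted = 0;
        while (accepted == 0 && selector.select(2000) > 0) {
          for (SelectionKey key : selector.selectedKeys()) {
            if (key.isAcceptable()) {
              SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
              if (channel != null) {
                channel.close(); // a real server would hand this to a Reader
                accepted++;
              }
            }
          }
          selector.selectedKeys().clear();
        }
        return accepted;
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(acceptOne()); // prints 1
  }
}
```

    One selector thread can watch many channels this way, which is why the event-driven model scales better than one blocking thread per connection.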
    4. Actually establishing the connection

      void org.apache.hadoop.ipc.Server.Listener.doAccept(SelectionKey key) throws IOException, OutOfMemoryError

      The Reader then reads the data to take in the request:

      void org.apache.hadoop.ipc.Server.Listener.doRead(SelectionKey key) throws InterruptedException
      try {
          count = c.readAndProcess();
        } catch (InterruptedException ieo) {
          LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
          throw ieo;
        }
      int org.apache.hadoop.ipc.Server.Connection.readAndProcess() throws IOException, InterruptedException
      if (!rpcHeaderRead) {
            //Every connection is expected to send the header.
            if (rpcHeaderBuffer == null) {
              rpcHeaderBuffer = ByteBuffer.allocate(2);
            }
            count = channelRead(channel, rpcHeaderBuffer);
            if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
              return count;
            }
            int version = rpcHeaderBuffer.get(0);
      … …
      processOneRpc(data.array()); // process the data
    5. Below is the source of the processOneRpc() method in the Server.Connection class, which dispatches to processData().

      void org.apache.hadoop.ipc.Server.Connection.processOneRpc(byte[] buf) throws IOException, InterruptedException
      private void processOneRpc(byte[] buf) throws IOException,
          InterruptedException {
        if (headerRead) {
          processData(buf);
        } else {
          processHeader(buf);
          headerRead = true;
          if (!authorizeConnection()) {
            throw new AccessControlException("Connection from " + this
                + " for protocol " + header.getProtocol()
                + " is unauthorized for user " + user);
          }
        }
      }
    6. Processing the call

      void org.apache.hadoop.ipc.Server.Handler.run()
      while (running) {
          try {
            final Call call = callQueue.take(); // pop the queue; maybe blocked here
            … …
            CurCall.set(call);
            try {
              // Make the call as the user via Subject.doAs, thus associating
              // the call with the Subject
              if (call.connection.user == null) {
                value = call(call.connection.protocol, call.param,
                             call.timestamp);
              } else {
      … …}
    7. Returning the response

    Below is the doRespond() method from the Server.Responder class:

    void org.apache.hadoop.ipc.Server.Responder.doRespond(Call call) throws IOException
        //
        // Enqueue a response from the application.
        //
        void doRespond(Call call) throws IOException {
          synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
              processResponse(call.connection.responseQueue, true);
            }
          }
        }

    Note: notify() makes a thread that entered the wait set through wait() (and is therefore blocked) runnable again; the thread that issued the notify() continues running to completion, and only afterwards, at the next scheduling pass, may the thread that called wait() be scheduled again and enter the running state.

  • Original post: https://www.cnblogs.com/mrcharles/p/11879840.html