zoukankan      html  css  js  c++  java
  • Hadoop RPC源码阅读-服务端Server

    Hadoop版本Hadoop2.6

    RPC主要分为3个部分:(1)交互协议 (2)客户端(3)服务端

    (3)服务端

    RPC服务端的实例代码:

    public class Starter {
        public static void main(String[] args) throws IOException {
            RPC.Builder build = new RPC.Builder(new Configuration());
            build.setBindAddress("localhost").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());
            RPC.Server server = build.build();
            server.start();
        }
    }

    RPC 服务端主要通过NIO来处理客户端发来的请求。

    RPC服务端涉及的类主要有

    org.apache.hadoop.ipc.Server(抽象类,server的最顶层类,与客户端的链接,响应,数据传输)

    org.apache.hadoop.ipc.RPC.Server(RPC的内部类,是一个抽象类,主要涉及将请求交给动态代理)

    org.apache.hadoop.ipc.WritableRpcEngine.Server(WritableRpcEngine的内部类,是一个静态类,动态代理的具体实现,得到请求结果)

    先看ipc.Server类,它有几个主要的内部类分别是:

    Call :用于存储客户端发来的请求

    Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。

    Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

    Connection :连接类,真正的客户端请求读取逻辑在这个类中。

    Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

    (1)启动服务

    如上述服务端实例代码所示,当调用函数start(),RPC服务端就启动起来了,我们先看看start()里面有什么

      /** Starts the service.  Must be called before any calls will be handled. */
    //该实现在ipc.Server类中
      public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        
        for (int i = 0; i < handlerCount; i++) {
          handlers[i] = new Handler(i);
          handlers[i].start();
        }
      }

    可以看出,Server端通过启动Listener监听客户端发来的请求,启动responder响应客户端发来的请求,启动多个线程Handler循环阻塞读取请求交给responder响应。

    先看看如何监听客户端发来的请求

    (2)监听客户端的连接

    Listner初始化,构造函数如下所示,通过Java NIO创建一个ServerSocketChannel监听本地地址和端口,并设置成非阻塞模式,并注册成接听客户端连接事件,并启动多个Reader线程,同时将读客户端请求数据的事件交给Reader实现。

     public Listener() throws IOException {
          address = new InetSocketAddress(bindAddress, port);
          // Create a new server socket and set to non blocking mode
          acceptChannel = ServerSocketChannel.open();
          acceptChannel.configureBlocking(false);
    
          // Bind the server socket to the local host and port
          bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
          port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
          // create a selector;
          selector= Selector.open();
          readers = new Reader[readThreads];
          for (int i = 0; i < readThreads; i++) {
            Reader reader = new Reader(
                "Socket Reader #" + (i + 1) + " for port " + port);
            readers[i] = reader;
            reader.start();
          }
    
          // Register accepts on the server socket with the selector.
          acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
          this.setName("IPC Server listener on " + port);
          this.setDaemon(true);
        }

    Listener:Listener是一个Thread类的子类,通过start()启动该线程,并运行该线程的run()方法:

    public void run() {
          LOG.info(Thread.currentThread().getName() + ": starting");
          SERVER.set(Server.this);
          connectionManager.startIdleScan();
          while (running) {
            SelectionKey key = null;
            try {
              getSelector().select();
              Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();//获取连接事件
              while (iter.hasNext()) {//遍历连接事件
                key = iter.next();
                iter.remove();
                try {
                  if (key.isValid()) {
                    if (key.isAcceptable())//如果该事件是连接事件
                      doAccept(key);
                  }
                } catch (IOException e) {
                }
                key = null;
              }
            } catch (OutOfMemoryError e) {
              // we can run out of memory if we have too many threads
              // log the event and sleep for a minute and give 
              // some thread(s) a chance to finish
              LOG.warn("Out of Memory in server select", e);
              closeCurrentConnection(key, e);
              connectionManager.closeIdle(true);
              try { Thread.sleep(60000); } catch (Exception ie) {}
            } catch (Exception e) {
              closeCurrentConnection(key, e);
            }
          }
          LOG.info("Stopping " + Thread.currentThread().getName());
    
          synchronized (this) {
            try {
              acceptChannel.close();
              selector.close();
            } catch (IOException e) { }
    
            selector= null;
            acceptChannel= null;
            
            // close all connections
            connectionManager.stopIdleScan();
            connectionManager.closeAll();
          }
        }

    //对连接事件进行处理
    void
    doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); channel.socket().setKeepAlive(true); Reader reader = getReader(); Connection c = connectionManager.register(channel);//把与客户端连接的管道封装成Connection key.attach(c); // so closeCurrentConnection can get the object reader.addConnection(c);//将客户端请求连接的通道加入到Reader,监听用户发来的请求。 } }
    
    

    Reader:Reader也是Thread类的子类,通过start()启动该线程,并运行该线程的run()方法:

        public void run() {
            LOG.info("Starting " + Thread.currentThread().getName());
            try {
              doRunLoop();
            } finally {
              try {
                readSelector.close();
              } catch (IOException ioe) {
                LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
              }
            }
          }
    
          private synchronized void doRunLoop() {
            while (running) {
              SelectionKey key = null;
              try {
                // consume as many connections as currently queued to avoid
                // unbridled acceptance of connections that starves the select
                int size = pendingConnections.size();//在Listner类中通过reader.addConnection(c)加入到该阻塞队列中
                for (int i=size; i>0; i--) {//遍历处理用户的连接事件,并监听用户发来的请求
                  Connection conn = pendingConnections.take();
                  conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
                }
                readSelector.select();
    
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                  key = iter.next();
                  iter.remove();
                  if (key.isValid()) {
                    if (key.isReadable()) {//判断该事件是不是读事件,并处理该读事件
                      doRead(key);
                    }
                  }
                  key = null;
                }
              } catch (InterruptedException e) {
                if (running) {                      // unexpected -- log it
                  LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
                }
              } catch (IOException ex) {
                LOG.error("Error in Reader", ex);
              }
            }
          }
    void doRead(SelectionKey key) throws InterruptedException {
          int count = 0;
          Connection c = (Connection)key.attachment();//因为Connection也保存着与客户端的连接,因此这里提取了Connection,把处理细节交给Connection
          if (c == null) {
            return;  
          }
          c.setLastContact(Time.now());
          
          try {
            count = c.readAndProcess();
          } catch (InterruptedException ieo) {
            LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
            throw ieo;
          } catch (Exception e) {
            // a WrappedRpcServerException is an exception that has been sent
            // to the client, so the stacktrace is unnecessary; any other
            // exceptions are unexpected internal server errors and thus the
            // stacktrace should be logged
            LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
                c.getHostAddress() + " threw exception [" + e + "]",
                (e instanceof WrappedRpcServerException) ? null : e);
            count = -1; //so that the (count < 0) block is executed
          }
          if (count < 0) {
            closeConnection(c);
            c = null;
          }
          else {
            c.setLastContact(Time.now());
          }
        } 
    Connection类中通过channelRead(channel, data)读取客户端发送的数据,并将读取的数据通过processOneRpc(data.array())方法处理逻辑过程,
    processOneRpc通过调用processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis)方法,
    在processRpcRequest方法中封装成Call数据对象,并加入callQueue.put(call)队列中。

    (3)实现客户端的请求的服务,得到客户端请求的结果数据

    Handler类实现客户端请求的服务,通过从callQueue队列获取客户端RPC请求Call对象,并调用抽象方法call处理请求。

    抽象方法call的实现在WritableRpcEngine类的内部类Server类,Server类是继承RPC.Server,是RPC服务调用的具体实现并定义相关协议

    下面是call方法的具体实现:

    public Writable call(org.apache.hadoop.ipc.RPC.Server server,
              String protocolName, Writable rpcRequest, long receivedTime)
              throws IOException, RPC.VersionMismatch {
    
            Invocation call = (Invocation)rpcRequest;
            if (server.verbose) log("Call: " + call);
    
            // Verify writable rpc version
            if (call.getRpcVersion() != writableRpcVersion) {
              // Client is using a different version of WritableRpc
              throw new RpcServerException(
                  "WritableRpc version mismatch, client side version="
                      + call.getRpcVersion() + ", server side version="
                      + writableRpcVersion);
            }
    
            long clientVersion = call.getProtocolVersion();
            final String protoName;
            ProtoClassProtoImpl protocolImpl;
            if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
              // VersionProtocol methods are often used by client to figure out
              // which version of protocol to use.
              //
              // Versioned protocol methods should go the protocolName protocol
              // rather than the declaring class of the method since the
              // the declaring class is VersionedProtocol which is not 
              // registered directly.
              // Send the call to the highest  protocol version
              VerProtocolImpl highest = server.getHighestSupportedProtocol(
                  RPC.RpcKind.RPC_WRITABLE, protocolName);
              if (highest == null) {
                throw new RpcServerException("Unknown protocol: " + protocolName);
              }
              protocolImpl = highest.protocolTarget;
            } else {
              protoName = call.declaringClassProtocolName;
    
              // Find the right impl for the protocol based on client version.
              ProtoNameVer pv = 
                  new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
              protocolImpl = 
                  server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
              if (protocolImpl == null) { // no match for Protocol AND Version
                 VerProtocolImpl highest = 
                     server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, 
                         protoName);
                if (highest == null) {
                  throw new RpcServerException("Unknown protocol: " + protoName);
                } else { // protocol supported but not the version that client wants
                  throw new RPC.VersionMismatch(protoName, clientVersion,
                    highest.version);
                }
              }
            }
              
    
              // Invoke the protocol method
           long startTime = Time.now();
           int qTime = (int) (startTime-receivedTime);
           Exception exception = null;
           try {
              Method method =
                  protocolImpl.protocolClass.getMethod(call.getMethodName(),
                  call.getParameterClasses());
              method.setAccessible(true);
              server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
              Object value = 
                  method.invoke(protocolImpl.protocolImpl, call.getParameters());
              if (server.verbose) log("Return: "+value);
              return new ObjectWritable(method.getReturnType(), value);
    
            } catch (InvocationTargetException e) {
              Throwable target = e.getTargetException();
              if (target instanceof IOException) {
                exception = (IOException)target;
                throw (IOException)target;
              } else {
                IOException ioe = new IOException(target.toString());
                ioe.setStackTrace(target.getStackTrace());
                exception = ioe;
                throw ioe;
              }
            } catch (Throwable e) {
              if (!(e instanceof IOException)) {
                LOG.error("Unexpected throwable object ", e);
              }
              IOException ioe = new IOException(e.toString());
              ioe.setStackTrace(e.getStackTrace());
              exception = ioe;
              throw ioe;
            } finally {
             int processingTime = (int) (Time.now() - startTime);
             if (LOG.isDebugEnabled()) {
               String msg = "Served: " + call.getMethodName() +
                   " queueTime= " + qTime +
                   " procesingTime= " + processingTime;
               if (exception != null) {
                 msg += " exception= " + exception.getClass().getSimpleName();
               }
               LOG.debug(msg);
             }
             String detailedMetricsName = (exception == null) ?
                 call.getMethodName() :
                 exception.getClass().getSimpleName();
             server.rpcMetrics.addRpcQueueTime(qTime);
             server.rpcMetrics.addRpcProcessingTime(processingTime);
             server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
                 processingTime);
           }
          }
    View Code

    通过调用RPC服务得到客户端请求的结果value,使用Responder类将结果返回给客户端。下面是Handler类run()方法中部分代码

          CurCall.set(null);//处理完call请求,故将当前处理call表示标志为Null
              synchronized (call.connection.responseQueue) {
                // setupResponse() needs to be sync'ed together with 
                // responder.doResponse() since setupResponse may use
                // SASL to encrypt response data and SASL enforces
                // its own message ordering.
                setupResponse(buf, call, returnStatus, detailedErr, 
                    value, errorClass, error);//将返回值封装到输出流中
                
                // Discard the large buf and reset it back to smaller size 
                // to free up heap
                if (buf.size() > maxRespSize) {
                  LOG.warn("Large response size " + buf.size() + " for call "
                      + call.toString());
                  buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
                }
                responder.doRespond(call);//将返回请求call加入队列中,然后在Responder类中一一处理。
    }

    (4)返回客户端的请求结果

    Responder类:负责返回客户端请求的结果,通过NIO注册OP_WRITE写事件,将结果返回给客户端。

    每处理完一个请求,就会调用Responder中doRespond方法处理请求结果。

        void doRespond(Call call) throws IOException {
          synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
              processResponse(call.connection.responseQueue, true);
            }
          }
        }

    可以看到上述方法又调用processResponse方法处理请求结果,这里将请求结果对象call注册OP_WRITE写事件,通过NIO返回给客户端;

    channel.register(writeSelector, SelectionKey.OP_WRITE, call);

    而Responder的run方法和doRunLoop方法检测OP_WRITE写事件,并通过doAsyncWrite方法处理该事件

    public void run() {
          LOG.info(Thread.currentThread().getName() + ": starting");
          SERVER.set(Server.this);
          try {
            doRunLoop();
          } finally {
            LOG.info("Stopping " + Thread.currentThread().getName());
            try {
              writeSelector.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
            }
          }
        }
        
        private void doRunLoop() {
          long lastPurgeTime = 0;   // last check for old calls.
    
          while (running) {
            try {
              waitPending();     // If a channel is being registered, wait.
              writeSelector.select(PURGE_INTERVAL);
              Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                try {
                  if (key.isValid() && key.isWritable()) {
                      doAsyncWrite(key);
                  }
                } catch (IOException e) {
                  LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
                }
              }
              long now = Time.now();
              if (now < lastPurgeTime + PURGE_INTERVAL) {
                continue;
              }
              lastPurgeTime = now;
              //
              // If there were some calls that have not been sent out for a
              // long time, discard them.
              //
              if(LOG.isDebugEnabled()) {
                LOG.debug("Checking for old call responses.");
              }
              ArrayList<Call> calls;
              
              // get the list of channels from list of keys.
              synchronized (writeSelector.keys()) {
                calls = new ArrayList<Call>(writeSelector.keys().size());
                iter = writeSelector.keys().iterator();
                while (iter.hasNext()) {
                  SelectionKey key = iter.next();
                  Call call = (Call)key.attachment();
                  if (call != null && key.channel() == call.connection.channel) { 
                    calls.add(call);
                  }
                }
              }
              
              for(Call call : calls) {
                doPurge(call, now);
              }
            } catch (OutOfMemoryError e) {
              //
              // we can run out of memory if we have too many threads
              // log the event and sleep for a minute and give
              // some thread(s) a chance to finish
              //
              LOG.warn("Out of Memory in server select", e);
              try { Thread.sleep(60000); } catch (Exception ie) {}
            } catch (Exception e) {
              LOG.warn("Exception in Responder", e);
            }
          }
        }

    doAsyncWrite方法还是通过调用processResponse方法处理结果,processResponse方法调用channelWrite将结果返回给客户端,当不能一次性返回时,在processResponse方法里将返回结果再次注册OP_WRITE写事件,因而形成一个循环使得数据能全部返回给客户端。

  • 相关阅读:
    Struts2:对Action中方法进行输入校验
    struts2拦截器加自定义注解实现权限控制
    struts2文件上传
    struts2访问或添加几个属性(request/session/application属性)
    Struts2日期类型转换
    struts2接受请求参数
    struts2 动态方法调用
    为应用指定多个struts配置文件
    SpringMVC工作原理
    Struts2相关面试题
  • 原文地址:https://www.cnblogs.com/arbitrary/p/5642149.html
Copyright © 2011-2022 走看看