zoukankan      html  css  js  c++  java
  • Hadoop建立IPC连接和数据读写

    建立IPC连接

    IPC Client通过调用getConnection获取IPC连接,具体流程图如下:

    image

    服务器端的IPC连接代码分散在Listener和Server.Connection中。

    Listener.run() 实现了NIO中的选择器循环。如下代码:

    //Listener构造函数
    public Listener() throws IOException {
          address = new InetSocketAddress(bindAddress, port);
          // Create a new server socket and set to non blocking mode
          acceptChannel = ServerSocketChannel.open();
          acceptChannel.configureBlocking(false);
    
          // Bind the server socket to the local host and port
          bind(acceptChannel.socket(), address, backlogLength);
          port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
          // create a selector;
          selector= Selector.open();
          readers = new Reader[readThreads];
          readPool = Executors.newFixedThreadPool(readThreads);
          for (int i = 0; i < readThreads; i++) {
            Selector readSelector = Selector.open();
            Reader reader = new Reader(readSelector);
            readers[i] = reader;
            readPool.execute(reader);
          }

    Listener.run()开启选择器循环,并处理Accept请求,如下:

    //Listener运行函数
    public void run() {
          LOG.info(getName() + ": starting");
          SERVER.set(Server.this);
          while (running) {
            SelectionKey key = null;
            try {
              selector.select();
              Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                try {
                  if (key.isValid()) {
                    if (key.isAcceptable())
                      doAccept(key);
                  }
                } catch (IOException e) {
                }
                key = null;
              }
            } catch (OutOfMemoryError e) {
              // we can run out of memory if we have too many threads
              // log the event and sleep for a minute and give 
              // some thread(s) a chance to finish
              LOG.warn("Out of Memory in server select", e);
              closeCurrentConnection(key, e);
              cleanupConnections(true);
              try { Thread.sleep(60000); } catch (Exception ie) {}
            } catch (Exception e) {
              closeCurrentConnection(key, e);
            }
            cleanupConnections(false);
          }
          LOG.info("Stopping " + this.getName());
    
          synchronized (this) {
            try {
              acceptChannel.close();
              selector.close();
            } catch (IOException e) { }
    
            selector= null;
            acceptChannel= null;
            
            // clean up all connections
            while (!connectionList.isEmpty()) {
              closeConnection(connectionList.remove(0));
            }
          }
        }

    doAccept()中通过server.accpet获取SocketChannel,并获取一个Reader对象,该对象包含一个Selector:readerSelector,通过reader.registerChannel,将SocketChannel注册到readerSelector下.并新建connection对象。

    //Do_Accept
    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
          Connection c = null;
          ServerSocketChannel server = (ServerSocketChannel) key.channel();
          SocketChannel channel;
          while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            Reader reader = getReader();
            try {
              reader.startAdd();
              SelectionKey readKey = reader.registerChannel(channel);
              c = new Connection(readKey, channel, System.currentTimeMillis());
              readKey.attach(c);
              synchronized (connectionList) {
                connectionList.add(numConnections, c);
                numConnections++;
              }
              if (LOG.isDebugEnabled())
                LOG.debug("Server connection from " + c.toString() +
                    "; # active connections: " + numConnections +
                    "; # queued calls: " + callQueue.size());          
            } finally {
              reader.finishAdd(); 
            }
    
          }
        }
    public synchronized SelectionKey registerChannel(SocketChannel channel)
                                                              throws IOException {
              return channel.register(readSelector, SelectionKey.OP_READ);
          }

    Listener拥有一个ExecutorService线程池用来运行Reader对象,Reader对象采用轮叫调度(Round Robin Scheduling算法就是以轮叫的方式依次将请求调度不同的服务器),如下:

    Reader getReader() {
          currentReader = (currentReader + 1) % readers.length;
          return readers[currentReader];
     }

    Reader对象的startAdd方法和finishAdd方法之间来注册channel和新建conncetion。

    以下是Reader对象的run方法,可以看出,当Reader对象调用startAdd方法后,线程从上一次的select阻塞中返回,并循环等待,直到finishAdd方法调用后,adding参数由true转false,跳出循环,再次进入select中。

    public void run() {
            LOG.info("Starting SocketReader");
            synchronized (this) {
              while (running) {
                SelectionKey key = null;
                try {
                  readSelector.select();
                  while (adding) {
                    this.wait(1000);
                  }              
    
                  Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                  while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();
                    if (key.isValid()) {
                      if (key.isReadable()) {
                        doRead(key);
                      }
                    }
                    key = null;
                  }
                } catch (InterruptedException e) {
                  if (running) {                      // unexpected -- log it
                    LOG.info(getName() + " caught: " +
                             StringUtils.stringifyException(e));
                  }
                } catch (IOException ex) {
                  LOG.error("Error in Reader", ex);
                }
              }
            }
          }

    当有新的数据到来时,调用Listener的doRead方法进行读取数据。

    doRead方法主要调用了Server.Connection的readAndProcess方法来读取客户端发送过来的数据并处理。

    readAndProcess方法,先读取IPC连接魔数和协议版本号分别在缓冲区dataLengthBuffer和versionBuffer读入。

    if (dataLengthBuffer.remaining() > 0) {
              count = channelRead(channel, dataLengthBuffer); // dataLengthBuffer长度为4byte,第一次循环读取IPC连接魔数hrpc  
                //非第一次循环读取数据data
              if (count < 0 || dataLengthBuffer.remaining() > 0) 
                return count;
            }
          
            if (!rpcHeaderRead) {
              //Every connection is expected to send the header.
              if (rpcHeaderBuffer == null) {
                rpcHeaderBuffer = ByteBuffer.allocate(2);
              }
              count = channelRead(channel, rpcHeaderBuffer); //rpcHeaderBuffer长度为2byte,读取协议版本号和authMehod
              if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
                return count;
              }
              int version = rpcHeaderBuffer.get(0);//协议版本号
              byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//authMehod
              authMethod = AuthMethod.read(new DataInputStream(
                  new ByteArrayInputStream(method)));
              dataLengthBuffer.flip();          
              if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
                //Warning is ok since this is not supposed to happen.
                LOG.warn("Incorrect header or version mismatch from " + 
                         hostAddress + ":" + remotePort +
                         " got version " + version + 
                         " expected version " + CURRENT_VERSION);
                return -1;
              }
              dataLengthBuffer.clear();
              ……//sasl处理部分,可以忽略         
              rpcHeaderBuffer = null;
              rpcHeaderRead = true;
              continue;
            }

    在接下类的数据处理部分,readAndProcess主要调用了saslReadAndProcess方法,进而调用processOneRpc方法,进而处理processHeader方法和processData方法。processHeader方法如下:

    private void processHeader(byte[] buf) throws IOException {
          DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf));
          header.readFields(in);
          try {
            String protocolClassName = header.getProtocol();
            if (protocolClassName != null) {
              protocol = getProtocolClass(header.getProtocol(), conf);
              // getProtocolClass只判断Server是否实现了ConnectionHeader中要求的IPC接口
              //如果实现,返回Class,否则抛异常。
            }
          } catch (ClassNotFoundException cnfe) {
            throw new IOException("Unknown protocol: " + header.getProtocol());
          }
          ......//获取用户信息
        }

    数据分帧和读取

    Tcp通信基于字节流,没有消息边界的概念,常用的分帧方法如下:

    1. 定长消息:通信双方发送的消息长度为固定的,接收者只需要将数据读入到相应的缓冲区即可。

    2. 基于界定符:消息结束由唯一标示(特殊字符序列)指出,这个特殊字符序列不能在传输的消息数据中出现,接收者简单扫描输入信息查找界定符。

    3. 显式长度:具体消息前面附加固定大小的字段,用来指示消息包含多少字节。

    IPC客户端发送请求采用“显式长度”方法,长度是int类型。如下是服务器端接收数据的相关代码:

    if (data == null) {
              dataLengthBuffer.flip();
              dataLength = dataLengthBuffer.getInt();//读取数据长度
              //dataLengthBuffer长度为4byte,正好一个int长度。
           
              if (dataLength == Client.PING_CALL_ID) {//心跳消息
                if(!useWrap) { //covers the !useSasl too
                  dataLengthBuffer.clear();
                  return 0;  //ping message
                }
              }
              if (dataLength < 0) {
                LOG.warn("Unexpected data length " + dataLength + "!! from " + 
                    getHostAddress());
              }
              data = ByteBuffer.allocate(dataLength);
              //根据长度,为读取数据分配缓冲区。
            }
            
            count = channelRead(channel, data);
            
            if (data.remaining() == 0) {//读到一个完整的消息
              dataLengthBuffer.clear();
              data.flip();
              if (skipInitialSaslHandshake) {
                data = null;
                skipInitialSaslHandshake = false;
                continue;
              }
              boolean isHeaderRead = headerRead;
              if (useSasl) {
                saslReadAndProcess(data.array());
              } else {
                processOneRpc(data.array());//处理消息
              }
              data = null;
              if (!isHeaderRead) {
                continue;
              }
            }

    而IPC服务器端到客户端用的是“定长消息”的变种,即利用前面介绍的序列化机制Writable发送响应。双方无需界定符和长度信息,其本质是一种“定长消息”。服务器端往客户端写数据的代码见Server.setupResponse,客户端读数据的代码在Connection.receiveResponse。

  • 相关阅读:
    在QT中应用中文
    DDA画直线
    裁剪算法
    VC6与office2007冲突的解决方法
    肾形图案
    OPENGL函数说明
    基数排序
    Qt信号和槽机制
    分形曼德尔波集合图形
    分形朱利亚集合图形1
  • 原文地址:https://www.cnblogs.com/dorothychai/p/4220873.html
Copyright © 2011-2022 走看看