zoukankan      html  css  js  c++  java
  • HADOOP源码分析之RPC(1)

    源码位于Hadoop-common ipc包下

    abstract class Server

    构造Server

    /**
     * Constructs the server: records the bind address and port, derives queue
     * and reader-thread sizing from the arguments or from {@code conf}, builds
     * the call queue, and creates the listener, connection manager, metrics
     * and responder.
     *
     * @param bindAddress address the server listens on
     * @param port requested port; after the listener is created the field is
     *        overwritten with the port reported by the listener's address
     * @param rpcRequestClass class of the RPC request payload
     * @param handlerCount number of handlers; also used as the multiplier for
     *        the maximum call-queue size
     * @param numReaders number of reader threads, or {@code -1} to read the
     *        value from {@code conf}
     * @param queueSizePerHandler queue slots per handler, or {@code -1} to
     *        read the value from {@code conf}
     * @param conf configuration consulted for every tunable read below
     * @param serverName name of the server (not referenced in this constructor
     *        body as shown)
     * @param secretManager secret manager for tokens; may be {@code null}, in
     *        which case SASL is initialised only if security is enabled
     * @param portRangeConfig stored as-is in the {@code portRangeConfig} field
     * @throws IOException declared by the signature; presumably raised while
     *         the listener is being set up -- TODO confirm against Listener
     */
    protected Server(String bindAddress, int port,
          Class<? extends Writable> rpcRequestClass, int handlerCount,
          int numReaders, int queueSizePerHandler, Configuration conf,
          String serverName, SecretManager<? extends TokenIdentifier> secretManager,
          String portRangeConfig)
        throws IOException {
    
        // Address to listen on
        this.bindAddress = bindAddress;
        this.conf = conf;
        this.portRangeConfig = portRangeConfig;
        // Port to listen on (replaced below by the listener's actual port)
        this.port = port;
        this.rpcRequestClass = rpcRequestClass; 
        // Number of handlers
        this.handlerCount = handlerCount;
        this.socketSendBufferSize = 0;
        this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
            CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
        // Total queue capacity = handlers * per-handler size; -1 means
        // "take the per-handler size from the configuration".
        if (queueSizePerHandler != -1) {
          this.maxQueueSize = handlerCount * queueSizePerHandler;
        } else {
          this.maxQueueSize = handlerCount * conf.getInt(
              CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
              CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);      
        }
        this.maxRespSize = conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
        // Reader thread count; -1 means "take it from the configuration".
        if (numReaders != -1) {
          this.readThreads = numReaders;
        } else {
          this.readThreads = conf.getInt(
              CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
              CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
        }
        this.readerPendingConnectionQueue = conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
    
        // Setup appropriate callqueue
        final String prefix = getQueueClassPrefix();
        this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
            getSchedulerClass(prefix, conf),
            getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
    
        // Unchecked narrowing of the wildcard type parameter.
        this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
        // Authorization is off unless explicitly enabled in the configuration.
        this.authorize = 
          conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 
                          false);
    
        // configure supported authentications
        this.enabledAuthMethods = getAuthMethods(secretManager, conf);
        this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
        
        // Start the listener here and let it bind to the port
        // The listener
        listener = new Listener();
        // Re-read the port from the listener; it may differ from the requested one.
        this.port = listener.getAddress().getPort();    
        connectionManager = new ConnectionManager();
        this.rpcMetrics = RpcMetrics.create(this, conf);
        this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
        this.tcpNoDelay = conf.getBoolean(
            CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
            CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
    
        this.setLogSlowRPC(conf.getBoolean(
            CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
            CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
    
        // Create the responder here
        responder = new Responder();
        
        // SASL is initialised only when tokens or security are in play.
        if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
          SaslRpcServer.init(conf);
          saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
        }
        
        // StandbyException is registered for terse logging.
        this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
      }

    Server的主要组成即Listener、Reader、Responder等组件,

    它们均是单独的线程,底层利用Java NIO实现(Reactor设计模式)。参考NIO系列文章:http://ifeve.com/overview/

    如下是创建Listener的源码:

        // Create a ServerSocketChannel and put it in non-blocking mode
          acceptChannel = ServerSocketChannel.open();
          acceptChannel.configureBlocking(false);
    
          // Bind the server socket to the local host and port
          bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
          port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
          // create a selector;
          selector= Selector.open();
          // Create and start readThreads readers, each named after its index and the bound port
          readers = new Reader[readThreads];
          for (int i = 0; i < readThreads; i++) {
            Reader reader = new Reader(
                "Socket Reader #" + (i + 1) + " for port " + port);
            readers[i] = reader;
            reader.start();
          }
    
          // Register accepts on the server socket with the selector.
    // Register the channel with the selector for OP_ACCEPT, then name this thread and mark it as a daemon
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true);

    Listener线程通过Selector不断监听请求建立连接的Socket

    /**
     * Accept loop of the listener thread.
     *
     * <p>While {@code running} is true, blocks on the selector and hands every
     * valid, acceptable key to {@code doAccept}. An {@link OutOfMemoryError}
     * closes the current connection, closes idle connections and backs off
     * for a minute; any other exception closes the current connection. When
     * the loop ends, the accept channel and selector are closed and all
     * connections are shut down.
     *
     * <p>The accept channel and the selector are closed independently so that
     * a failure to close one does not leak the other, and I/O failures are
     * logged instead of being silently discarded.
     */
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isAcceptable()) {
                doAccept(key);
              }
            } catch (IOException e) {
              // Best effort: a failed accept must not stop the listener, but
              // leave a trace so the failure can be diagnosed.
              LOG.debug(Thread.currentThread().getName()
                  + ": error accepting connection", e);
            }
            // Cleared so the outer handlers do not close a key that was
            // already processed.
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          connectionManager.closeIdle(true);
          try {
            Thread.sleep(60000);
          } catch (InterruptedException ignored) {
            // Deliberately ignored: the loop condition re-checks `running`.
          }
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info("Stopping " + Thread.currentThread().getName());

      synchronized (this) {
        // Close each resource in its own try block so that an exception from
        // the first close cannot skip the second.
        try {
          acceptChannel.close();
        } catch (IOException e) {
          LOG.warn(Thread.currentThread().getName()
              + ": error closing accept channel", e);
        }
        try {
          selector.close();
        } catch (IOException e) {
          LOG.warn(Thread.currentThread().getName()
              + ": error closing selector", e);
        }

        selector = null;
        acceptChannel = null;

        // close all connections
        connectionManager.stopIdleScan();
        connectionManager.closeAll();
      }
    }
  • 相关阅读:
    使用Redis实现分布式锁
    SpringBoot 定时任务的使用
    HTTP请求调试软件 Postman
    ElasticSearch的安装
    全文搜索 简介
    SpringBoot整合Redis
    Git 操作远程仓库(Github)
    Git的使用
    Git 简介、下载安装、配置
    Vue 商城的一些小demo(后台添加商品、前台购物车、本地存储的使用)
  • 原文地址:https://www.cnblogs.com/Dhouse/p/6894484.html
Copyright © 2011-2022 走看看