  • Livy server fails under high concurrency with java.lang.RuntimeException: java.io.IOException: Unable to connect to provided ports 10000~10010

    While load-testing Apache Livy with a large number of concurrent jobs, the following error appeared:

    pool-1-thread-111---start run xx.xx.xxx.xx
     
    [Your job is running on the LivyServer at node xx.xx.xxx.xx, job history id: null]
     
    [Run Sql error: java.io.IOException: Unable to connect to provided ports 10000~10010", Your job is running on the HiveServer at node xx.xx.xxx.xx, job history id: null]

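    Reading the Livy source shows that 10000~10010 is the configured port range. Under high concurrency every port in that range can already be occupied, and that is when this error is thrown. To support higher concurrency I modified this part of the code.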
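
To make the size of that range concrete, here is a minimal sketch that parses a "start~end" string and counts the ports it covers. The class `PortRange` and its methods are hypothetical names chosen for illustration; they are not Livy's own parsing code.

```java
/** Hypothetical helper (not part of Livy): models a "start~end" port range. */
final class PortRange {
  final int start;
  final int end;

  PortRange(int start, int end) {
    if (start > end) {
      throw new IllegalArgumentException("start must not exceed end: " + start + "~" + end);
    }
    this.start = start;
    this.end = end;
  }

  /** Parses a string such as "10000~10010" into a range. */
  static PortRange parse(String spec) {
    String[] parts = spec.trim().split("~");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Expected start~end but got: " + spec);
    }
    return new PortRange(Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim()));
  }

  /** Number of ports in the range, both ends inclusive. */
  int size() {
    return end - start + 1;
  }

  public static void main(String[] args) {
    PortRange range = PortRange.parse("10000~10010");
    System.out.println(range.size() + " ports in range"); // prints "11 ports in range"
  }
}
```

The range from the error message holds only 11 ports, so on one host at most 11 listeners can be bound inside it at the same moment.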

    The port range is defined in the class RSCConf and is used by RpcServer. The relevant code:
    /**
       * Creating RPC Server
       * @param lconf The default RSC configs
       * @throws IOException
       * @throws InterruptedException
       */
      public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
        this.config = lconf;
        this.portRange = config.get(LAUNCHER_PORT_RANGE);
        this.group = new NioEventLoopGroup(
          this.config.getInt(RPC_MAX_THREADS),
          Utils.newDaemonThreadFactory("RPC-Handler-%d"));
        int [] portData = getPortNumberAndRange();
        int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()];
        int endPort = portData[PortRangeSchema.END_PORT.ordinal()];
        boolean isContected = false;
        for(int tries = startingPortNumber ; tries<=endPort ; tries++){
          try {
            this.channel = getChannel(tries);
            isContected = true;
            break;
          } catch(SocketException e){
            LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
          }
        }
        if(!isContected) {
          // Every port in the range was tried once and none could be bound.
          throw new IOException("Unable to connect to provided ports " + this.portRange);
        }
        // ... remainder of the constructor omitted
      }

    If no port can be bound after one pass over the whole range, the constructor throws "Unable to connect to provided ports".
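
The single-pass behaviour can be reproduced outside Livy. The sketch below is an illustration under assumptions: it uses a plain `java.net.ServerSocket` instead of the Netty channel that RpcServer opens, and the class `PortRangeScan` and method `bindFirstFree` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical demo (not Livy code): one pass over a port range. */
final class PortRangeScan {

  /** Tries each port in [start, end] in order and returns the first socket that binds. */
  static ServerSocket bindFirstFree(int start, int end) throws IOException {
    for (int port = start; port <= end; port++) {
      try {
        return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
      } catch (IOException e) {
        // This port could not be bound (typically already in use); try the next one.
      }
    }
    // Same message shape as the error in the post; there is no second pass.
    throw new IOException("Unable to connect to provided ports " + start + "~" + end);
  }

  public static void main(String[] args) throws IOException {
    // Occupy one OS-assigned port, then scan a range that contains only that port.
    try (ServerSocket holder = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
      int port = holder.getLocalPort();
      try {
        bindFirstFree(port, port).close();
      } catch (IOException e) {
        System.out.println(e.getMessage());
      }
    }
  }
}
```

While `holder` keeps the only port in the range, the scan fails at once; nothing waits for the port to be released.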

    To give the connection a better chance of succeeding, I changed the code and added a new method:

      private boolean connectRetrying(int startingPortNumber, int endPort, int attempts) throws InterruptedException, IOException {
        boolean isContected = false;
        while (true) {
          attempts++;
          // The constructor already scanned the range once, so this allows two more passes.
          if (attempts >= 3) {
            throw new IOException("Unable to connect to provided ports " + this.portRange + " after three attempts");
          }
          for (int tries = startingPortNumber; tries <= endPort; tries++) {
            try {
              this.channel = getChannel(tries);
              isContected = true;
              break;
            } catch (SocketException e) {
              LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
            }
          }
          if (isContected) {
            break;
          }
          // Wait briefly before scanning the range again.
          Thread.sleep(100);
        }
        return isContected;
      }
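
The retry idea can also be tried in isolation. This is a minimal sketch under assumptions, not the Livy patch itself: it binds a plain `java.net.ServerSocket` rather than a Netty channel, and `RetryingPortBinder`, `bindWithRetry`, `maxPasses` and `sleepMillis` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical demo (not Livy code): rescans a port range with a pause between passes. */
final class RetryingPortBinder {

  /** Scans [start, end] up to maxPasses times, sleeping sleepMillis between passes. */
  static ServerSocket bindWithRetry(int start, int end, int maxPasses, long sleepMillis)
      throws IOException, InterruptedException {
    for (int pass = 1; pass <= maxPasses; pass++) {
      for (int port = start; port <= end; port++) {
        try {
          return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
          // Port could not be bound on this pass; move on to the next port.
        }
      }
      if (pass < maxPasses) {
        Thread.sleep(sleepMillis); // let current holders release a port before rescanning
      }
    }
    throw new IOException("Unable to connect to provided ports " + start + "~" + end
        + " after " + maxPasses + " passes");
  }

  public static void main(String[] args) throws Exception {
    // Hold the only port in the range, then release it shortly after from another thread.
    final ServerSocket holder = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    final int port = holder.getLocalPort();
    Thread releaser = new Thread(() -> {
      try {
        Thread.sleep(150);
        holder.close();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    releaser.start();
    try (ServerSocket bound = bindWithRetry(port, port, 50, 100)) {
      System.out.println("Bound port " + bound.getLocalPort() + " after the holder released it");
    }
    releaser.join();
  }
}
```

Retrying only helps when a port is freed during the wait. If more listeners are needed at once than the range has ports, the error can still surface after the last pass.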

    The modified constructor:

      /**
       * Creating RPC Server
       * @param lconf The default RSC configs
       * @throws IOException
       * @throws InterruptedException
       */
      public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
        this.config = lconf;
        this.portRange = config.get(LAUNCHER_PORT_RANGE);
        this.group = new NioEventLoopGroup(
          this.config.getInt(RPC_MAX_THREADS),
          Utils.newDaemonThreadFactory("RPC-Handler-%d"));
        int [] portData = getPortNumberAndRange();
        int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()];
        int endPort = portData[PortRangeSchema.END_PORT.ordinal()];
        boolean isContected = false;
        int attempts = 0;
        for(int tries = startingPortNumber ; tries<=endPort ; tries++){
          try {
            this.channel = getChannel(tries);
            isContected = true;
            break;
          } catch(SocketException e){
            LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
          }
        }
        if(!isContected) {
          connectRetrying(startingPortNumber, endPort, attempts);
          //throw new IOException("Unable to connect to provided ports " + this.portRange);
        }
        this.port = ((InetSocketAddress) channel.localAddress()).getPort();
        this.pendingClients = new ConcurrentHashMap<>();
        LOG.info("Connected to the port " + this.port);
        String address = config.get(RPC_SERVER_ADDRESS);
        if (address == null) {
          address = config.findLocalAddress();
        }
        this.address = address;
      }

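    I then continued the load test.

    Load test results:

    1. At a concurrency of 250, with cluster resources not busy, jobs ran without blocking.

    2. At a concurrency of 500, with cluster resources not busy, the error above still appeared occasionally.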

  • Original article: https://www.cnblogs.com/songchaolin/p/12858628.html