zoukankan      html  css  js  c++  java
  • JedisCluster中应用的Apache Commons Pool对象池技术

    对象池技术在服务器开发上应用广泛。在各种对象池的实现中,尤其以数据库的连接池最为明显,可以说是每个服务器必须实现的部分。
     
    apache common pool 官方文档可以参考:https://commons.apache.org/proper/commons-pool/
     
    结合JedisPool看Commons Pool对象池技术
     
    结合JedisPool,我们来了解一下commons pool的整体设计:
     


     
     
    面向用户的往往是ObjectPool,用户看到的是一个对象池,对于使用Redis连接的用户来说,就是JedisPool。对象池ObjectPool提供了借用对象,返还对象,验证对象等API,需要具体的配置GenericObjectPoolConfig来确定池的大小,以及创建具体池化对象的工厂接口PooledObjectFactory来根据需要创建,销毁,激活,钝化每个对象。
     
    PooledObjectFactory接口,用来创建池对象(makeObject),将不用的池对象进行钝化(passivateObject),对要使用的池对象进行激活(activateObject),对池对象进行验证(valiateObject),将有问题的池对象销毁(destroyObject)。
     
    如果需要使用commons-pool,那么就需要提供一个PooledObjectFactory接口的具体实现,一个比较简单的办法是使用BasePooledObjectFactory这个抽象类,只需要实现两个方法:create()和wrap(T obj)。JedisFactory也就是用来创建每个Jedis连接的对象工厂类,其中直接实现了PooledObjectFactory,makeObject的过程中,直接创建了PooledObject<Redis>。
     
    当我们使用JedisPool.getResource(),用于返回jedis连接时,实际调用的是其中GenericObjectPool的borrowObject方法,在Jedis连接池中借用一个对象。
     
    借用对象时,先去idleObjects(LinkedBlockingDeque<Pooled<Jedis>>)列表中查看是否有空闲的对象,如果存在则直接使用;如果不存在,则需要考虑在没有超出连接池最大数量的情况下,使用PooledObjectFactory进行初始化,这里使用的是JedisFactory.makeObject来创建连接,并将其激活。
     
     
    对于Jedis对象,不能总是重用同一个对象,在使用一段时间后其就会产生失效,连接出现异常。此时就需要使用JedisPool来获取资源,注意在最后要回收资源,实际上就是returnObject,以下面的代码作为实例:
     
     
    Jedis jedis = jedisPool.getResource();
            try {
                while (true) {
                    String productCountString = jedis.get("product");
                    if (Integer.parseInt(productCountString) > 0) {
                        if (acquireLock(jedis, "abc")) {
                            int productCount = Integer.parseInt(jedis.get("product"));
                            System.out.println(String.format("%tT --- Get product: %s", new Date(), productCount));
    //                        System.out.println(productCount);
                            jedis.decr("product");
                            releaseLock(jedis, "abc");
                            return "Success";
                        }
                        Thread.sleep(1000L);
                    } else {
                        return "Over";
                    }
                }
            } finally {
                jedis.close();
            }
     
     
     
     
    JedisCluster的连接/执行源码研究
     
     
    我们使用的JedisCluster(Redis集群模式)需要初始化并使用JedisCluster对象,通过该对象来进行Redis的相关操作,下面就列举出了JedisCluster的基本类图结构:
     


     
     
    在执行任务BinaryJedisCluster的相关命令 set/get/exist 等redis命令时,都采用回调的方式:
     
     
    @Override
      public String set(final byte[] key, final byte[] value) {
        return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
          @Override
          public String execute(Jedis connection) {
            return connection.set(key, value);
          }
        }.runBinary(key);
      }
     
     
     
    初始化一个JedisClusterCommand对象,执行runBinary方法,进行execute(Jedis connection)回调,其实可以看出执行回调之前的作用是将使用到的Jedis连接在内部统一管理起来。
     
    可以猜想使用了JedisSlotBasedConnectionHandler中实现了父类定义的getConnection()获取Redis连接的方法:
     
     
    @Override
      public Jedis getConnection() {
        // In antirez's redis-rb-cluster implementation,
        // getRandomConnection always return valid connection (able to
        // ping-pong)
        // or exception if all connections are invalid
     
        List<JedisPool> pools = getShuffledNodesPool();
     
        for (JedisPool pool : pools) {
          Jedis jedis = null;
          try {
            jedis = pool.getResource();
     
            if (jedis == null) {
              continue;
            }
     
            String result = jedis.ping();
     
            if (result.equalsIgnoreCase("pong")) return jedis;
     
            pool.returnBrokenResource(jedis);
          } catch (JedisConnectionException ex) {
            if (jedis != null) {
              pool.returnBrokenResource(jedis);
            }
          }
        }
     
        throw new JedisConnectionException("no reachable node in cluster");
      }
     
     
     
     
    其中调用的方法 getShuffledNodesPool(),就是从JedisClusterInfoCache中包含的所有JedisPool,执行shuffle操作,随机拿到对应的JedisPool,去其中getResource拿到连接。
     
    这属于随机去获取connection,但事实上并不是这样处理的,我们可以通过slot来获得其对应的Connection,在JedisClusterCommand.run方法的最后一行中,其中第三个参数为是否为tryRandomMode,调用方式显示为非random Mode。
     
    return runWithRetries(SafeEncoder.encode(keys[0]), this.redirections, false, false);
    
      
     
    可以根据slot来定位到具体的JedisPool,getResource拿到对应的Jedis Connection,但该方法也标明了不能保证一定能够拿到可用的连接。
     
    @Override
    public Jedis getConnectionFromSlot(int slot) {
      JedisPool connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        // It can't guaranteed to get valid connection because of node
        // assignment
        return connectionPool.getResource();
      } else {
        return getConnection();
      }
    }
     
     
     
    在JedisClusterInfoCache缓存了Map<String,JedisPool>(host:port->JedisPool)和Map<Integer, JedisPool>(slot->JedisPool),用于查询连接,那么这两个缓存是如何查询出来的,这就需要用到Jedis.clusterNodes,它可以通过该Redis连接找到其他连接的相关配置,例如可以发现整个集群的配置,其中三个master,三个slave,并且能够识别出自身连接,可参考文档:http://redis.io/commands/cluster-nodes
     
     
    5974ed7dd81c112d9a2354a0a985995913b4702c 192.168.1.137:6389 master - 0 1468809898374 26 connected 0-5640
    d08dc883ee4fcb90c4bb47992ee03e6474398324 192.168.1.137:6390 master - 0 1468809898875 25 connected 5641-11040
    ffb4db4e1ced0f91ea66cd2335f7e4eadc29fd56 192.168.1.138:6390 slave 5974ed7dd81c112d9a2354a0a985995913b4702c 0 1468809899376 26 connected
    c69b521a30336caf8bce078047cf9bb5f37363ee 192.168.1.137:6388 master - 0 1468809897873 28 connected 11041-16383
    532e58842d001f8097fadc325bdb5541b788a360 192.168.1.138:6389 slave c69b521a30336caf8bce078047cf9bb5f37363ee 0 1468809899876 28 connected
    aa52c7810e499d042e94e0aa4bc28c57a1da74e3 192.168.1.138:6388 myself,slave d08dc883ee4fcb90c4bb47992ee03e6474398324 0 0 19 connected
     
     
     
    分配slot只可能在master节点上发生,而不会在slave节点上发生,这意味着Redis集群并未进行类似读写分离的形式。当Redis集群的slot发生改变时,会重新初始化该Cache,重置slot。
     
    而执行每个get/set等Redis操作时,真正的核心入口,其实是JedisClusterCommand.runWithRetries方法:
     
     
    private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
        if (redirections <= 0) {
          throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
        }
     
        Jedis connection = null;
        try {
     
          if (asking) {
            // TODO: Pipeline asking with the original command to make it
            // faster....
            connection = askConnection.get();
            connection.asking();
     
            // if asking success, reset asking flag
            asking = false;
          } else {
            if (tryRandomNode) {
              connection = connectionHandler.getConnection();
            } else {
              connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
            }
          }
     
          return execute(connection);
        } catch (JedisConnectionException jce) {
          if (tryRandomNode) {
            // maybe all connection is down
            throw jce;
          }
     
          // release current connection before recursion
          releaseConnection(connection);
          connection = null;
     
          // retry with random connection
          return runWithRetries(key, redirections - 1, true, asking);
        } catch (JedisRedirectionException jre) {
          // if MOVED redirection occurred,
          if (jre instanceof JedisMovedDataException) {
            // it rebuilds cluster's slot cache
            // recommended by Redis cluster specification
            this.connectionHandler.renewSlotCache(connection);
          }
     
          // release current connection before recursion or renewing
          releaseConnection(connection);
          connection = null;
     
          if (jre instanceof JedisAskDataException) {
            asking = true;
            askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
          } else if (jre instanceof JedisMovedDataException) {
          } else {
            throw new JedisClusterException(jre);
          }
     
          return runWithRetries(key, redirections - 1, false, asking);
        } finally {
          releaseConnection(connection);
        }
      }
     
     
     
    出现的Redis Retries问题
     
    可以参考:http://carlosfu.iteye.com/blog/2251034,讲的非常好。同样,我们的出现的异常堆栈:
     
    - 2016-06-04 00:02:51,911 [// - - ] ERROR xxx - Too many Cluster redirections?
    redis.clients.jedis.exceptions.JedisClusterMaxRedirectionsException: Too many Cluster redirections?
    at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:97)
    at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:131)
    at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:152)
    at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:131)
     
     
    直译过来就是出现过多的redirections异常,出现过JedisConnectionException,完整的堆栈内容:
     
     
    redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:198)
        at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
        at redis.clients.jedis.Protocol.process(Protocol.java:141)
        at redis.clients.jedis.Protocol.read(Protocol.java:205)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
        at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:216)
        at redis.clients.jedis.Connection.getBulkReply(Connection.java:205)
        at redis.clients.jedis.Jedis.get(Jedis.java:101)
        at redis.clients.jedis.JedisCluster$3.execute(JedisCluster.java:79)
        at redis.clients.jedis.JedisCluster$3.execute(JedisCluster.java:76)
        at redis.clients.jedis.JedisClusterCommand.runWithRetries(JedisClusterCommand.java:119)
        at redis.clients.jedis.JedisClusterCommand.run(JedisClusterCommand.java:30)
        at redis.clients.jedis.JedisCluster.get(JedisCluster.java:81)
        at redis.RedisClusterTest.main(RedisClusterTest.java:30)
     
     
     
     
    调试状态下的异常信息:
     
     
    jce = {redis.clients.jedis.exceptions.JedisConnectionException@1014} "redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream."
     detailMessage = "Unexpected end of stream."
     cause = {redis.clients.jedis.exceptions.JedisConnectionException@1014} "redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream."
     stackTrace = {java.lang.StackTraceElement[0]@1017}
     suppressedExceptions = {java.util.Collections$UnmodifiableRandomAccessList@1018}  size = 0
     
     
     
    关于这个问题,可以参考:http://blog.csdn.net/jiangguilong2000/article/details/45025355
     
    客户端buffer控制。在客户端与server进行的交互中,每个连接都会与一个buffer关联,此buffer用来队列化等待被client接受的响应信息。如果client不能及时的消费响应信息,那么buffer将会被不断积压而给server带来内存压力.如果buffer中积压的数据达到阀值,将会导致连接被关闭,buffer被移除。
     
     开发环境上执行查询该参数的命令:config get client-output-buffer-limit
     
     
    1) "client-output-buffer-limit"
    2) "normal 0 0 0 slave 268435456 67108864 60 pubsub 33554432 8388608 60"
    
      
     
    关于Redis上的所有参数详解,可以参考:http://shift-alt-ctrl.iteye.com/blog/1882850
     
    JedisMovedDataException
     
    jre = {redis.clients.jedis.exceptions.JedisMovedDataException@2008} "redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 8855 192.168.1.137:6390"
     targetNode = {redis.clients.jedis.HostAndPort@2015} "192.168.1.137:6390"
     slot = 8855
     detailMessage = "MOVED 8855 192.168.1.137:6390"
     cause = {redis.clients.jedis.exceptions.JedisMovedDataException@2008} "redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 8855 192.168.1.137:6390"
     stackTrace = {java.lang.StackTraceElement[0]@1978}
     suppressedExceptions = {java.util.Collections$UnmodifiableRandomAccessList@1979}  size = 0
     
     
     
    日志中出现超时异常:
     
    4851:S 18 Jul 11:05:38.005 * Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.
    
      
    可以参考github上关于redis的讨论:https://github.com/antirez/redis/issues/641,关闭AOF,可以暂时解决问题。JedisCluster中应用的Apache Commons Pool对象池技术 
     
  • 相关阅读:
    监控里的主码流和子码流是什么意思
    监控硬盘容量计算
    一个能让你了解所有函数调用顺序的Android库
    电工选线
    oracle linux dtrace
    list all of the Oracle 12c hidden undocumented parameters
    Oracle Extended Tracing
    window 驱动开发
    win7 x64 dtrace
    How to Use Dtrace Tracing Ruby Executing
  • 原文地址:https://www.cnblogs.com/mmaa/p/5789850.html
Copyright © 2011-2022 走看看