zoukankan      html  css  js  c++  java
  • Jedis源代码探索

    【连接池实现】
    【一致性hash实现】
     
    【Redis客户端-Jedis源代码探索】【http://blog.sina.com.cn/s/blog_6bc4401501018bgh.html
     
    项目实践心得。代码一定要剖析到每一行,吸取精华才算凑效。


    1.Redis的通信协议
    Redis采用自定义的二进制通信协议。有一个基本规范

    发送命令规范:

    <参数个数>
    $<参数1字节数>
    <参数1>
         ...
    $<参数n字节数>
    <参数n>

    响应规范

    响应类型有返回数据的第一个字节决定的
    +代表一个状态信息,+ok
    -代表错误
    :返回的是一个整数
    $返回一个块数据,跟发送命令的规范一样。$<长度> <数据>
    *返回多个数据块。同上,后面跟着参数的个数和每个参数的数据。

    2.jedis的整体架构(核心解决方案)
            解剖jedis架构之前,先自己尝试设计一个客户端。如果用最简单的方式,你会如何设计?
            用最简单的Socket就可以实现了,socket跟服务端通信后,就可以按上面的格式发命令了。然后阻塞获得服务端返回的数据。听起来很简单吧。

            实际上jedis也是这样实现的。客户端连接数很大,怎么办?每次新建一个socket,开销会成线性增长。我们想到了连接池方案。先新建一批连接,使用时从连接池取一个,用完再回收。连接池实现起来有多复杂?用 apache 的连接池接口,可以简化开发成本。
            一台机容量有限,想将数据存储到多个节点。有很多种方案。在业务层面,我们可以将不同的业务数据存储在不同的节点(水平分割)。根据用户容量分割到不同的节点(纵向分割)。有没有通用一点的,我们想到的"一致性hash”方案。根据key,定位到目标节点,存储相关的数据
            吹了那么多,其实上面就是jedis的核心了。其他的只是处理各种的命令返回值而已了


    3.源代码剖析
            连接redis,非常简单,跟传统的socket通信一样。new一个socket,设置相关的参数,connect获取相关的输入输出流进行通信。它的特别之处在于,封装了这些流,方便操作。

     
        1. public void connect() {  
        2.         if (!isConnected()) {  
        3.             try {  
        4.                 socket = new Socket();  
        5.                 //->@wjw_add  
        6.                 socket.setReuseAddress(true);  
        7.                 socket.setKeepAlive(true);  //Will monitor the TCP connection is valid  
        8.                 socket.setTcpNoDelay(true);  //Socket buffer Whetherclosed, to ensure timely delivery of data  
        9.                 socket.setSoLinger(true,0);  //Control calls close () method, the underlying socket is closed immediately  
        10.                 //<-@wjw_add  
        11.   
        12.                 socket.connect(new InetSocketAddress(host, port), timeout);  
        13.                 socket.setSoTimeout(timeout);  
        14.                 outputStream = new RedisOutputStream(socket.getOutputStream());  
        15.                 inputStream = new RedisInputStream(socket.getInputStream());  
        16.             } catch (IOException ex) {  
        17.                 throw new JedisConnectionException(ex);  
        18.             }  
        19.         }  
        20.     }  


    发送命令,就是按照上面的所说的通信协议发送相关的命令

    要注意的是要将字符串转成二进制流

        1. private static void sendCommand(final RedisOutputStream os,  
        2.             final byte[] command, final byte[]... args) {  
        3.         try {  
        4.             os.write(ASTERISK_BYTE);  
        5.             os.writeIntCrLf(args.length + 1);  
        6.             os.write(DOLLAR_BYTE);  
        7.             os.writeIntCrLf(command.length);  
        8.             os.write(command);  
        9.             os.writeCrLf();  
        10.   
        11.             for (final byte[] arg : args) {  
        12.             os.write(DOLLAR_BYTE);  
        13.             os.writeIntCrLf(arg.length);  
        14.             os.write(arg);  
        15.             os.writeCrLf();  
        16.             }  
        17.         } catch (IOException e) {  
        18.             throw new JedisConnectionException(e);  
        19.         }  
        20.         }  



    处理服务端返回。Redis本身是单线程阻塞的。所以你可以从socket的InputStream中直接读取数据。
    获得二进制流,根据不同的命令,将它(解析)转成相应的数据类型即可。

        1. private static Object process(final RedisInputStream is) {  
        2.     try {  
        3.         byte b = is.readByte();  
        4.         if (b == MINUS_BYTE) {  
        5.         processError(is);//报错  
        6.         } else if (b == ASTERISK_BYTE) {  
        7.         return processMultiBulkReply(is);//*多个数据块  
        8.         } else if (b == COLON_BYTE) {  
        9.         return processInteger(is);//:整数  
        10.         } else if (b == DOLLAR_BYTE) {  
        11.         return processBulkReply(is);//$一个数据块  
        12.         } else if (b == PLUS_BYTE) {  
        13.         return processStatusCodeReply(is);//+状态  
        14.         } else {  
        15.         throw new JedisConnectionException("Unknown reply: " + (char) b);  
        16.         }  
        17.     } catch (IOException e) {  
        18.         throw new JedisConnectionException(e);  
        19.     }  
        20.     return null;  
        21.        }  


    连接池的实现。现在要实现连接池,一般都是用apache的pool接口来实现了。只需要两个步骤。
    1.实现Config 就是连接池的配置参数,包括最大连接数,初始连接数,空闲时间.
    2.实现 BasePoolableObjectFactory,提供新建连接,废弃连接,测试连接方法

        1. public Object makeObject() throws Exception {//产生redis实例  收藏代码
        2.          final Jedis jedis = new Jedis(this.host, this.port, this.timeout);  
        3.   
        4.          jedis.connect();  
        5.          if (null != this.password) {  
        6.              jedis.auth(this.password);  
        7.          }  
        8.          if( database != 0 ) {  
        9.              jedis.select(database);  
        10.          }  
        11.            
        12.          return jedis;  
        13.      }  
        14.   
        15.      public void destroyObject(final Object obj) throws Exception {//销毁jedis实例  
        16.          if (obj instanceof Jedis) {  
        17.              final Jedis jedis = (Jedis) obj;  
        18.              if (jedis.isConnected()) {  
        19.                  try {  
        20.                      try {  
        21.                          jedis.quit();  
        22.                      } catch (Exception e) {  
        23.                      }  
        24.                      jedis.disconnect();  
        25.                  } catch (Exception e) {  
        26.   
        27.                  }  
        28.              }  
        29.          }  
        30.      }  
        31.   
        32.      public boolean validateObject(final Object obj) {//验证jedis是否有效,简单的ping命令  
        33.          if (obj instanceof Jedis) {  
        34.              final Jedis jedis = (Jedis) obj;  
        35.              try {  
        36.                  return jedis.isConnected() && jedis.ping().equals("PONG");  
        37.              } catch (final Exception e) {  
        38.                  return false;  
        39.              }  
        40.          } else {  
        41.              return false;  
        42.          }  
        43.      }  



    一致性hash的实现。关键是如何使得key均匀散列。jedis使用了murmurhash 2.0

     
        1. public R getShard(String key) {  
        2.         return resources.get(getShardInfo(key));  
        3.     }  
        4.   
        5.     public S getShardInfo(byte[] key) {  
        6.         SortedMap tail = nodes.tailMap(algo.hash(key));//找到key所在的节点  
        7.         if (tail.size() == 0) {  
        8.             return nodes.get(nodes.firstKey());  
        9.         }  
        10.         return tail.get(tail.firstKey());  
        11.     }  


    4.经验教训

    这里谈谈实践中遇到的几个问题,都是血和泪的教训啊。


    1.jedis的连接池获得的连接,进行通信时候出错了,一定要记得销毁该连接。因为它的inputstream里面可能还残留数据。下次从连接池获得的时候都是dirty data了。一般采用以下的方案:

        1.   try{  
        2. //do something  
        3. pool.returnConnection;//返回正常连接  
        4. catch(Exception e){  
        5. pool.returnBrokenConnection;//销毁连接  


    2.使用一致性hash的时候,使用批量查询命令mget的时候,ShardedJedis本身不支持的,只能用一个个key去取数据,性能低下。有一个 比较土的办法。先将key对应的节点分类合并,然后单独用mget去获取数据,再将返回值合并给用户。可以显著减少网络连接。


    5.总结

    jedis简单,高效,本身代码也可以当做一个网络客户端的典型实现范例。

  • 相关阅读:
    Java实现 洛谷 P1060 开心的金明
    (Java实现) 洛谷 P1605 迷宫
    (Java实现) 洛谷 P1605 迷宫
    (Java实现)洛谷 P1093 奖学金
    (Java实现)洛谷 P1093 奖学金
    Java实现 洛谷 P1064 金明的预算方案
    Java实现 洛谷 P1064 金明的预算方案
    (Java实现) 洛谷 P1031 均分纸牌
    QT树莓派交叉编译环开发环境搭建(附多个exe工具下载链接)
    武则天红人对唐睿宗的桃色报复(如此缺少城府,注定了要在宫廷中过早地出局)
  • 原文地址:https://www.cnblogs.com/lsx1993/p/4632983.html
Copyright © 2011-2022 走看看