zoukankan      html  css  js  c++  java
  • Jedis源代码探索

    【连接池实现】
    【一致性hash实现】
     
    【Redis客户端-Jedis源代码探索】【http://blog.sina.com.cn/s/blog_6bc4401501018bgh.html
     
    项目实践心得。代码一定要剖析到每一行,吸取精华才算凑效。


    1.Redis的通信协议
    Redis采用自定义的二进制通信协议。有一个基本规范

    发送命令规范:

    <参数个数>
    $<参数1字节数>
    <参数1>
         ...
    $<参数n字节数>
    <参数n>

    响应规范

    响应类型有返回数据的第一个字节决定的
    +代表一个状态信息,+ok
    -代表错误
    :返回的是一个整数
    $返回一个块数据,跟发送命令的规范一样。$<长度> <数据>
    *返回多个数据块。同上,后面跟着参数的个数和每个参数的数据。

    2.jedis的整体架构(核心解决方案)
            解剖jedis架构之前,先自己尝试设计一个客户端。如果用最简单的方式,你会如何设计?
            用最简单的Socket就可以实现了,socket跟服务端通信后,就可以按上面的格式发命令了。然后阻塞获得服务端返回的数据。听起来很简单吧。

            实际上jedis也是这样实现的。客户端连接数很大,怎么办?每次新建一个socket,开销会成线性增长。我们想到了连接池方案。先新建一批连接,使用时从连接池取一个,用完再回收。连接池实现起来有多复杂?用 apache 的连接池接口,可以简化开发成本。
            一台机容量有限,想将数据存储到多个节点。有很多种方案。在业务层面,我们可以将不同的业务数据存储在不同的节点(水平分割)。根据用户容量分割到不同的节点(纵向分割)。有没有通用一点的,我们想到的"一致性hash”方案。根据key,定位到目标节点,存储相关的数据
            吹了那么多,其实上面就是jedis的核心了。其他的只是处理各种的命令返回值而已了


    3.源代码剖析
            连接redis,非常简单,跟传统的socket通信一样。new一个socket,设置相关的参数,connect获取相关的输入输出流进行通信。它的特别之处在于,封装了这些流,方便操作。

     
        1. public void connect() {  
        2.         if (!isConnected()) {  
        3.             try {  
        4.                 socket = new Socket();  
        5.                 //->@wjw_add  
        6.                 socket.setReuseAddress(true);  
        7.                 socket.setKeepAlive(true);  //Will monitor the TCP connection is valid  
        8.                 socket.setTcpNoDelay(true);  //Socket buffer Whetherclosed, to ensure timely delivery of data  
        9.                 socket.setSoLinger(true,0);  //Control calls close () method, the underlying socket is closed immediately  
        10.                 //<-@wjw_add  
        11.   
        12.                 socket.connect(new InetSocketAddress(host, port), timeout);  
        13.                 socket.setSoTimeout(timeout);  
        14.                 outputStream = new RedisOutputStream(socket.getOutputStream());  
        15.                 inputStream = new RedisInputStream(socket.getInputStream());  
        16.             } catch (IOException ex) {  
        17.                 throw new JedisConnectionException(ex);  
        18.             }  
        19.         }  
        20.     }  


    发送命令,就是按照上面的所说的通信协议发送相关的命令

    要注意的是要将字符串转成二进制流

        1. private static void sendCommand(final RedisOutputStream os,  
        2.             final byte[] command, final byte[]... args) {  
        3.         try {  
        4.             os.write(ASTERISK_BYTE);  
        5.             os.writeIntCrLf(args.length + 1);  
        6.             os.write(DOLLAR_BYTE);  
        7.             os.writeIntCrLf(command.length);  
        8.             os.write(command);  
        9.             os.writeCrLf();  
        10.   
        11.             for (final byte[] arg : args) {  
        12.             os.write(DOLLAR_BYTE);  
        13.             os.writeIntCrLf(arg.length);  
        14.             os.write(arg);  
        15.             os.writeCrLf();  
        16.             }  
        17.         } catch (IOException e) {  
        18.             throw new JedisConnectionException(e);  
        19.         }  
        20.         }  



    处理服务端返回。Redis本身是单线程阻塞的。所以你可以从socket的InputStream中直接读取数据。
    获得二进制流,根据不同的命令,将它(解析)转成相应的数据类型即可。

        1. private static Object process(final RedisInputStream is) {  
        2.     try {  
        3.         byte b = is.readByte();  
        4.         if (b == MINUS_BYTE) {  
        5.         processError(is);//报错  
        6.         } else if (b == ASTERISK_BYTE) {  
        7.         return processMultiBulkReply(is);//*多个数据块  
        8.         } else if (b == COLON_BYTE) {  
        9.         return processInteger(is);//:整数  
        10.         } else if (b == DOLLAR_BYTE) {  
        11.         return processBulkReply(is);//$一个数据块  
        12.         } else if (b == PLUS_BYTE) {  
        13.         return processStatusCodeReply(is);//+状态  
        14.         } else {  
        15.         throw new JedisConnectionException("Unknown reply: " + (char) b);  
        16.         }  
        17.     } catch (IOException e) {  
        18.         throw new JedisConnectionException(e);  
        19.     }  
        20.     return null;  
        21.        }  


    连接池的实现。现在要实现连接池,一般都是用apache的pool接口来实现了。只需要两个步骤。
    1.实现Config 就是连接池的配置参数,包括最大连接数,初始连接数,空闲时间.
    2.实现 BasePoolableObjectFactory,提供新建连接,废弃连接,测试连接方法

        1. public Object makeObject() throws Exception {//产生redis实例  收藏代码
        2.          final Jedis jedis = new Jedis(this.host, this.port, this.timeout);  
        3.   
        4.          jedis.connect();  
        5.          if (null != this.password) {  
        6.              jedis.auth(this.password);  
        7.          }  
        8.          if( database != 0 ) {  
        9.              jedis.select(database);  
        10.          }  
        11.            
        12.          return jedis;  
        13.      }  
        14.   
        15.      public void destroyObject(final Object obj) throws Exception {//销毁jedis实例  
        16.          if (obj instanceof Jedis) {  
        17.              final Jedis jedis = (Jedis) obj;  
        18.              if (jedis.isConnected()) {  
        19.                  try {  
        20.                      try {  
        21.                          jedis.quit();  
        22.                      } catch (Exception e) {  
        23.                      }  
        24.                      jedis.disconnect();  
        25.                  } catch (Exception e) {  
        26.   
        27.                  }  
        28.              }  
        29.          }  
        30.      }  
        31.   
        32.      public boolean validateObject(final Object obj) {//验证jedis是否有效,简单的ping命令  
        33.          if (obj instanceof Jedis) {  
        34.              final Jedis jedis = (Jedis) obj;  
        35.              try {  
        36.                  return jedis.isConnected() && jedis.ping().equals("PONG");  
        37.              } catch (final Exception e) {  
        38.                  return false;  
        39.              }  
        40.          } else {  
        41.              return false;  
        42.          }  
        43.      }  



    一致性hash的实现。关键是如何使得key均匀散列。jedis使用了murmurhash 2.0

     
        1. public R getShard(String key) {  
        2.         return resources.get(getShardInfo(key));  
        3.     }  
        4.   
        5.     public S getShardInfo(byte[] key) {  
        6.         SortedMap tail = nodes.tailMap(algo.hash(key));//找到key所在的节点  
        7.         if (tail.size() == 0) {  
        8.             return nodes.get(nodes.firstKey());  
        9.         }  
        10.         return tail.get(tail.firstKey());  
        11.     }  


    4.经验教训

    这里谈谈实践中遇到的几个问题,都是血和泪的教训啊。


    1.jedis的连接池获得的连接,进行通信时候出错了,一定要记得销毁该连接。因为它的inputstream里面可能还残留数据。下次从连接池获得的时候都是dirty data了。一般采用以下的方案:

        1.   try{  
        2. //do something  
        3. pool.returnConnection;//返回正常连接  
        4. catch(Exception e){  
        5. pool.returnBrokenConnection;//销毁连接  


    2.使用一致性hash的时候,使用批量查询命令mget的时候,ShardedJedis本身不支持的,只能用一个个key去取数据,性能低下。有一个 比较土的办法。先将key对应的节点分类合并,然后单独用mget去获取数据,再将返回值合并给用户。可以显著减少网络连接。


    5.总结

    jedis简单,高效,本身代码也可以当做一个网络客户端的典型实现范例。

  • 相关阅读:
    Js实现页面处理器
    自定类型转换
    为什么PHP5中保存cookie以后,要刷新一次才能读取cookie的内容?
    PHP不不支持函数重载
    JS 省市区三级联动
    我喜欢的酷站
    网站宽度设置多少
    Windows环境下Node.js 以及NPM和CoffeeScript的安装配置
    HTML中Meta详解
    搭建SVN
  • 原文地址:https://www.cnblogs.com/lsx1993/p/4632983.html
Copyright © 2011-2022 走看看