Jedis design
Jedis, a recommended Redis client for Java, is organized into three layers of abstraction:
- Object pool design: Pool, JedisPool, GenericObjectPool, BasePoolableObjectFactory, JedisFactory
- User-facing Redis operation wrappers: BinaryJedisCommands, JedisCommands, BinaryJedis, Jedis
- Server-facing Redis operation wrappers: Commands, Client, BinaryClient, Connection, Protocol
Its class diagram is as follows:
For more on common-pool, see: http://macrochen.iteye.com/blog/320077
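To make the pool/factory split concrete, here is a minimal sketch of the borrow-and-return cycle that a pool such as GenericObjectPool provides. It is not the common-pool implementation: `SimplePool`, its nested `Factory` interface, and the `maxIdle`/`testOnBorrow` constructor parameters are hypothetical names chosen for illustration, and the sketch neither blocks nor enforces a total limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Tiny illustrative pool; a stand-in for the Pool / GenericObjectPool role. */
final class SimplePool<T> {

    /** Hypothetical factory, playing the role JedisFactory plays for the real pool. */
    interface Factory<R> {
        R create();
        boolean validate(R resource);
        void destroy(R resource);
    }

    private final Deque<T> idle = new ArrayDeque<>();
    private final Factory<T> factory;
    private final int maxIdle;
    private final boolean testOnBorrow;

    SimplePool(Factory<T> factory, int maxIdle, boolean testOnBorrow) {
        this.factory = factory;
        this.maxIdle = maxIdle;
        this.testOnBorrow = testOnBorrow;
    }

    /** Reuses an idle resource if one passes validation, otherwise creates a new one. */
    synchronized T borrow() {
        while (!idle.isEmpty()) {
            T candidate = idle.pop();
            if (!testOnBorrow || factory.validate(candidate)) {
                return candidate;
            }
            factory.destroy(candidate); // stale resource: discard and try the next
        }
        return factory.create();
    }

    /** Keeps the resource for reuse unless the idle list is already full. */
    synchronized void giveBack(T resource) {
        if (idle.size() < maxIdle) {
            idle.push(resource);
        } else {
            factory.destroy(resource);
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        Factory<StringBuilder> factory = new Factory<StringBuilder>() {
            public StringBuilder create() { return new StringBuilder("conn"); }
            public boolean validate(StringBuilder r) { return r.length() > 0; }
            public void destroy(StringBuilder r) { r.setLength(0); }
        };
        SimplePool<StringBuilder> pool = new SimplePool<>(factory, 2, true);
        StringBuilder conn = pool.borrow();
        try {
            conn.append("!");        // use the resource
        } finally {
            pool.giveBack(conn);     // always return it, even on failure
        }
        System.out.println("idle resources: " + pool.idleCount());
    }
}
```

The try/finally in `main` is the same shape a Jedis caller uses: take a resource from the pool, use it, and return it in `finally`.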
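The roles of the other classes are as follows:
Class | Responsibility |
Pool | Abstracts Jedis object pool operations and delegates them to GenericObjectPool |
JedisPool | Extends Pool and supplies the JedisFactory |
JedisFactory | Extends BasePoolableObjectFactory and provides the methods that create and destroy Jedis instances |
BinaryJedisCommands | Abstracts the client-facing Redis commands; keys and values are serialized byte arrays |
JedisCommands | Abstracts the client-facing Redis commands; keys and values are Strings |
BinaryJedis | Implements BinaryJedisCommands and delegates the actual work to Client |
Jedis | Implements JedisCommands and delegates the work to Client |
Commands | Abstracts the Redis operation interface with String keys and values; called by Jedis |
Connection | Abstracts a Redis connection (host, port, pass, socket, inputstream, outputstream, protocol) and handles communication with the Redis server |
Protocol | Abstracts Redis protocol handling |
BinaryClient | Extends Connection and wraps key/value operations based on byte[] |
Client | Extends BinaryClient and implements Commands, exposing String-based operations to the layer above |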
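The layering in the table (String API on top, byte[] API below, protocol framing at the bottom) can be sketched as follows. This is not Jedis source code: `RespEncoder`, `encodeStrings`, and `encodeBinary` are hypothetical names. The framing itself follows the Redis request format, in which a command is sent as an array of bulk strings (`*<count>`, then `$<length>` and the bytes for each argument, each terminated by CRLF).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative request encoder; a stand-in for the Client -> BinaryClient -> Protocol chain. */
final class RespEncoder {
    private static final byte[] CRLF = {'\r', '\n'};

    /** String layer (the Client role): converts arguments to bytes, then delegates. */
    static byte[] encodeStrings(String... args) {
        byte[][] raw = new byte[args.length][];
        for (int i = 0; i < args.length; i++) {
            raw[i] = args[i].getBytes(StandardCharsets.UTF_8);
        }
        return encodeBinary(raw);
    }

    /** Binary layer (the BinaryClient / Protocol role): frames raw byte[] arguments. */
    static byte[] encodeBinary(byte[]... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeHeader(out, '*', args.length);          // number of arguments
        for (byte[] arg : args) {
            writeHeader(out, '$', arg.length);       // byte length of this argument
            out.write(arg, 0, arg.length);
            out.write(CRLF, 0, CRLF.length);
        }
        return out.toByteArray();
    }

    private static void writeHeader(ByteArrayOutputStream out, char prefix, int n) {
        byte[] digits = Integer.toString(n).getBytes(StandardCharsets.US_ASCII);
        out.write(prefix);
        out.write(digits, 0, digits.length);
        out.write(CRLF, 0, CRLF.length);
    }

    public static void main(String[] args) {
        byte[] frame = encodeStrings("SET", "k", "v");
        // Show CRLF pairs as visible escapes.
        // Prints: *3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n
        System.out.println(new String(frame, StandardCharsets.UTF_8).replace("\r\n", "\\r\\n"));
    }
}
```

Keeping the String layer as a thin conversion over the binary layer means only one place has to know the wire format.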
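ShardedJedis implementation analysis
ShardedJedis is a distributed Redis client that shards keys across servers on the client side using consistent hashing. Its design has the following parts:
- Object pool design: Pool, ShardedJedisPool, ShardedJedisFactory
- User-facing operation wrappers: BinaryShardedJedis, ShardedJedis
- Consistent hashing implementation: Sharded
The ShardedJedis class diagram, with the Jedis design details left out, is as follows:
The classes in the ShardedJedis diagram are described below; the object pool and the Jedis design details are omitted: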
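Class | Responsibility |
Sharded | Abstracts the partitioning design based on consistent hashing; see the consistent hashing section below for the design approach |
BinaryShardedJedis | Like BinaryJedis, implements BinaryJedisCommands and exposes key/value operations based on byte[] |
ShardedJedis | Like Jedis, implements JedisCommands and exposes key/value operations based on String |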
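Sharded consistent hashing implementation
Sharded implements consistent hashing as follows:
- Redis server node partitioning: each server node is hashed into 160 virtual nodes (the count scales with a configurable weight)
- The virtual nodes are stored in a TreeMap
- The physical connection to each Redis server is stored in a LinkedHashMap
- The key or key tag is hashed with the same algorithm; the TreeMap is then searched for nodes whose hash is greater than or equal to the key's hash, and the key is stored on the nearest one. When the key's hash is greater than the largest virtual-node hash, the key goes to the first virtual node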
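The scheme above can be sketched with a TreeMap-based ring. This is a minimal illustration, not the Sharded source: `HashRing`, `shardFor`, and `keyTag` are hypothetical names, the virtual-node naming (`shard + "*" + n`) is an assumption, the `{...}` key-tag pattern is assumed for illustration, and the hash is the first 8 bytes of an MD5 digest because the JDK ships MD5 but not MurmurHash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative consistent-hash ring: virtual nodes in a TreeMap, lookup by ceiling entry. */
final class HashRing {
    private static final int VIRTUAL_NODES_PER_WEIGHT = 160;
    // Assumed key-tag syntax: the text between the first "{" and "}" decides the shard.
    private static final Pattern KEY_TAG = Pattern.compile("\\{(.+?)\\}");

    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** @param shardWeights shard name mapped to its weight (1 = 160 virtual nodes) */
    HashRing(Map<String, Integer> shardWeights) {
        if (shardWeights.isEmpty()) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        for (Map.Entry<String, Integer> shard : shardWeights.entrySet()) {
            int virtualNodes = VIRTUAL_NODES_PER_WEIGHT * shard.getValue();
            for (int n = 0; n < virtualNodes; n++) {
                ring.put(hash(shard.getKey() + "*" + n), shard.getKey());
            }
        }
    }

    /** First virtual node with hash >= key hash; wraps to the first node past the end. */
    String shardFor(String key) {
        Map.Entry<Long, String> node = ring.ceilingEntry(hash(keyTag(key)));
        if (node == null) {
            node = ring.firstEntry();
        }
        return node.getValue();
    }

    /** Returns the tag inside {...} if present, otherwise the whole key. */
    static String keyTag(String key) {
        Matcher m = KEY_TAG.matcher(key);
        return m.find() ? m.group(1) : key;
    }

    /** Folds the first 8 bytes of the MD5 digest into a long (an illustrative choice). */
    static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> shards = new TreeMap<>();
        shards.put("instance:01", 1);
        shards.put("instance:02", 1);
        shards.put("instance:03", 1);
        HashRing ring = new HashRing(shards);
        System.out.println("user:{42}:name  -> " + ring.shardFor("user:{42}:name"));
        System.out.println("user:{42}:email -> " + ring.shardFor("user:{42}:email"));
    }
}
```

Because both keys in `main` share the tag `42`, they hash identically and land on the same shard, which is the point of key tags: related keys can be kept together.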
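Sharded supports two hash algorithms, MD5 and MurmurHash, and defaults to 64-bit MurmurHash. MurmurHash is an efficient, low-collision hash algorithm and is worth studying if you are interested. Reference:
Spring integration configuration example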
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-lazy-init="true">

    <description>Jedis Configuration</description>

    <!-- Load the configuration properties file -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:jeesite.properties" />

    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="300" /> <!-- Maximum number of objects that may stay idle -->
        <property name="maxTotal" value="60000" /> <!-- Maximum number of objects allocated -->
        <property name="testOnBorrow" value="true" /> <!-- Whether to validate an object when it is borrowed -->
    </bean>

    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1" value="${redis.host}" />
        <constructor-arg index="2" value="${redis.port}" type="int" />
    </bean>

    <!-- Jedis sharded (multi-instance) deployment -->
    <bean id="shardedJedisPool" class="redis.clients.jedis.ShardedJedisPool" destroy-method="destroy">
        <constructor-arg ref="jedisPoolConfig" />
        <constructor-arg>
            <list>
                <bean class="redis.clients.jedis.JedisShardInfo">
                    <constructor-arg value="127.0.0.1" />
                    <constructor-arg type="int" value="7000" />
                    <constructor-arg value="instance:01" />
                </bean>
                <bean class="redis.clients.jedis.JedisShardInfo">
                    <constructor-arg value="127.0.0.1" />
                    <constructor-arg type="int" value="7001" />
                    <constructor-arg value="instance:02" />
                </bean>
                <bean class="redis.clients.jedis.JedisShardInfo">
                    <constructor-arg value="127.0.0.1" />
                    <constructor-arg type="int" value="7003" />
                    <constructor-arg value="instance:03" />
                </bean>
            </list>
        </constructor-arg>
    </bean>
</beans>
A self-written utility class that wraps ShardedJedis client operations:
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.thinkgem.jeesite.common.config.Global;

import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.exceptions.JedisException;

/**
 * ShardedJedis cache utility class, for use when Redis is deployed as several sharded instances.
 * StringUtils, ObjectUtils and SpringContextHolder are project helper classes that are not imported here;
 * they are presumably in the same package as this class.
 */
public class SharedJedisUtils {

    private static Logger logger = LoggerFactory.getLogger(SharedJedisUtils.class);

    private static ShardedJedisPool shardedJedisPool = SpringContextHolder.getBean(ShardedJedisPool.class);

    public static final String KEY_PREFIX = Global.getConfig("redis.keyPrefix");

    /**
     * Gets a cached String.
     * @param key key
     * @return value, or null if absent
     */
    public static String get(String key) {
        String value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                value = jedis.get(key);
                value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
                logger.debug("get {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("get {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Gets a cached object.
     * @param key key
     * @return value, or null if absent
     */
    public static Object getObject(String key) {
        Object value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                value = toObject(jedis.get(getBytesKey(key)));
                logger.debug("getObject {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getObject {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Sets a cached String.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return the reply from SET
     */
    public static String set(String key, String value, int cacheSeconds) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.set(key, value);
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("set {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("set {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Sets a cached object.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return the reply from SET
     */
    public static String setObject(String key, Object value, int cacheSeconds) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.set(getBytesKey(key), toBytes(value));
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setObject {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setObject {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Gets a cached List of Strings.
     * @param key key
     * @return value, or null if absent
     */
    public static List<String> getList(String key) {
        List<String> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                value = jedis.lrange(key, 0, -1);
                logger.debug("getList {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getList {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Gets a cached List of objects.
     * @param key key
     * @return value, or null if absent
     */
    public static List<Object> getObjectList(String key) {
        List<Object> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                List<byte[]> list = jedis.lrange(getBytesKey(key), 0, -1);
                value = Lists.newArrayList();
                for (byte[] bs : list) {
                    value.add(toObject(bs));
                }
                logger.debug("getObjectList {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getObjectList {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Sets a cached List of Strings, replacing any existing list.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return length of the list after the push
     */
    public static long setList(String key, List<String> value, int cacheSeconds) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                jedis.del(key);
            }
            // toArray() without an argument returns Object[]; casting it to String[] fails at runtime
            result = jedis.rpush(key, value.toArray(new String[0]));
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setList {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setList {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Sets a cached List of objects, replacing any existing list.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return length of the list after the push
     */
    public static long setObjectList(String key, List<Object> value, int cacheSeconds) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                jedis.del(key);
            }
            List<byte[]> list = Lists.newArrayList();
            for (Object o : value) {
                list.add(toBytes(o));
            }
            result = jedis.rpush(getBytesKey(key), list.toArray(new byte[0][]));
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setObjectList {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setObjectList {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Appends values to a cached List of Strings.
     * @param key key
     * @param value values
     * @return length of the list after the push
     */
    public static long listAdd(String key, String... value) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.rpush(key, value);
            logger.debug("listAdd {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("listAdd {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Appends values to a cached List of objects.
     * @param key key
     * @param value values
     * @return length of the list after the push
     */
    public static long listObjectAdd(String key, Object... value) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            List<byte[]> list = Lists.newArrayList();
            for (Object o : value) {
                list.add(toBytes(o));
            }
            result = jedis.rpush(getBytesKey(key), list.toArray(new byte[0][]));
            logger.debug("listObjectAdd {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("listObjectAdd {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Gets a cached Set of Strings.
     * @param key key
     * @return value, or null if absent
     */
    public static Set<String> getSet(String key) {
        Set<String> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                value = jedis.smembers(key);
                logger.debug("getSet {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getSet {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Gets a cached Set of objects.
     * @param key key
     * @return value, or null if absent
     */
    public static Set<Object> getObjectSet(String key) {
        Set<Object> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                value = Sets.newHashSet();
                Set<byte[]> set = jedis.smembers(getBytesKey(key));
                for (byte[] bs : set) {
                    value.add(toObject(bs));
                }
                logger.debug("getObjectSet {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getObjectSet {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Sets a cached Set of Strings, replacing any existing set.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return number of members added
     */
    public static long setSet(String key, Set<String> value, int cacheSeconds) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                jedis.del(key);
            }
            result = jedis.sadd(key, value.toArray(new String[0]));
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setSet {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setSet {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Sets a cached Set of objects, replacing any existing set.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return number of members added
     */
    public static long setObjectSet(String key, Set<Object> value, int cacheSeconds) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                jedis.del(key);
            }
            Set<byte[]> set = Sets.newHashSet();
            for (Object o : value) {
                set.add(toBytes(o));
            }
            result = jedis.sadd(getBytesKey(key), set.toArray(new byte[0][]));
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setObjectSet {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setObjectSet {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Adds values to a cached Set of Strings.
     * @param key key
     * @param value values
     * @return number of members added
     */
    public static long setSetAdd(String key, String... value) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.sadd(key, value);
            logger.debug("setSetAdd {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setSetAdd {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Adds values to a cached Set of objects.
     * @param key key
     * @param value values
     * @return number of members added
     */
    public static long setSetObjectAdd(String key, Object... value) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            Set<byte[]> set = Sets.newHashSet();
            for (Object o : value) {
                set.add(toBytes(o));
            }
            // sadd, not rpush: the key holds a Set
            result = jedis.sadd(getBytesKey(key), set.toArray(new byte[0][]));
            logger.debug("setSetObjectAdd {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setSetObjectAdd {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Gets a cached Map of Strings.
     * @param key key
     * @return value, or null if absent
     */
    public static Map<String, String> getMap(String key) {
        Map<String, String> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                value = jedis.hgetAll(key);
                logger.debug("getMap {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getMap {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Gets a cached Map of objects.
     * @param key key
     * @return value, or null if absent
     */
    public static Map<String, Object> getObjectMap(String key) {
        Map<String, Object> value = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                value = Maps.newHashMap();
                Map<byte[], byte[]> map = jedis.hgetAll(getBytesKey(key));
                for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
                    value.put(StringUtils.toString(e.getKey()), toObject(e.getValue()));
                }
                logger.debug("getObjectMap {} = {}", key, value);
            }
        } catch (Exception e) {
            logger.warn("getObjectMap {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return value;
    }

    /**
     * Sets a cached Map of Strings, replacing any existing map.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return the reply from HMSET
     */
    public static String setMap(String key, Map<String, String> value, int cacheSeconds) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                jedis.del(key);
            }
            result = jedis.hmset(key, value);
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setMap {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setMap {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Sets a cached Map of objects, replacing any existing map.
     * @param key key
     * @param value value
     * @param cacheSeconds expiry in seconds; 0 means no expiry
     * @return the reply from HMSET
     */
    public static String setObjectMap(String key, Map<String, Object> value, int cacheSeconds) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                jedis.del(key);
            }
            Map<byte[], byte[]> map = Maps.newHashMap();
            for (Map.Entry<String, Object> e : value.entrySet()) {
                map.put(getBytesKey(e.getKey()), toBytes(e.getValue()));
            }
            result = jedis.hmset(getBytesKey(key), map);
            if (cacheSeconds != 0) {
                jedis.expire(key, cacheSeconds);
            }
            logger.debug("setObjectMap {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("setObjectMap {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Adds entries to a cached Map of Strings.
     * @param key key
     * @param value entries to add
     * @return the reply from HMSET
     */
    public static String mapPut(String key, Map<String, String> value) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.hmset(key, value);
            logger.debug("mapPut {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("mapPut {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Adds entries to a cached Map of objects.
     * @param key key
     * @param value entries to add
     * @return the reply from HMSET
     */
    public static String mapObjectPut(String key, Map<String, Object> value) {
        String result = null;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            Map<byte[], byte[]> map = Maps.newHashMap();
            for (Map.Entry<String, Object> e : value.entrySet()) {
                map.put(getBytesKey(e.getKey()), toBytes(e.getValue()));
            }
            result = jedis.hmset(getBytesKey(key), map);
            logger.debug("mapObjectPut {} = {}", key, value);
        } catch (Exception e) {
            logger.warn("mapObjectPut {} = {}", key, value, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Removes an entry from a cached Map of Strings.
     * @param key key
     * @param mapKey field within the map
     * @return number of fields removed
     */
    public static long mapRemove(String key, String mapKey) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.hdel(key, mapKey);
            logger.debug("mapRemove {} {}", key, mapKey);
        } catch (Exception e) {
            logger.warn("mapRemove {} {}", key, mapKey, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Removes an entry from a cached Map of objects.
     * @param key key
     * @param mapKey field within the map
     * @return number of fields removed
     */
    public static long mapObjectRemove(String key, String mapKey) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.hdel(getBytesKey(key), getBytesKey(mapKey));
            logger.debug("mapObjectRemove {} {}", key, mapKey);
        } catch (Exception e) {
            logger.warn("mapObjectRemove {} {}", key, mapKey, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Checks whether a field exists in a cached Map of Strings.
     * @param key key
     * @param mapKey field within the map
     * @return true if the field exists
     */
    public static boolean mapExists(String key, String mapKey) {
        boolean result = false;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.hexists(key, mapKey);
            logger.debug("mapExists {} {}", key, mapKey);
        } catch (Exception e) {
            logger.warn("mapExists {} {}", key, mapKey, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Checks whether a field exists in a cached Map of objects.
     * @param key key
     * @param mapKey field within the map
     * @return true if the field exists
     */
    public static boolean mapObjectExists(String key, String mapKey) {
        boolean result = false;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.hexists(getBytesKey(key), getBytesKey(mapKey));
            logger.debug("mapObjectExists {} {}", key, mapKey);
        } catch (Exception e) {
            logger.warn("mapObjectExists {} {}", key, mapKey, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Deletes a cached String entry.
     * @param key key
     * @return number of keys removed
     */
    public static long del(String key) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(key)) {
                result = jedis.del(key);
                logger.debug("del {}", key);
            } else {
                logger.debug("del {} not exists", key);
            }
        } catch (Exception e) {
            logger.warn("del {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Deletes a cached object entry.
     * @param key key
     * @return number of keys removed
     */
    public static long delObject(String key) {
        long result = 0;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            if (jedis.exists(getBytesKey(key))) {
                result = jedis.del(getBytesKey(key));
                logger.debug("delObject {}", key);
            } else {
                logger.debug("delObject {} not exists", key);
            }
        } catch (Exception e) {
            logger.warn("delObject {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Checks whether a cached String entry exists.
     * @param key key
     * @return true if the key exists
     */
    public static boolean exists(String key) {
        boolean result = false;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.exists(key);
            logger.debug("exists {}", key);
        } catch (Exception e) {
            logger.warn("exists {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Checks whether a cached object entry exists.
     * @param key key
     * @return true if the key exists
     */
    public static boolean existsObject(String key) {
        boolean result = false;
        ShardedJedis jedis = null;
        try {
            jedis = getResource();
            result = jedis.exists(getBytesKey(key));
            logger.debug("existsObject {}", key);
        } catch (Exception e) {
            logger.warn("existsObject {}", key, e);
        } finally {
            returnResource(jedis);
        }
        return result;
    }

    /**
     * Borrows a ShardedJedis from the pool.
     * @return the borrowed resource
     * @throws JedisException if no resource can be obtained
     */
    public static ShardedJedis getResource() throws JedisException {
        ShardedJedis jedis = null;
        try {
            jedis = shardedJedisPool.getResource();
            // logger.debug("getResource.", jedis);
        } catch (JedisException e) {
            logger.warn("getResource.", e);
            returnBrokenResource(jedis);
            throw e;
        }
        return jedis;
    }

    /**
     * Returns a broken resource to the pool.
     * @param jedis the resource; ignored if null
     */
    public static void returnBrokenResource(ShardedJedis jedis) {
        if (jedis != null) {
            shardedJedisPool.returnBrokenResource(jedis);
        }
    }

    /**
     * Releases a resource back to the pool.
     * @param jedis the resource; ignored if null
     */
    public static void returnResource(ShardedJedis jedis) {
        if (jedis != null) {
            shardedJedisPool.returnResource(jedis);
        }
    }

    /**
     * Converts a key to byte[]: Strings via their bytes, other objects via serialization.
     * @param object the key
     * @return the key as byte[]
     */
    public static byte[] getBytesKey(Object object) {
        if (object instanceof String) {
            return StringUtils.getBytes((String) object);
        } else {
            return ObjectUtils.serialize(object);
        }
    }

    /**
     * Converts an Object to byte[].
     * @param object the object to serialize
     * @return the serialized bytes
     */
    public static byte[] toBytes(Object object) {
        return ObjectUtils.serialize(object);
    }

    /**
     * Converts byte[] back to an Object.
     * @param bytes the serialized bytes
     * @return the deserialized object
     */
    public static Object toObject(byte[] bytes) {
        return ObjectUtils.unserialize(bytes);
    }
}
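The object-valued methods in the utility class depend on `ObjectUtils.serialize` and `ObjectUtils.unserialize`, which are not shown. A minimal sketch of what such helpers might look like follows, assuming they use standard Java serialization; `SerializationSketch` and its method names are hypothetical and are not the project's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

/** Illustrative byte[] <-> Object conversion using JDK serialization. */
final class SerializationSketch {

    /** Serializes a Serializable object into a byte array suitable as a Redis value. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the buffer only after the stream is closed, so everything is flushed.
        return bytes.toByteArray();
    }

    /** Restores an object from bytes produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of cached object is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> original = new ArrayList<>(Arrays.asList("a", "b"));
        Object restored = deserialize(serialize(original));
        System.out.println(original.equals(restored));
    }
}
```

Java deserialization should only be applied to data the application itself wrote, so a cache populated this way should not be shared with untrusted writers.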