package com.meiya.whale.mining.redis;

import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

import java.util.*;

/**
 * Batched hash reads against a Redis Cluster.
 *
 * <p>Keys are grouped by the cluster node that owns their hash slot, and each group is read
 * through one pipeline on a connection borrowed from that node's pool.
 */
@Component
public class JedisClusterUtil {

    @Autowired
    private JedisCluster jedisCluster;

    /**
     * Reads the hash stored at each of the given keys.
     *
     * <p>If {@code field} is non-blank, {@code HGET key field} is issued for every key;
     * otherwise {@code HGETALL key} is issued. The same rule applies whether one key or many
     * keys are requested.
     *
     * <p>In the multi-key path, a failure while talking to one node is printed and swallowed;
     * the keys served by that node then yield {@code null} entries while other nodes' results
     * are still returned. The single-key path goes through {@link JedisCluster} directly and
     * lets its exceptions propagate.
     *
     * @param keys  hash keys to read; may be {@code null} or empty
     * @param field hash field to read, or blank/{@code null} to read the whole hash
     * @return one entry per element of {@code keys}, in the same order; an empty list when
     *         {@code keys} is {@code null} or empty
     */
    public List<Object> batchGet(List<String> keys, String field) {
        List<Object> resList = new ArrayList<>();
        if (keys == null || keys.isEmpty()) {
            return resList;
        }

        boolean singleField = StringUtils.isNotBlank(field);

        // A single key needs no pipelining; let JedisCluster route it.
        if (keys.size() == 1) {
            String onlyKey = keys.get(0);
            if (singleField) {
                resList.add(jedisCluster.hget(onlyKey, field));
            } else {
                resList.add(jedisCluster.hgetAll(onlyKey));
            }
            return resList;
        }

        Map<String, Object> resultMap = new HashMap<>();
        for (Map.Entry<JedisPool, List<String>> entry : getJedisPool(keys).entrySet()) {
            JedisPool pool = entry.getKey();
            List<String> nodeKeys = entry.getValue();
            if (pool == null) {
                // No pool is known for these keys' slots; their results stay null.
                continue;
            }
            // Jedis.close() hands the connection back to its pool and lets Jedis decide
            // whether it is returned as healthy or as broken.
            try (Jedis jedis = pool.getResource()) {
                Pipeline pipeline = jedis.pipelined();
                for (String key : nodeKeys) {
                    if (singleField) {
                        pipeline.hget(key, field);
                    } else {
                        pipeline.hgetAll(key);
                    }
                }
                // Replies come back in the order the commands were queued.
                List<Object> replies = pipeline.syncAndReturnAll();
                pipeline.close();
                for (int i = 0; i < nodeKeys.size(); i++) {
                    resultMap.put(nodeKeys.get(i), replies.get(i));
                }
            } catch (Exception e) {
                // Best effort: a failing node must not abort the whole batch.
                e.printStackTrace();
            }
        }
        return sortList(keys, resultMap);
    }

    /**
     * Groups the keys by the connection pool of the node that owns each key's hash slot.
     *
     * @param keys keys to group
     * @return map from node pool to the keys routed to it; a {@code null} map key collects
     *         keys for which the cache returned no pool
     */
    private Map<JedisPool, List<String>> getJedisPool(List<String> keys) {
        // JedisCluster extends BinaryJedisCluster, whose JedisClusterConnectionHandler field
        // holds a JedisClusterInfoCache. Walking that chain reflectively gives access to the
        // slot -> JedisPool mapping.
        MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
        JedisClusterInfoCache cache =
                (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");

        // Each node's JedisPool is stored as a value in a map inside JedisClusterInfoCache
        // and is fixed and unique once that map is initialised, so the pool instance itself
        // can be used as a map key.
        Map<JedisPool, List<String>> keysByPool = new HashMap<>();
        for (String key : keys) {
            // Compute the hash slot, then look up the pool of the node serving it.
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool pool = cache.getSlotPool(slot);

            List<String> bucket = keysByPool.get(pool);
            if (bucket == null) {
                bucket = new ArrayList<>();
                keysByPool.put(pool, bucket);
            }
            bucket.add(key);
        }
        return keysByPool;
    }

    /**
     * Arranges the looked-up values in the order of the requested keys.
     *
     * @param keys   requested keys, in caller order
     * @param params value found for each key
     * @return values in the order of {@code keys}; {@code null} where a key has no entry
     */
    private List<Object> sortList(List<String> keys, Map<String, Object> params) {
        List<Object> ordered = new ArrayList<>(keys.size());
        for (String key : keys) {
            ordered.add(params.get(key));
        }
        return ordered;
    }

    /**
     * Releases a Jedis connection.
     *
     * <p>Does nothing when either argument is {@code null}. Otherwise the connection is
     * released through {@link Jedis#close()} rather than the deprecated
     * {@code JedisPool.returnResource}.
     *
     * @param jedis     connection to release; may be {@code null}
     * @param jedisPool pool the connection was borrowed from; may be {@code null}
     */
    public void returnResource(Jedis jedis, JedisPool jedisPool) {
        if (jedis != null && jedisPool != null) {
            jedis.close();
        }
    }
}