package com.meiya.whale.mining.redis;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.util.*;
@Component
public class JedisClusterUtil {
@Autowired
private JedisCluster jedisCluster;
/**
* Value批量查询
*
* @param keys
* @return
*/
public List<Object> batchGet(List<String> keys,String field) {
List<Object> resList = new ArrayList<>();
if (keys == null || keys.size() == 0) {
return resList;
}
if (keys.size() == 1) {
Map<String, String> map = jedisCluster.hgetAll(keys.get(0));
resList.add(map);
return resList;
}
Map<JedisPool, List<String>> jedisPoolMap = getJedisPool(keys);
List<String> keyList;
JedisPool currentJedisPool = null;
Pipeline currentPipeline;
List<Object> res = new ArrayList<>();
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
Jedis jedis = null;
try {
currentJedisPool = entry.getKey();
keyList = entry.getValue();
//获取pipeline
jedis = currentJedisPool.getResource();
currentPipeline = jedis.pipelined();
for (String key : keyList) {
if(StringUtils.isNotBlank(field)){
currentPipeline.hget(key,field);
}else {
currentPipeline.hgetAll(key);
}
}
//从pipeline中获取结果
res = currentPipeline.syncAndReturnAll();
currentPipeline.close();
for (int i = 0; i < keyList.size(); i++) {
if (null == res.get(i)) {
resultMap.put(keyList.get(i), null);
} else {
resultMap.put(keyList.get(i), res.get(i));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
returnResource(jedis, currentJedisPool);
}
}
resList = sortList(keys, resultMap);
return resList;
}
private Map<JedisPool, List<String>> getJedisPool(List<String> keys) {
//JedisCluster继承了BinaryJedisCluster
//BinaryJedisCluster的JedisClusterConnectionHandler属性
//里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache
//从而获取slot和JedisPool直接的映射
MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
//保存地址+端口和命令的映射
Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
JedisPool currentJedisPool = null;
List<String> keyList;
for (String key : keys) {
//计算哈希槽
int crc = JedisClusterCRC16.getSlot(key);
//通过哈希槽获取节点的连接
currentJedisPool = cache.getSlotPool(crc);
//由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的
//JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样
//的,可以作为map的key
if (jedisPoolMap.containsKey(currentJedisPool)) {
jedisPoolMap.get(currentJedisPool).add(key);
} else {
keyList = new ArrayList<>();
keyList.add(key);
jedisPoolMap.put(currentJedisPool, keyList);
}
}
return jedisPoolMap;
}
private List<Object> sortList(List<String> keys, Map<String, Object> params) {
List<Object> resultList = new ArrayList<>();
Iterator<String> it = keys.iterator();
while (it.hasNext()) {
String key = it.next();
resultList.add(params.get(key));
}
return resultList;
}
/**
* 释放jedis资源
*
* @param jedis
*/
public void returnResource(Jedis jedis, JedisPool jedisPool) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}
}