zoukankan      html  css  js  c++  java
  • redis管道pipeline

         Jedis jedis = new Jedis("127.0.0.1",6379);
            Pipeline pipeline = jedis.pipelined();
            for(int i = 0;i<1000;i++){
                String content = i + "";
                pipeline.set(content,content);
            }
            pipeline.sync();
    

      

    Jedis jedis = new Jedis("127.0.0.1",6379);
    Pipeline pipeline = jedis.pipelined();
    Map<String, Response> responses = new LinkedHashMap<String, Response>();
            for(int i = 0;i<1000;i++){
                String content = i + "";
                Response<String> response = pipeline.get(content);
                responses.put(content,response);
            }
            pipeline.sync();
            for(String key:responses.keySet()){
                System.out.println("key:"+key + ",value:" + responses.get(key).get());
            }
    

      二、集群

    1、方式一

    import org.apache.ibatis.reflection.MetaObject;
    import org.apache.ibatis.reflection.SystemMetaObject;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisClusterInfoCache;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Pipeline;
    import redis.clients.util.JedisClusterCRC16;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @Auther: lyl
     * @Date: 2019/10/23 17:06
     * @Description:
     */
    public class BatchUtil {
        public static Map<String, String> mget(JedisCluster jc, String... keys){
            Map<String, String> resMap = new HashMap<>();
            if(keys == null || keys.length == 0){
                return resMap;
            }
            //如果只有一条,直接使用get即可
            if(keys.length == 1){
                resMap.put(keys[0], jc.get(keys[0]));
                return resMap;
            }
    
            //JedisCluster继承了BinaryJedisCluster
            //BinaryJedisCluster的JedisClusterConnectionHandler属性
            //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache
            //从而获取slot和JedisPool直接的映射
            MetaObject metaObject = SystemMetaObject.forObject(jc);
            JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
            //保存地址+端口和命令的映射
            Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
            List<String> keyList = null;
            JedisPool currentJedisPool = null;
            Pipeline currentPipeline = null;
    
            for(String key : keys){
                //计算哈希槽
                int crc = JedisClusterCRC16.getSlot(key);
                //通过哈希槽获取节点的连接
                currentJedisPool = cache.getSlotPool(crc);
                //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的
                //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样
                //的,可以作为map的key
                if(jedisPoolMap.containsKey(currentJedisPool)){
                    jedisPoolMap.get(currentJedisPool).add(key);
                }else{
                    keyList = new ArrayList<>();
                    keyList.add(key);
                    jedisPoolMap.put(currentJedisPool, keyList);
                }
            }
    
            //保存结果
            List<Object> res = new ArrayList<>();
            //执行
            for(Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()){
                try {
                    currentJedisPool = entry.getKey();
                    keyList = entry.getValue();
                    //获取pipeline
                    currentPipeline = currentJedisPool.getResource().pipelined();
                    for(String key : keyList){
                        currentPipeline.get(key);
                    }
                    //从pipeline中获取结果
                    res = currentPipeline.syncAndReturnAll();
                    currentPipeline.close();
                    for(int i=0; i<keyList.size(); i++){
                        resMap.put(keyList.get(i), res.get(i)==null ? null : res.get(i).toString());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return new HashMap<>();
                }
            }
            return resMap;
        }
    }
    

      2、方式二

    import java.io.Closeable;
    import java.io.IOException;
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.Set;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import redis.clients.jedis.BinaryJedisCluster;
    import redis.clients.jedis.Client;
    import redis.clients.jedis.HostAndPort;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisClusterConnectionHandler;
    import redis.clients.jedis.JedisClusterInfoCache;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisSlotBasedConnectionHandler;
    import redis.clients.jedis.PipelineBase;
    import redis.clients.jedis.exceptions.JedisMovedDataException;
    import redis.clients.jedis.exceptions.JedisRedirectionException;
    import redis.clients.util.JedisClusterCRC16;
    import redis.clients.util.SafeEncoder;
    /**
     * @Auther: lyl
     * @Date: 2019/10/23 16:12
     * @Description:
     */
    public class JedisClusterPipeline extends PipelineBase implements Closeable {
        private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);
    
        // 部分字段没有对应的获取方法,只能采用反射来做
        // 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
        private static final Field FIELD_CONNECTION_HANDLER;
        private static final Field FIELD_CACHE;
        static {
            FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
            FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
        }
    
        private JedisSlotBasedConnectionHandler connectionHandler;
        private JedisClusterInfoCache clusterInfoCache;
        private Queue<Client> clients = new LinkedList<Client>();   // 根据顺序存储每个命令对应的Client
        private Map<JedisPool, Jedis> jedisMap = new HashMap<>();   // 用于缓存连接
        private boolean hasDataInBuf = false;   // 是否有数据在缓存区
    
        /**
         * 根据jedisCluster实例生成对应的JedisClusterPipeline
         * @param
         * @return
         */
        public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
            JedisClusterPipeline pipeline = new JedisClusterPipeline();
            pipeline.setJedisCluster(jedisCluster);
            return pipeline;
        }
    
        public JedisClusterPipeline() {
        }
    
        public void setJedisCluster(JedisCluster jedis) {
            connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
            clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
        }
    
        /**
         * 刷新集群信息,当集群信息发生变更时调用
         * @param
         * @return
         */
        public void refreshCluster() {
            connectionHandler.renewSlotCache();
        }
    
        /**
         * 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
         */
        public void sync() {
            innerSync(null);
        }
    
        /**
         * 同步读取所有数据 并按命令顺序返回一个列表
         *
         * @return 按照命令的顺序返回所有的数据
         */
        public List<Object> syncAndReturnAll() {
            List<Object> responseList = new ArrayList<Object>();
    
            innerSync(responseList);
    
            return responseList;
        }
    
        private void innerSync(List<Object> formatted) {
            HashSet<Client> clientSet = new HashSet<Client>();
    
            try {
                for (Client client : clients) {
                    // 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。
                    // 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了
                    Object data = generateResponse(client.getOne()).get();
                    if (null != formatted) {
                        formatted.add(data);
                    }
    
                    // size相同说明所有的client都已经添加,就不用再调用add方法了
                    if (clientSet.size() != jedisMap.size()) {
                        clientSet.add(client);
                    }
                }
            } catch (JedisRedirectionException jre) {
                if (jre instanceof JedisMovedDataException) {
                    // if MOVED redirection occurred, rebuilds cluster's slot cache,
                    // recommended by Redis cluster specification
                    refreshCluster();
                }
    
                throw jre;
            } finally {
                if (clientSet.size() != jedisMap.size()) {
                    // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
                    for (Jedis jedis : jedisMap.values()) {
                        if (clientSet.contains(jedis.getClient())) {
                            continue;
                        }
    
                        flushCachedData(jedis);
                    }
                }
    
                hasDataInBuf = false;
                close();
            }
        }
    
        @Override
        public void close() {
            clean();
    
            clients.clear();
    
            for (Jedis jedis : jedisMap.values()) {
                if (hasDataInBuf) {
                    flushCachedData(jedis);
                }
    
                jedis.close();
            }
    
            jedisMap.clear();
    
            hasDataInBuf = false;
        }
    
        private void flushCachedData(Jedis jedis) {
            try {
                jedis.getClient().getAll();
            } catch (RuntimeException ex) {
            }
        }
    
        @Override
        protected Client getClient(String key) {
            byte[] bKey = SafeEncoder.encode(key);
    
            return getClient(bKey);
        }
    
        @Override
        protected Client getClient(byte[] key) {
            Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
    
            Client client = jedis.getClient();
            clients.add(client);
    
            return client;
        }
    
        private Jedis getJedis(int slot) {
            JedisPool pool = clusterInfoCache.getSlotPool(slot);
    
            // 根据pool从缓存中获取Jedis
            Jedis jedis = jedisMap.get(pool);
            if (null == jedis) {
                jedis = pool.getResource();
                jedisMap.put(pool, jedis);
            }
    
            hasDataInBuf = true;
            return jedis;
        }
    
        private static Field getField(Class<?> cls, String fieldName) {
            try {
                Field field = cls.getDeclaredField(fieldName);
                field.setAccessible(true);
    
                return field;
            } catch (NoSuchFieldException | SecurityException e) {
                throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
            }
        }
    
        @SuppressWarnings({"unchecked" })
        private static <T> T getValue(Object obj, Field field) {
            try {
                return (T)field.get(obj);
            } catch (IllegalArgumentException | IllegalAccessException e) {
                LOGGER.error("get value fail", e);
    
                throw new RuntimeException(e);
            }
        }
    
        public static void main(String[] args) throws IOException {
            Set<HostAndPort> nodes = new HashSet<HostAndPort>();
            nodes.add(new HostAndPort("127.0.0.1", 7000));
            nodes.add(new HostAndPort("127.0.0.1", 7001));
            nodes.add(new HostAndPort("127.0.0.1", 7002));
            JedisCluster jc = new JedisCluster(nodes);
    
            long s = System.currentTimeMillis();
    
            JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc);
            jcp.refreshCluster();
            List<Object> batchResult = null;
            try {
                // batch write
                for (int i = 0; i < 100; i++) {
                    jcp.set("k" + i, "v1" + i);
                }
                jcp.sync();
    
                // batch read
                for (int i = 0; i < 10000; i++) {
                    jcp.get("k" + i);
                }
                batchResult = jcp.syncAndReturnAll();
            } finally {
                jcp.close();
            }
    
            // output time
            long t = System.currentTimeMillis() - s;
            System.out.println(t);
    
            System.out.println(batchResult.size());
    
            // 实际业务代码中,close要在finally中调,这里之所以没这么写,是因为懒
            jc.close();
        }
    }
    

      

  • 相关阅读:
    Fire
    Apple Tree
    访问艺术馆
    三角关系
    字母表
    折纸
    旅行
    单词分类

    圆桌游戏
  • 原文地址:https://www.cnblogs.com/liaoyanglong/p/11711604.html
Copyright © 2011-2022 走看看