zoukankan      html  css  js  c++  java
  • Redis连接动态化

    连接工厂创建连接,并放在连接池中?

    一、Spring RedisTemplate的原理

      1、不同类型数据结构定义了不同的操作器

       private final ValueOperations<K, V> valueOps = new DefaultValueOperations<>(this);
        private final ListOperations<K, V> listOps = new DefaultListOperations<>(this);
        private final SetOperations<K, V> setOps = new DefaultSetOperations<>(this);
        private final StreamOperations<K, ?, ?> streamOps = new DefaultStreamOperations<>(this,ObjectHashMapper.getSharedInstance());
        private final ZSetOperations<K, V> zSetOps = new DefaultZSetOperations<>(this);
        private final GeoOperations<K, V> geoOps = new DefaultGeoOperations<>(this);
        private final HyperLogLogOperations<K, V> hllOps = new DefaultHyperLogLogOperations<>(this);
        private final ClusterOperations<K, V> clusterOps = new DefaultClusterOperations<>(this);
    opsForValue() 操作只有简单属性的数据
    opsForList() 操作含有list的数据
    opsForSet() 操作含有set的数据
    opsForZSet() 操作含有ZSet(有序集合)的数据
    opsForHash() 操作含有hash的数据
    opsForStream() 操作Stream

      2、提供配置key和value的序列化方式,默认的序列化方式为JDK,可在实例化RedisTemplate的时候指定序列化方式

        private @Nullable RedisSerializer<?> defaultSerializer;
    
        private RedisSerializer keySerializer = null;
        private RedisSerializer valueSerializer = null;
        private RedisSerializer hashKeySerializer = null;
        private RedisSerializer hashValueSerializer = null;
        private RedisSerializer<String> stringSerializer = RedisSerializer.string();

      3、RedisTemplate继承自 RedisAccessor,RedisAccessor中包含了 RedisConnectFactory属性

      因此,在实例化RedisTemplate可指定自定义的连接工厂  

    public class RedisAccessor implements InitializingBean {
        private @Nullable RedisConnectionFactory connectionFactory;
    }
      4、提供实际的数据操作的执行
        public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
            return execute(action, exposeConnection, false);
        }

      操作数据流程:

      1、获取连接工厂

      RedisConnectionFactory factory = getRequiredConnectionFactory();

      2、根据连接工厂获取连接

      RedisConnection conn = RedisConnectionUtils.getConnection(factory, enableTransactionSupport);

        1)当前线程是否持有连接,若持有,则直接返回

        2)否则,使用连接工厂创建一个连接并绑定到当前线程上

      3、执行完成释放连接

         finally {
                RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
            }

    二、连接动态化

      1、线程本地变量

    public class RdmContext {
    
        /**
         * 保存RdmContext的线程本地变量
         */
        private static final ThreadLocal<RdmContext> RDM_CONTEXT_THREAD_LOCAL = ThreadLocal.withInitial(RdmContext::new);
    
        public static RdmContext currentContext() {
            return RDM_CONTEXT_THREAD_LOCAL.get();
        }
    
        public static void remove() {
            RDM_CONTEXT_THREAD_LOCAL.remove();
        }
    
        /**
         * redis操作模板对象
         */
        private RedisTemplate<String, Object> redisTemplate;
    
        /**
         * 请求参数
         */
        private Map<String, String> data;
    
        /**
         * 连接信息
         */
        private RedisProperties redisProperties;
    
        /**
         * 连接信息token
         */
        private String token;
    
        /**
         * 处理器执行结果
         */
        private HandleResult handleResult = new HandleResult();
    
        /**
         * 集群模式
         */
        private String mode;
    
        ...getter/setter
    
    }

      2、HandleResult

    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.annotation.JsonInclude;
    
    public class HandleResult {
    
        private Integer code = 0;
    
        private String message = "success";
    
        @JsonInclude(JsonInclude.Include.NON_EMPTY)
        private Object data;
    
        public HandleResult() {
        }
    
        public HandleResult(ResponseEnum response) {
            this.code = response.getCode();
            this.message = response.getMessage();
        }
    
        public HandleResult(Object data) {
            this.data = data;
        }
    
        public HandleResult fail(ResponseEnum response) {
            this.code = response.getCode();
            this.message = response.getMessage();
            return this;
        }
    
        @JsonIgnore
        public boolean isSuccess() {
            return ResponseEnum.SUCCESS.getCode().equals(code);
        }
    
      ...getter/setter  
    }

      3、定义前置处理器,在每个请求到大Controller之前先执行前置处理器

        1)定义前置处理器接口

    /**
     * 处理器接口
     */
    public interface RdmHandler {
    
        /**
         * 是否执行
         *
         * @return true:执行,false:不执行
         */
        boolean boolHandle();
    
        /**
         * 执行,并处理异常
         */
        void handle();
    }

        2)定义参数解析处理器

          用于解析请求中的Redis连接信息参数和其他参数,并将请求参数保存在当前线程的ThreadLoca中

    /**
     * 参数处理器
     */
    @Handler(order = 2)
    public class ParamParseHandler implements RdmHandler {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ParamParseHandler.class);
    
        @Override
        public boolean boolHandle() {
            return true;
        }
    
        @Override
        public void handle() {
            RdmContext rdmContext = RdmContext.currentContext();
            try {
                // 将data从base64转为json明文
                String originData = RequestUtil.getParam("data");
                String data = Base64Util.decode(originData);
                // 解析其中的连接信息
                Map<String, String> dataMap = JacksonUtil.ofMap(data, String.class, String.class);
                if (dataMap == null) {
                    rdmContext.getHandleResult().fail(ResponseEnum.PARAM_ERROR);
                    return;
                }
                rdmContext.setData(dataMap);
                // 单机还是集群
                String mode = dataMap.get("mode");
                String host = dataMap.get("host");
                String password = dataMap.get("password");
                String username = dataMap.get("username");
                String database = dataMap.get("database");
    
                rdmContext.setMode(mode);
                RedisProperties redisProperties = new RedisProperties();
                if (StringUtil.equals(CommonConstant.STANDALONE, mode)) {
                    // 解析域名端口
                    String[] hostPort = host.split(":");
                    redisProperties.setHost(hostPort[0]);
                    redisProperties.setPort(Integer.parseInt(hostPort[1]));
                    // 如果数据库不为空
                    if (StringUtil.isNotEmpty(database)) {
                        redisProperties.setDatabase(Integer.parseInt(database));
                    }
                } else if (StringUtil.equals(CommonConstant.CLUSTER, mode)) {
                    String[] hosts = host.split(",");
                    redisProperties.setNodes(Arrays.asList(hosts));
                } else {
                    rdmContext.getHandleResult().fail(ResponseEnum.PARAM_ERROR);
                    return;
                }
                redisProperties.setUsername(username);
                redisProperties.setPassword(password);
                rdmContext.setRedisProperties(redisProperties);
                String token = Md5Util.getMd5(JacksonUtil.toString(redisProperties));
                rdmContext.setToken(token);
                // 放入延时队列中,半小时后清理
                RedisDelayQueue.putDelayQueue(token);
            } catch (Exception e) {
                LOGGER.error("参数处理失败:" + e.getMessage(), e);
                rdmContext.getHandleResult().fail(ResponseEnum.PARAM_ERROR);
            }
        }
    
    }
    View Code

        3)定义单机模式处理器

          实例化并初始化RedisTemplate,将其对象保存在ThreadLocal中

    /**
     * Redis单机处理器
     */
    @Handler(order = 3)
    public class RedisStandaloneHandler implements RdmHandler {
    
        @Override
        public boolean boolHandle() {
            return StringUtil.equals(CommonConstant.STANDALONE,RdmContext.currentContext().getMode());
        }
    
        @Override
        public void handle() {
            RdmContext rdmContext = RdmContext.currentContext();
            RedisProperties redisProperties = rdmContext.getRedisProperties();
            String token = rdmContext.getToken();
            // 看是否有缓存的redisTemplate
            RedisTemplate<String, Object> redisTemplate = RdmCache.REDIS_TEMPLATE_MAP.get(token);
            if (redisTemplate != null) {
                rdmContext.setRedisTemplate(redisTemplate);
                return;
            }
            // 单机服务配置
            RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration();
            // 主机、用户名、密码、端口、数据库
            serverConfig.setHostName(redisProperties.getHost());
            serverConfig.setPassword(redisProperties.getPassword());
            serverConfig.setPort(redisProperties.getPort());
            serverConfig.setDatabase(redisProperties.getDatabase());
            serverConfig.setUsername(redisProperties.getUsername());
            // 连接池信息
            GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
            poolConfig.setMaxIdle(redisProperties.getPool().getMaxIdle());
            poolConfig.setMinIdle(redisProperties.getPool().getMinIdle());
            poolConfig.setMaxTotal(redisProperties.getPool().getMaxActive());
            poolConfig.setMaxWaitMillis(redisProperties.getPool().getMaxWait());
    
            // 连接池客户端配置建造者
            LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder lettucePoolClientConfBuilder = LettucePoolingClientConfiguration.builder()
                    .commandTimeout(Duration.ofMillis(redisProperties.getTimeout()));
            // 连接池
            lettucePoolClientConfBuilder.poolConfig(poolConfig);
    
            // 客户端配置
            LettuceClientConfiguration clientConfig = lettucePoolClientConfBuilder.build();
    
            LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(serverConfig, clientConfig);
            lettuceConnectionFactory.afterPropertiesSet();
    
            redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(lettuceConnectionFactory);
            // 序列化方式全部为String
            redisTemplate.setKeySerializer(RedisSerializer.string());
            redisTemplate.setValueSerializer(new RdmStringRedisSerializer());
            redisTemplate.setHashKeySerializer(RedisSerializer.string());
            redisTemplate.setHashValueSerializer(RedisSerializer.json());
    
            redisTemplate.afterPropertiesSet();
    
            rdmContext.setRedisTemplate(redisTemplate);
            // 缓存起来
            RdmCache.REDIS_TEMPLATE_MAP.put(token, redisTemplate);
        }
    }
    View Code

        4)定义集群模式处理器

          实例化并初始化RedisTemplate,将其对象保存在ThreadLocal中

    /**
     * Redis集群处理器
     */
    @Handler(order = 4)
    public class RedisClusterHandler implements RdmHandler {
    
        @Override
        public boolean boolHandle() {
            return StringUtil.equals(CommonConstant.CLUSTER, RdmContext.currentContext().getMode());
        }
    
        @Override
        public void handle() {
            // 当前线程本地变量
            RdmContext rdmContext = RdmContext.currentContext();
            RedisProperties redisProperties = rdmContext.getRedisProperties();
            String token = rdmContext.getToken();
            // 看是否有缓存的redisTemplate
            RedisTemplate<String, Object> redisTemplate = RdmCache.REDIS_TEMPLATE_MAP.get(token);
            if (redisTemplate != null) {
                rdmContext.setRedisTemplate(redisTemplate);
                return;
            }
            // 集群服务配置
            // 集群redis连接工厂配置
            RedisClusterConfiguration serverConfig = new RedisClusterConfiguration(redisProperties.getNodes());
            serverConfig.setPassword(redisProperties.getPassword());
            // 用户名、密码
            serverConfig.setUsername(redisProperties.getUsername());
            serverConfig.setPassword(redisProperties.getPassword());
            // 跨集群执行命令最大重定向次数
            serverConfig.setMaxRedirects(redisProperties.getMaxRedirects());
            // 连接池信息
            GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
            poolConfig.setMaxIdle(redisProperties.getPool().getMaxIdle());
            poolConfig.setMinIdle(redisProperties.getPool().getMinIdle());
            poolConfig.setMaxTotal(redisProperties.getPool().getMaxActive());
            poolConfig.setMaxWaitMillis(redisProperties.getPool().getMaxWait());
    
            // 连接池客户端配置建造者
            LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder lettucePoolClientConfBuilder = LettucePoolingClientConfiguration.builder()
                    .commandTimeout(Duration.ofMillis(redisProperties.getTimeout()));
            // 连接池
            lettucePoolClientConfBuilder.poolConfig(poolConfig);
    
            // 客户端配置
            LettuceClientConfiguration clientConfig = lettucePoolClientConfBuilder.build();
    
            LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(serverConfig, clientConfig);
            lettuceConnectionFactory.afterPropertiesSet();
    
            redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(lettuceConnectionFactory);
            // 序列化方式全部为String
            redisTemplate.setKeySerializer(RedisSerializer.string());
            redisTemplate.setValueSerializer(new RdmStringRedisSerializer());
            redisTemplate.setHashKeySerializer(RedisSerializer.string());
            redisTemplate.setHashValueSerializer(RedisSerializer.json());
    
            redisTemplate.afterPropertiesSet();
    
            rdmContext.setRedisTemplate(redisTemplate);
            // 缓存起来
            RdmCache.REDIS_TEMPLATE_MAP.put(token, redisTemplate);
        }
    }
    View Code

        5)注册处理器

    /**
     * 处理器注册器及执行器
     */
    @Component
    public class HandlerRunner implements ApplicationContextAware {
    
        private static final List<RdmHandler> HANDLERS = new ArrayList<>();
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // 加载并注册处理器
            Map<String, RdmHandler> handlerMap = applicationContext.getBeansOfType(RdmHandler.class);
            for (Map.Entry<String, RdmHandler> entry : handlerMap.entrySet()) {
                // 注册处理器
                HANDLERS.add(entry.getValue());
            }
            // 排序
            HANDLERS.sort(Comparator.comparingInt(o -> o.getClass().getAnnotation(Handler.class).order()));
        }
    
        public static HandleResult run() {
            // 从上下文中获取处理器的执行结果(引用)
            HandleResult handleResult = RdmContext.currentContext().getHandleResult();
            // 遍历处理器,如果需要执行并执行
            for (RdmHandler handler : HANDLERS) {
                if (handler.boolHandle()) {
                    handler.handle();
                }
                // 如果执行失败,终止执行
                if (!handleResult.isSuccess()) {
                    break;
                }
            }
            // 返回执行结果
            return handleResult;
        }
    }
    View Code

        6)AOP拦截所有的请求,并在调用controller之前调用所有前置处理器

    @Aspect
    @Component
    public class RdmAspect {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RdmAspect.class);
    
        /**
         * 切点
         */
        @Pointcut(value = "@annotation(org.springframework.web.bind.annotation.RequestMapping)||@annotation(org.springframework.web.bind.annotation.PostMapping)||@annotation(org.springframework.web.bind.annotation.GetMapping)")
        private void webPointcut() {
            // doNothing
        }
    
        @Around(value = "webPointcut()")
        public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
            // 执行开始的时间
            long beginTime = System.currentTimeMillis();
            // 设置MDC参数用于日志打印
            MDCUtil.init();
            // 拦截方法执行结果
            Object result;
            try {
                // 执行前置处理器
                HandleResult handeResult = HandlerRunner.run();
                if (!handeResult.isSuccess()) {
                    return handeResult;
                }
                // 执行拦截的方法
                result = joinPoint.proceed();
            } catch (RedisConnectionFailureException rcfe) {
                LOGGER.error(rcfe.getMessage(), rcfe);
                return new HandleResult(ResponseEnum.CONNECT_FAIL);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                return new HandleResult(ResponseEnum.UNKNOWN_ERROR);
            } finally {
                long endTime = System.currentTimeMillis();
                LOGGER.info("耗时{}毫秒", endTime - beginTime);
                // 清除MDC
                MDCUtil.remove();
                // 清除共享副本
                RdmContext.remove();
            }
            return result;
        }
    
    }
    View Code

    END.

  • 相关阅读:
    js广告随窗口滚动小案例
    CSS中Float概念相关文章
    图片展示javascript各类型的关系
    ajax调用异常
    设置修改 Apache 文件根目录 (Document Root)
    SQL查询今天与昨天的记录,及本月记录、本周记录
    服务器角色
    javaScript进阶昂贵的集合
    .net实体新解
    数组操作时避免空值出现
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/15237028.html
Copyright © 2011-2022 走看看