  • [Original] Merging identical requests with an aspect and a distributed lock

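    In microservices, identical requests often arrive concurrently. When they miss the cache, several of them can fall through to the DB at the same time, which increases the load on the DB.

    This article uses a Redis distributed lock to merge identical requests.

    When two or more identical requests arrive, they compete for the lock, which serializes them.

    Even if the cache entry has expired, only one thread goes to the DB while the others wait and retry, which lowers the risk of cache penetration.

    The code is as follows.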

    1. Add the dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
        <version>1.5.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>1.5.2.RELEASE</version>
    </dependency>
    
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjrt</artifactId>
        <version>1.6.11</version>
    </dependency>
    
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
        <version>1.6.11</version>
    </dependency>
    <!-- cglib -->
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>2.1</version>
    </dependency>
    
    <dependency>
        <groupId>commons-configuration</groupId>
        <artifactId>commons-configuration</artifactId>
        <version>1.10</version>
    </dependency>
    
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    2. Add the annotation

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
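    /**
     * Annotation that marks a method whose identical requests should be merged.
     * Only methods with a single parameter are supported.
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface MergeDuplicationRequestAttribute {
        /**
         * Template for the distributed lock key, e.g. A:b:%s
         * @return the key template
         */
        String redisLockKeyTemplate() default "";
    
        /**
         * Fields of the parameter that make up the lock key, e.g. id.
         * The real lock key is String.format(redisLockKeyTemplate, getValues(object, fields)).
         * @return the field names
         */
        String[] redisLockKeyObjectFileds() default {};
    
        /**
         * Expiry time of the distributed lock, in milliseconds.
         * @return the expiry time
         */
        int expireMillseconds() default 1000;
    
        /**
         * Interval between lock retries, in milliseconds.
         * @return the retry interval
         */
        int retryIntervalMillseconds() default 20;
    
        /**
         * Number of lock retries.
         * @return the retry count
         */
        int retryTimes() default 3;
    }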
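
As a rough illustration of how the key template and the field values combine into the final lock key, the sketch below mirrors the behaviour described in the annotation comments. The class name `LockKeyExample` and the method `buildLockKey` are made up for this example and are not part of the article's code; the "_" separator and the "null" placeholder follow the helper class shown later in the article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LockKeyExample {
    /** Joins the field values with "_" and substitutes the result into the template. */
    static String buildLockKey(String template, List<Object> fieldValues) {
        if (fieldValues.isEmpty()) {
            return template; // no fields configured: the template is used verbatim
        }
        String joined = fieldValues.stream()
                .map(String::valueOf) // a null value becomes the text "null"
                .collect(Collectors.joining("_"));
        return String.format(template, joined);
    }

    public static void main(String[] args) {
        System.out.println(buildLockKey("A:b:%s", Arrays.<Object>asList(123)));       // A:b:123
        System.out.println(buildLockKey("order:%s", Arrays.<Object>asList(7, "CN"))); // order:7_CN
    }
}
```

Because all field values are joined into one string, the template needs only a single %s placeholder regardless of how many fields are listed.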

    3. Add the aspect

    import com.g2.order.server.annotation.MergeDuplicationRequestAttribute;
    import com.g2.order.server.config.RedisLock;
    import com.g2.order.server.utils.ObjectUtils;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.EnableAspectJAutoProxy;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    import java.util.Arrays;
    
    // Enable AspectJ auto-proxying. proxyTargetClass defaults to false (JDK interface proxies); true forces CGLIB class-based proxies.
    @EnableAspectJAutoProxy(proxyTargetClass = true)
    @Component
    @Order(-1)
    @Aspect
    public class MergeDuplicationRequestAspect {
        private static Logger logger = LoggerFactory.getLogger(MergeDuplicationRequestAspect.class);
    
        @Autowired
        private RedisLock redisLock;
    
        @Pointcut("@annotation(com.g2.order.server.annotation.MergeDuplicationRequestAttribute)")
        public void mergeDuplicationRequest() {
    
        }
    
        @Around("mergeDuplicationRequest()")
        public Object handleControllerMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
            // Resolve the signature of the intercepted controller method.
            MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
    
            // Get the reflective Method so that its annotation can be read.
            Method method = methodSignature.getMethod();
    
            MergeDuplicationRequestAttribute annotation = method.getAnnotation(MergeDuplicationRequestAttribute.class);
            String key = annotation.redisLockKeyTemplate();
            String redisKey = key;
            Object[] args = proceedingJoinPoint.getArgs();
            if (args.length > 0) {
                // Only the first argument is used to build the lock key.
                Object param = args[0];
                String[] paramFields = annotation.redisLockKeyObjectFileds();
                if (paramFields.length > 0) {
                    String propertiesValue = ObjectUtils.getPropertiesValue(param, Arrays.asList(paramFields));
                    redisKey = String.format(key, propertiesValue);
                }
            }
    
            int retryIntervalMillseconds = annotation.retryIntervalMillseconds();
            int retryTimes = annotation.retryTimes();
            int expireMillseconds = annotation.expireMillseconds();
            if (!redisLock.lock(redisKey, expireMillseconds)) {
                for (int i = 1; i <= retryTimes; i++) {
                    try {
                        logger.info("Identical request in flight, sleep #{}", i);
                        Thread.sleep(retryIntervalMillseconds);
                    } catch (InterruptedException ex) {
                        // Restore the interrupt flag and stop waiting.
                        Thread.currentThread().interrupt();
                        break;
                    }
    
                    if (redisLock.lock(redisKey, expireMillseconds)) {
                        break;
                    }
                }
            }
    
            // Proceed whether or not the lock was acquired; the lock is released only by its expiry.
            return proceedingJoinPoint.proceed();
        }
    }
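
To make the acquire-and-retry flow of the aspect easier to try out in isolation, here is a minimal sketch that swaps Redis for an in-memory map. `RetryLockSketch`, `tryLock` and `acquireWithRetry` are hypothetical names for this example only; `tryLock` merely imitates the "set if absent, with expiry" behaviour that the article's `RedisLock.lock` gets from Redis.

```java
import java.util.concurrent.ConcurrentHashMap;

public class RetryLockSketch {
    // In-memory stand-in for Redis: key -> expiry timestamp in milliseconds.
    private static final ConcurrentHashMap<String, Long> EXPIRY = new ConcurrentHashMap<>();

    /** Imitates SET key value NX PX ttl: succeeds only if the key is absent or expired. */
    static boolean tryLock(String key, long ttlMillis) {
        long now = System.currentTimeMillis();
        boolean[] acquired = {false};
        EXPIRY.compute(key, (k, expiresAt) -> {
            if (expiresAt == null || expiresAt <= now) {
                acquired[0] = true;
                return now + ttlMillis; // take the lock with a fresh expiry
            }
            return expiresAt;           // still held by someone else
        });
        return acquired[0];
    }

    /** One immediate attempt, then up to retryTimes further attempts with a fixed pause. */
    static boolean acquireWithRetry(String key, long ttlMillis, long retryIntervalMillis, int retryTimes) {
        if (tryLock(key, ttlMillis)) {
            return true;
        }
        for (int i = 1; i <= retryTimes; i++) {
            try {
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
            if (tryLock(key, ttlMillis)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(acquireWithRetry("A:123", 1000, 20, 3)); // true: first caller takes the lock
        System.out.println(acquireWithRetry("A:123", 1000, 20, 3)); // false: still held after 3 x 20 ms
    }
}
```

With the annotation defaults (1000 ms expiry, 3 retries of 20 ms), a second caller gives up after roughly 60 ms. In the aspect above that caller still proceeds to the target method, so the wait only helps if the first caller has filled the cache by then.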

    3. Add the distributed lock code (using a Redis cluster)

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.util.StringUtils;
    
    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;
    
    import lombok.extern.slf4j.Slf4j;
    import redis.clients.jedis.HostAndPort;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPoolConfig;
    
    /**
     * Custom Redis configuration class
     *
     * @date 2017/10/19
     */
    @Slf4j
    @Configuration
    public class RedisConfig {
    
        /**
         * jedisCluster
         */
        @Bean
        @Autowired
        public JedisCluster jedisCluster(@Qualifier("jedis.pool.config") JedisPoolConfig config,
                @Value("${redis.host.address}") String hostAndPort,
                @Value("${redis.password}") String password) {
            log.info("Initializing redis...");
            // 1. Check that the Redis cluster address has been configured.
            if (StringUtils.isEmpty(hostAndPort)) {
                throw new RuntimeException("Redis cluster initialization failed. Please check the redis.host.address setting.");
            }
    
            // 2. Build the set of cluster nodes from the comma-separated host:port list.
            Set<HostAndPort> hostAndPorts = Arrays.stream(hostAndPort.split(",")).map(s -> {
                String[] split = s.split(":");
                return new HostAndPort(split[0], Integer.parseInt(split[1]));
            }).collect(Collectors.toSet());
    
            return new JedisCluster(hostAndPorts, 1000, 1000, 1, password, config);
        }
    
        @Bean(name = "jedis.pool.config")
        public JedisPoolConfig jedisPoolConfig(@Value("${jedis.pool.config.maxTotal}") int maxTotal,
                @Value("${jedis.pool.config.maxWaitMillis}") int maxWaitMillis,
                @Value("${jedis.pool.config.maxIdle}") int maxIdle) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(maxTotal);
            config.setMaxIdle(maxIdle);
            config.setMaxWaitMillis(maxWaitMillis);
            return config;
        }
    }

    /**
     * Custom Redis lock service
     *
     * @date 2017/10/19
     */
    public interface RedisLock {
        String OK_CODE = "OK";
        String OK_MULTI_CODE = "+OK";
    
        /**
         * Acquire the lock.
         *
         * @param lockKey lock key
         * @param millseconds expiry time in milliseconds
         * @return true if the lock was acquired; false otherwise
         */
        boolean lock(final String lockKey, final int millseconds);
    
        /**
         * Release the lock.
         *
         * @param key lock key
         * @return true if the lock was released
         */
        boolean unlock(String key);
    
        default boolean isStatusOk(String status) {
            return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status));
        }
    }

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import lombok.extern.slf4j.Slf4j;
    import redis.clients.jedis.JedisCluster;
    
    /**
     * Default RedisLock implementation backed by JedisCluster
     *
     * @date 2017/10/19
     */
    @Service
    @Slf4j
    public class DefaultRedisLock implements RedisLock {
    
        @Autowired
        private JedisCluster redisService;
    
        @Override
        public boolean lock(String lockKey, int millseconds) {
            return isStatusOk(redisService.set(lockKey, "1", "NX", "PX", millseconds));
        }
    
        @Override
        public boolean unlock(String lockKey) {
            return redisService.del(lockKey) == 1;
        }
    }

    4. Add the helper class

    import com.google.common.collect.Lists;
    import java.beans.IntrospectionException;
    import java.beans.PropertyDescriptor;
    import java.lang.reflect.Array;
    import java.lang.reflect.Field;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    
    /**
     * Helper class for reading values from objects
     */
    public class ObjectUtils {
    
        public static Object getValue(Object object, String propertyName)
                throws IntrospectionException, IllegalAccessException, InvocationTargetException {
            Class aClass = object.getClass();
            if (isBaseClassOrString(aClass)) {
                return object.toString();
            }
    
            // Pass the instance, not its Class: a Class object is never a List or a Map.
            if (isArrayOrList(object)) {
                return object.toString();
            }
    
            if (isMap(object)) {
                return ((Map) object).getOrDefault(propertyName, "null");
            }
    
            Field[] fields = aClass.getDeclaredFields();
    
            for (Field f : fields) {
                if (!f.getName().equals(propertyName)) {
                    continue;
                }
    
                PropertyDescriptor descriptor = new PropertyDescriptor(f.getName(), aClass);
                Method readMethod = descriptor.getReadMethod();
                Object result = readMethod.invoke(object);
                return result;
            }
    
            return "null";
        }
    
        public static String getPropertiesValue(Object object, List<String> propertyNames)
                throws IntrospectionException, IllegalAccessException, InvocationTargetException {
            return getPropertiesValue(object, propertyNames, "_");
        }
    
        public static String getPropertiesValue(Object object, List<String> propertyNames, String delimiter)
                throws IntrospectionException, IllegalAccessException, InvocationTargetException {
    
            Function<List<Object>, String> joinFunction = list ->
                    list.stream().map(p -> p == null ? "null" : p.toString()).collect(Collectors.joining(delimiter));
    
            return getPropertiesValue(object, propertyNames, joinFunction);
        }
    
        public static String getPropertiesValue(Object object, List<String> propertyNames, Function<List<Object>, String>
                joinFunction)
                throws IntrospectionException, IllegalAccessException, InvocationTargetException {
            List<Object> objects = Lists.newArrayList();
            for (String p : propertyNames) {
                Object result = getValue(object, p);
                objects.add(result);
            }
    
            return joinFunction.apply(objects);
        }
    
        private static boolean isBaseClassOrString(Class aClass) {
            return (aClass == String.class)
                    || (aClass == Byte.class)
                    || (aClass == Short.class)
                    || (aClass == Integer.class)
                    || (aClass == Double.class)
                    || (aClass == Long.class)
                    || (aClass == Boolean.class)
                    || (aClass == Float.class)
                    || (aClass == java.lang.Character.class);
        }
    
        private static boolean isArrayOrList(Object obj) {
            // java.lang.reflect.Array is a utility class, so "instanceof Array" can never be true.
            return obj.getClass().isArray()
                    || obj instanceof List;
        }
    
        private static boolean isMap(Object obj) {
            return obj instanceof Map;
        }
    }
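
The helper reads object fields through `java.beans.PropertyDescriptor`. The sketch below shows that mechanism on its own with a made-up bean; `PropertyReadSketch`, `OrderQuery` and `readProperty` are hypothetical names. One assumption worth stating: as far as I know, the two-argument `PropertyDescriptor` constructor expects both a getter and a setter on the bean and throws `IntrospectionException` otherwise, so the example bean has both.

```java
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;

public class PropertyReadSketch {
    /** Hypothetical request object with a standard getter/setter pair. */
    public static class OrderQuery {
        private String code;

        public String getCode() {
            return code;
        }

        public void setCode(String code) {
            this.code = code;
        }
    }

    /** Reads a bean property by name through its getter. */
    static Object readProperty(Object bean, String propertyName) {
        try {
            PropertyDescriptor descriptor = new PropertyDescriptor(propertyName, bean.getClass());
            return descriptor.getReadMethod().invoke(bean);
        } catch (IntrospectionException | ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot read property " + propertyName, ex);
        }
    }

    public static void main(String[] args) {
        OrderQuery query = new OrderQuery();
        query.setCode("123");
        System.out.println(readProperty(query, "code")); // 123
    }
}
```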

    5. Add the startup code, the business code, and the configuration

    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.web.servlet.ServletComponentScan;
    
    
    /**
     * Application entry point
     */
    @SpringBootApplication
    public class App {
        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }
    }

    import com.g2.order.server.annotation.MergeDuplicationRequestAttribute;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import io.swagger.annotations.Api;
    
    @Api(value = "H5Controller", description = "H5 API")
    @RestController
    @RequestMapping("/h5")
    public class H5Controller {
        private static Logger logger = LoggerFactory.getLogger(H5Controller.class);
    
        @MergeDuplicationRequestAttribute(redisLockKeyTemplate = "A:%s", redisLockKeyObjectFileds = {"code"})
        @RequestMapping(value = "/{code}.jsonp",method = RequestMethod.GET)
        public Object testJsonp2(@PathVariable("code") String code) {
            try {
                // Simulate slow work.
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return "234";
        }
    }

    server.port=88
    redis.host.address=127.0.0.1:6389,127.0.0.1:6479,127.0.0.1:6579
    redis.password=******
    jedis.pool.config.maxTotal=100
    jedis.pool.config.maxIdle=10
    jedis.pool.config.maxWaitMillis=100000

    6. Test results

    Quickly refreshing the same address, http://127.0.0.1:88/h5/123.jsonp, in two browsers produces the following log:

    2019-04-19 18:06:34.458  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #1
    2019-04-19 18:06:34.479  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #2
    2019-04-19 18:06:34.500  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #3
    2019-04-19 18:06:56.061  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #1
    2019-04-19 18:06:56.082  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #2
    2019-04-19 18:06:56.103  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : Identical request in flight, sleep #3

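    7. Possible improvements

    7.1 With this distributed lock, thread 1 could release a lock that is held by thread 2. (This will be fixed later by switching to a CAS-style release.)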
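
One common way to get a compare-and-delete release is to store a unique token as the lock value and delete the key only if the stored value still matches. The sketch below is an assumption about what that could look like, not the author's later fix. It uses an in-memory map so that it runs without Redis and leaves out expiry for brevity; the Lua script constant shows the usual Redis-side equivalent and is included for reference only. `OwnedLockSketch` and its methods are hypothetical names.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class OwnedLockSketch {
    // Redis-side equivalent of the compare-and-delete below; not executed in this sketch.
    static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
          + "return redis.call('del', KEYS[1]) else return 0 end";

    // In-memory stand-in for Redis: lock key -> owner token.
    private static final ConcurrentHashMap<String, String> STORE = new ConcurrentHashMap<>();

    /** Returns a fresh owner token if the lock was taken, or null if it is already held. */
    static String lock(String key) {
        String token = UUID.randomUUID().toString();
        return STORE.putIfAbsent(key, token) == null ? token : null;
    }

    /** Removes the key only if it is still owned by the given token. */
    static boolean unlock(String key, String token) {
        return STORE.remove(key, token);
    }

    public static void main(String[] args) {
        String owner = lock("A:123");
        System.out.println(lock("A:123") == null);              // true: already held
        System.out.println(unlock("A:123", "someone-else"));    // false: wrong token
        System.out.println(unlock("A:123", owner));             // true: the owner releases it
    }
}
```

The token has to travel with the thread that took the lock, so a real `lock` method would need to return it (or keep it in a thread-local) instead of returning a plain boolean.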

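    7.2 While waiting between retries, the aspect should be able to call another method that reads the cached data. If that call succeeds, it stops waiting and returns at once; only if it fails does the thread keep competing for the lock.

    The pseudocode might look like this:

    Add a field to the annotation: getCacheFunction = "abc"

    Then call it in the aspect:

    Object theInstance=proceedingJoinPoint.getTarget();

      Object cacheValueObj=MethodHelper.invoke(theInstance,getCacheFunction ,args)...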
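
The pseudocode leaves `MethodHelper` undefined. A minimal sketch of the same idea with plain `java.lang.reflect` might look like the following. `CacheProbeSketch`, `ProductService` and `probeCache` are hypothetical names, and the method name "abc" is taken from the pseudocode; matching a method only by name and parameter count is a simplification.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheProbeSketch {
    /** Hypothetical target bean whose method "abc" looks a value up in a cache. */
    public static class ProductService {
        private final Map<String, String> cache = new ConcurrentHashMap<>();

        public void put(String code, String value) {
            cache.put(code, value);
        }

        public Object abc(String code) {
            return cache.get(code); // null means "not cached yet"
        }
    }

    /** Calls the named public method on the target with the intercepted arguments. */
    static Object probeCache(Object target, String methodName, Object[] args) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(target, args);
                } catch (ReflectiveOperationException ex) {
                    throw new IllegalStateException("Cache lookup failed: " + methodName, ex);
                }
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        ProductService service = new ProductService();
        Object[] callArgs = {"123"};
        System.out.println(probeCache(service, "abc", callArgs)); // null: nothing cached yet
        service.put("123", "cached-value");
        System.out.println(probeCache(service, "abc", callArgs)); // cached-value
    }
}
```

In the aspect, a non-null result from such a probe inside the retry loop would be returned directly, skipping both the remaining retries and the call to `proceed()`.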

  • Original article: https://www.cnblogs.com/zhshlimi/p/10738299.html