zoukankan      html  css  js  c++  java
  • 也来说说redis+lua实现高并发限流

    我们的灵活用工系统调用优付渠道接口做用户签约或资金下发时,优付系统增加了API接口请求的限流策略。

    针对每一个商户的每种类型的接口请求做限流。比如:同一商户,每秒钟只允许20次签约请求。当每秒请求超过20次时,会提示“客户请求签约接口次数超限”。

    那么,作为下游系统,我们就要对并发进行控制,以防出现无效请求。

    最常用的并发限流方案是借助redis/jedis。为了保证原子性,这里,我使用Redis+LUA脚本的方式来控制。

    那么,

    对于服务提供方来说,当请求量超出设定的限流阈值,则直接返回错误码/错误提示,并终止对请求的处理。

    而对于调用方来说呢,我们要做的是:当并发请求超出了限定阈值时,要延迟请求,而不是直接丢弃。

    话不多说,上代码吧。

    如下RedisLimiter类,服务提供方使用limit方法实现限流,服务调用方使用limitWait方法实现限流等待(如需)。

    package jstudy.redislimit;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    import org.springframework.data.redis.core.script.RedisScript;
    import org.springframework.stereotype.Component;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Redis+Lua实现高并发限流
     */
    @Slf4j
    @Component
    public class RedisLimiter {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        /**
         * 达到限流时,则等待,直到新的间隔。
         *
         * @param key
         * @param limitCount
         * @param limitSecond
         */
        public void limitWait(String key, int limitCount, int limitSecond) {
            boolean ok;//放行标志
            do {
                ok = limit(key, limitCount, limitSecond);
                log.info("放行标志={}", ok);
                if (!ok) {
                    Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
                    if (null != ttl && ttl > 0) {
                        try {
                            Thread.sleep(ttl);
                            log.info("sleeped:{}", ttl);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } while (!ok);
        }
    
        /**
         * 限流方法    true-放行;false-限流
         *
         * @param key
         * @param limitCount
         * @param limitSecond
         * @return
         */
        public boolean limit(String key, int limitCount, int limitSecond) {
            List<String> keys = Collections.singletonList(key);
            String luaScript = buildLuaScript();
            RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);
            Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond);
            log.info("Access try count is {} for key = {}", count, key);
            if (count != null && count.intValue() <= limitCount) {
                return true;//放行
            } else {
                return false;//限流
    //            throw new RuntimeException("You have been dragged into the blacklist");
            }
        }
    
        /**
         * 编写 redis Lua 限流脚本
         */
        public String buildLuaScript() {
            StringBuilder lua = new StringBuilder();
            lua.append("local c");
            lua.append("
    c = redis.call('get',KEYS[1])");
            // 调用不超过最大值,则直接返回
            lua.append("
    if c and tonumber(c) > tonumber(ARGV[1]) then");
            lua.append("
    return c;");
            lua.append("
    end");
            // 执行计算器自加
            lua.append("
    c = redis.call('incr',KEYS[1])");
            lua.append("
    if tonumber(c) == 1 then");
            // 从第一次调用开始限流,设置对应键值的过期
            lua.append("
    redis.call('expire',KEYS[1],ARGV[2])");
            lua.append("
    end");
            lua.append("
    return c;");
            return lua.toString();
        }
    
    }

    springboot自动注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定义如下:

    package jstudy.redislimit;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @Configuration
    @EnableCaching // 开启缓存支持
    public class RedisConfig extends CachingConfigurerSupport {
    
        /**
         * RedisTemplate配置
         *
         * @param lettuceConnectionFactory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
            // 设置序列化
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
            om.enableDefaultTyping(DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            // 配置redisTemplate
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
            redisTemplate.setConnectionFactory(lettuceConnectionFactory);
            RedisSerializer<?> stringSerializer = new StringRedisSerializer();
            redisTemplate.setKeySerializer(stringSerializer);// key序列化
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
            redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
    
    }
    View Code

    并发测试通过,如下是testcase:

    package jstudy.redislimit;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class RedisLimiterTest {
        @Autowired
        private RedisLimiter redisLimiter;
    
        @Test
        public void testLimitWait() throws InterruptedException {
            ExecutorService pool = Executors.newCachedThreadPool();
            log.info("--------{}", redisTemplate.opsForValue().get("abc"));
            for (int j = 1; j <= 5; j++) {
                int i=j;
                pool.execute(() -> {
                    Thread.currentThread().setName( Thread.currentThread().getName().replace("-","_"));
                    redisLimiter.limitWait("abc", 3, 1);
                    log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS));
                        try {
                            // 线程等待,模拟执行业务逻辑
                            Thread.sleep(new Random().nextInt(100));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                });
            }
            pool.shutdown();
            pool.awaitTermination(2,TimeUnit.SECONDS);
        }
    }
    View Code

    【附1】

    jedis限流算法。不管是redis还是jedis,其实都是利用了消息的ttl(Time to Live),即,当消息达到ttl设定的值时,消息会自动过期。ttl还见诸于mq的死信队列,队列里的消息会延迟消费,当等待ttl指定的时间后,才会自动转移到

    如下jedis算法与上面lua脚本相比,实现算法殊途同归。不过,因为不具备原子性,高并发可能会出现冒的情况。所以,要实现精确限流,还是借助上面的lua。

    public class JedisLimiter {
        private static JedisPool jedisPool = SpringContextUtils.getBean(JedisPool.class);
    
        /**
         * 限制访问频率
         *
         * @param key
         * @param seconds
         * @param number
         * @return
         */
        public static boolean limit(String key, int seconds, int number) {
            Jedis jedis = null;
            try {
                jedis = getResource();
                String value = jedis.get(key);
                if (value == null) {
                    jedis.set(key, "1");
                    jedis.expire(key, seconds);
                    return false;
                } else {
                    Long ttl = jedis.ttl(key);
                    if (ttl.longValue() > 0) {
                        int parseInt = Integer.parseInt(value);
                        if (parseInt > number) {
                            return true;
                        }
                        jedis.incr(key);
                    }
                }
                return false;
            } catch (Exception e) {
                log.warn("checkReqNumber {}", e);
            } finally {
                returnResource(jedis);
            }
            return false;
        }
    }

    【附2】

    redis是使用RedisTemplate.expire来设置ttl;使用RedisTemplate.getExpire(key)或RedisTemplate.getExpire(key,TimeUnit)方法。当然,对于并发限流,我们需要后者来指定时间单位为TimeUnit.MILLISECONDS来得到精确的剩余毫秒数。

    jedis是使用Jedis.expire来设置ttl;使用Jedis.ttl(key)方法,返回的时间是毫秒。

    getExpire/ttl返回值:

    • -2:key不存在
    • -1:未设置ttl
    • n:实际的剩余ttl

    【附3】

    关于redis的increment :

    • 当key不存在时,创建key,默认值是delta(不设置delta的话,则为1)。
    • 当key存在时,按delta来递增。
  • 相关阅读:
    day24 Pyhton学习 反射
    正则表达式练习
    day23 Pyhton学习 昨日回顾.re模块.序列化模块
    day22 函数整理
    day22 Pyhton学习 re模块和正则表达式
    day21 Pyhton学习 模块
    函数整理
    一个关于浮点数运算需要注意的地方
    关于逻辑运算案例笔记
    数据的表现形式和进制之间的转换
  • 原文地址:https://www.cnblogs.com/buguge/p/13477151.html
Copyright © 2011-2022 走看看