zoukankan      html  css  js  c++  java
  • redis-缓存设计-信号量设计

    非公平信号量

    说明

    1.通过zset add 和rank来实现是否获取信号量的判断,

    2.add时通过当前时间+超时时间 计算的时间设置为score 每次add提前删除过期的0~当前时间

    信号量类封装

        public static class RedisSemaphore {
            //线程缓存保存index 用于释放
            ThreadLocal<String> semaphoreValue = new ThreadLocal<>();
            private Integer limit;
    
            public RedisSemaphore( Integer limit) {
                //因为redis rank从0开始 所以 limit-1
                this.limit = limit-1;
            }
    
            /**
             * 信号量
             * @param timeout
             * @return
             */
            public boolean acquire( Jedis conn,Integer timeout) {
                String index = UUID.randomUUID().toString();
                //计算过期时间
                Calendar c = Calendar.getInstance();
                c.setTime(new Date());
                c.add(Calendar.SECOND, timeout);
                conn.zadd("semaphore:acquire", c.getTime().getTime(), index);
                //删除过期的
                conn.zremrangeByScore("semaphore:acquire", 0, System.currentTimeMillis());
                //判断是否获得信号量 根据获得的排名来
                Long rank = conn.zrank("semaphore:acquire", index);
                if (rank > limit) {
                    //删除
                    conn.zrem("semaphore:acquire", index);
                    return false;
                }
                //线程缓存保存用于释放
                semaphoreValue.set(index);
                return true;
    
            }
    
            public void release( Jedis conn) {
                String index = semaphoreValue.get();
                semaphoreValue.remove();;
                if (index == null) {
                    return;
                }
                conn.zrem("semaphore:acquire", index);
    
            }
        }

    测试类

    public static void main(String[] args)
                throws Exception {
            Jedis conn = new Jedis("127.0.0.1", 6379);
            conn.flushDB();
            RedisSemaphore redisSemaphore=new RedisSemaphore(11);
            CountDownLatch countDownLatch=new CountDownLatch(11);
            //=====================获取并释放===================
            System.out.println("==================多线程获取并释放信号量结果==================");
            for(int i=0;i<11;i++){
                final int x=i;
                new Thread(new Runnable() {
                    int j=x;
                    @Override
                    public void run() {
                        Jedis conn = new Jedis("127.0.0.1", 6379);
    
                        System.out.println("获取释放,i"+j+","+redisSemaphore.acquire(conn,30));
                        redisSemaphore.release(conn);
                        countDownLatch.countDown();
                    }
                }).start();
    
            }
            countDownLatch.await();
            //=====================不释放===================
            System.out.println("==================多线程不释放获取信号量结果 下面正常获取表示有上面有正常释放==================");
    
            for(int i=0;i<12;i++){
                final int x=i;
                new Thread(new Runnable() {
                    int j=x;
                    @Override
                    public void run() {
                        Jedis conn = new Jedis("127.0.0.1", 6379);
    
                        System.out.println("获取不释放i"+j+","+redisSemaphore.acquire(conn,30));
    
                    }
                }).start();
    
            }
    
        }

    打印

    ==================多线程获取并释放信号量结果==================
    获取释放,i8,true
    获取释放,i7,true
    获取释放,i1,true
    获取释放,i10,true
    获取释放,i3,true
    获取释放,i6,true
    获取释放,i0,true
    获取释放,i4,true
    获取释放,i2,true
    获取释放,i9,true
    获取释放,i5,true
    ==================多线程不释放获取信号量结果 下面正常获取表示有上面有正常释放==================
    获取不释放i1,true
    获取不释放i0,true
    获取不释放i2,true
    获取不释放i3,true
    获取不释放i4,true
    获取不释放i5,true
    获取不释放i8,true
    获取不释放i7,true
    获取不释放i6,false
    获取不释放i11,true
    获取不释放i10,true
    获取不释放i9,true

    公平信号量 

    说明

    集群情况下 各个服务器时间可能不一致,可能导致不同服务器先后获取信号量,后获取的服务器时间比先获取的时间大,抢占了信号量,通过维护一个原子性的index在redis 通过index的set来获取rank

    改动方法

     /**
             * 信号量
             * @param timeout
             * @return
             */
            public boolean acquire( Jedis conn,Integer timeout) {
                String index = UUID.randomUUID().toString();
                //删除过期的
                conn.zremrangeByScore("semaphore:acquire", 0, System.currentTimeMillis());
                ZParams zParams=new ZParams();
                zParams.weightsByDouble(1,0);//第一个集合的socre权重设置为最大 合并后取第一个集合的socre作为新的集合score
                //跟原子性index的集合 做交集 相当于通过index socre的集合进行过期的删除
                conn.zinterstore("semaphore:acquire2", zParams,"emaphore:acquire:index", "semaphore:acquire2");
                //计算过期时间
                Calendar c = Calendar.getInstance();
                c.setTime(new Date());
                c.add(Calendar.SECOND, timeout);
                conn.zadd("semaphore:acquire", c.getTime().getTime(), index);
                conn.zadd("semaphore:acquire2", conn.incr("semaphore:acquire:index"), index);
                //判断是否获得信号量 根据获得的排名来
                Long rank = conn.zrank("semaphore:acquire2", index);
                if (rank > limit) {
                    //删除
                    conn.zrem("semaphore:acquire2", index);
                    conn.zrem("semaphore:acquire", index);
                    return false;
                }
                //线程缓存保存用于释放
                semaphoreValue.set(index);
                return true;
    
            }

    消除竞争

    a incr 后得到5   b incr得到6   b先sadd  判断rank 成功,这个时候a 又sadd  判断rank成功导致信号量比limit多 可以在外面加一层分布式锁

  • 相关阅读:
    配置文件和脚本文件区别
    .sh
    瘋耔思维空间
    vi编辑器的三种模式
    在ubuntu系统荣品开发配套JDK安装
    如何查看自己运行ubuntu是32位还是64位
    志气
    高仿微信朋友圈
    Java OCR tesseract 图像智能字符识别技术 Java代码实现
    构建基于Javascript的移动CMS——加入滑动
  • 原文地址:https://www.cnblogs.com/LQBlog/p/13392366.html
Copyright © 2011-2022 走看看