非公平信号量
说明
1.通过zset add 和rank来实现是否获取信号量的判断,
2.add时通过当前时间+超时时间 计算的时间设置为score 每次add提前删除过期的0~当前时间
信号量类封装
public static class RedisSemaphore { //线程缓存保存index 用于释放 ThreadLocal<String> semaphoreValue = new ThreadLocal<>(); private Integer limit; public RedisSemaphore( Integer limit) { //因为redis rank从0开始 所以 limit-1 this.limit = limit-1; } /** * 信号量 * @param timeout * @return */ public boolean acquire( Jedis conn,Integer timeout) { String index = UUID.randomUUID().toString(); //计算过期时间 Calendar c = Calendar.getInstance(); c.setTime(new Date()); c.add(Calendar.SECOND, timeout); conn.zadd("semaphore:acquire", c.getTime().getTime(), index); //删除过期的 conn.zremrangeByScore("semaphore:acquire", 0, System.currentTimeMillis()); //判断是否获得信号量 根据获得的排名来 Long rank = conn.zrank("semaphore:acquire", index); if (rank > limit) { //删除 conn.zrem("semaphore:acquire", index); return false; } //线程缓存保存用于释放 semaphoreValue.set(index); return true; } public void release( Jedis conn) { String index = semaphoreValue.get(); semaphoreValue.remove();; if (index == null) { return; } conn.zrem("semaphore:acquire", index); } }
测试类
public static void main(String[] args) throws Exception { Jedis conn = new Jedis("127.0.0.1", 6379); conn.flushDB(); RedisSemaphore redisSemaphore=new RedisSemaphore(11); CountDownLatch countDownLatch=new CountDownLatch(11); //=====================获取并释放=================== System.out.println("==================多线程获取并释放信号量结果=================="); for(int i=0;i<11;i++){ final int x=i; new Thread(new Runnable() { int j=x; @Override public void run() { Jedis conn = new Jedis("127.0.0.1", 6379); System.out.println("获取释放,i"+j+","+redisSemaphore.acquire(conn,30)); redisSemaphore.release(conn); countDownLatch.countDown(); } }).start(); } countDownLatch.await(); //=====================不释放=================== System.out.println("==================多线程不释放获取信号量结果 下面正常获取表示有上面有正常释放=================="); for(int i=0;i<12;i++){ final int x=i; new Thread(new Runnable() { int j=x; @Override public void run() { Jedis conn = new Jedis("127.0.0.1", 6379); System.out.println("获取不释放i"+j+","+redisSemaphore.acquire(conn,30)); } }).start(); } }
打印
==================多线程获取并释放信号量结果================== 获取释放,i8,true 获取释放,i7,true 获取释放,i1,true 获取释放,i10,true 获取释放,i3,true 获取释放,i6,true 获取释放,i0,true 获取释放,i4,true 获取释放,i2,true 获取释放,i9,true 获取释放,i5,true ==================多线程不释放获取信号量结果 下面正常获取表示有上面有正常释放================== 获取不释放i1,true 获取不释放i0,true 获取不释放i2,true 获取不释放i3,true 获取不释放i4,true 获取不释放i5,true 获取不释放i8,true 获取不释放i7,true 获取不释放i6,false 获取不释放i11,true 获取不释放i10,true 获取不释放i9,true
公平信号量
说明
集群情况下 各个服务器时间可能不一致,可能导致不同服务器先后获取信号量,后获取的服务器时间比先获取的时间大,抢占了信号量,通过维护一个原子性的index在redis 通过index的set来获取rank
改动方法
/** * 信号量 * @param timeout * @return */ public boolean acquire( Jedis conn,Integer timeout) { String index = UUID.randomUUID().toString(); //删除过期的 conn.zremrangeByScore("semaphore:acquire", 0, System.currentTimeMillis()); ZParams zParams=new ZParams(); zParams.weightsByDouble(1,0);//第一个集合的socre权重设置为最大 合并后取第一个集合的socre作为新的集合score //跟原子性index的集合 做交集 相当于通过index socre的集合进行过期的删除 conn.zinterstore("semaphore:acquire2", zParams,"emaphore:acquire:index", "semaphore:acquire2"); //计算过期时间 Calendar c = Calendar.getInstance(); c.setTime(new Date()); c.add(Calendar.SECOND, timeout); conn.zadd("semaphore:acquire", c.getTime().getTime(), index); conn.zadd("semaphore:acquire2", conn.incr("semaphore:acquire:index"), index); //判断是否获得信号量 根据获得的排名来 Long rank = conn.zrank("semaphore:acquire2", index); if (rank > limit) { //删除 conn.zrem("semaphore:acquire2", index); conn.zrem("semaphore:acquire", index); return false; } //线程缓存保存用于释放 semaphoreValue.set(index); return true; }
消除竞争
a incr 后得到5 b incr得到6 b先sadd 判断rank 成功,这个时候a 又sadd 判断rank成功导致信号量比limit多 可以在外面加一层分布式锁