手写分布式锁,仅适用于单例 Redis。
与多线程的加锁解锁机制一样,分解出加锁解锁需要做的动作后,想办法保证动作的原子性即可。
X86 架构提供了 getAndSet 原语,保证了锁的检查与上锁这组动作的原子性,操作系统在其基础上提供了非常多的加锁方法。
Redis 也提供了类似的 “原语”:SETNX 指令。如果 key 存在则返回 0 ,如果 key 不存在则设置 key 并返回 1 。
通过 SETNX 上锁,在持有锁的情况下 DEL 掉 KEY 解锁。加锁、解锁两个动作的原子性都可以保证。
但分布式环境下,情况没有这么简单。一方面,需要在锁上维护请求标识,防止一个请求的锁被其它请求解锁;另一方面,需要在锁上维护过期时间,防止发起请求的线程挂掉导致死锁。
这样,加锁/解锁 的动作就变的复杂了:
加锁:检查锁是否存在,不存在则上锁;存在则检查过期时间,若当前锁已过期则删除当前锁并上锁。
解锁:获取锁,检查请求 ID 是否为本次请求对应的 ID,如果是则解锁。
检查锁是否存在,不存在则上锁;可以通过 SETNX 指令保证原子性。
存在则检查过期时间,若当前锁已过期则删除当前锁并上锁;检查请求 ID 是否为本次请求对应的 ID,如果是则解锁 两个动作是没有指令支持的。需要想办法保证其原子性。
这里采用 LUA 脚本的方式保证上述两组动作的原子性。
定义接口:
public interface RedisRockUtil { //加锁 public boolean lock(String key, String requestId) throws Exception; //解锁 public boolean unlock(String key, String requestId) throws Exception; }
实现类:
public class ProtoRedisRockUtil implements RedisRockUtil { ProtoRedisRockUtil(RedisTemplate redisTemplate, int cacheTime) { this.redisTemplate = redisTemplate; this.cacheTime = cacheTime; } private RedisTemplate<String, Object> redisTemplate; //缓存存活时间 private int cacheTime; /** * @Author * @Date 2021/9/27 上午12:11 * @Description 缓存格式:过期时间的时间戳|请求唯一标识 * 通过 SETNX 模拟 getAndSet * 通过 LUA 脚本保证 "删除过期锁、上锁" 这一对操作的原子性 */ @Override public boolean lock(String key, String requestId) throws InterruptedException { int tryCount = 3; while (tryCount > 0) { long currentTime = System.currentTimeMillis(); //缓存存活的最终时间 Long overdueTime = currentTime + this.cacheTime; String val = overdueTime + "|" + requestId; //竞争到锁 if (redisTemplate.opsForValue().setIfAbsent(key, val)) { System.out.println("竞争锁成功!"); return true; } StringBuilder USER_AIMS_GOLD_LUA = new StringBuilder(); USER_AIMS_GOLD_LUA.append("local value = redis.call('get',KEYS[1]);"); USER_AIMS_GOLD_LUA.append("if not value then return '-1'; end;"); USER_AIMS_GOLD_LUA.append("local position = string.find(value,'|');"); USER_AIMS_GOLD_LUA.append("local timeStemp = string.sub(value,0,position-1)"); USER_AIMS_GOLD_LUA.append("timeStemp = tonumber(timeStemp)"); USER_AIMS_GOLD_LUA.append("local currentTime = tonumber(ARGV[1])"); USER_AIMS_GOLD_LUA.append("if currentTime>timeStemp then redis.call('del',KEYS[1]);"); USER_AIMS_GOLD_LUA.append("if redis.call('setnx', KEYS[1], ARGV[2])==1 then return '1'; " + "else return '0';end;"); USER_AIMS_GOLD_LUA.append("else return '0';end;"); DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); defaultRedisScript.setScriptText(USER_AIMS_GOLD_LUA.toString()); defaultRedisScript.setResultType(String.class); List<String> keyList = new ArrayList(); keyList.add(key); String result = (String) redisTemplate.execute(defaultRedisScript, keyList, String.valueOf(currentTime), val); //删除过期锁并竞争锁成功 if ("1".equals(result)) { System.out.println("删除过期锁并竞争锁成功!"); return true; } //未竞争到锁,检查当前锁是否已到期。防止死锁 tryCount--; Thread.sleep(200); } System.out.println("竞争锁失败!"); return false; } /** * @Author * @Date 2021/9/26 下午10:48 * @Description 释放锁 * 通过 LUA 脚本保证 "核对 uuid 、释放锁" 这一对动作的原子性 */ @Override public boolean unlock(String key, String requestId) { StringBuilder USER_AIMS_GOLD_LUA = new StringBuilder(); USER_AIMS_GOLD_LUA.append("local value = redis.call('get',KEYS[1]);"); USER_AIMS_GOLD_LUA.append("if not value then return '-1'; end;"); USER_AIMS_GOLD_LUA.append("local position = string.find(value,'|');"); USER_AIMS_GOLD_LUA.append("local requestId = string.sub(value,position+1)"); USER_AIMS_GOLD_LUA.append("if ARGV[1]==requestId then "); USER_AIMS_GOLD_LUA.append("redis.call('del',KEYS[1]);return '1';"); USER_AIMS_GOLD_LUA.append("else return '0'; end;"); DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); defaultRedisScript.setScriptText(USER_AIMS_GOLD_LUA.toString()); defaultRedisScript.setResultType(String.class); List<String> keyList = new ArrayList(); keyList.add(key); Object result = redisTemplate.execute(defaultRedisScript, keyList, requestId); if ("1".equals(result)) System.out.println("自行释放锁成功"); return "1".equals(result); } }
测试,10 个线程通过分布式锁依次打印 0-1000000:
public static void main(String[] args) throws Exception { JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); jedisConnectionFactory.setHostName("172.16.55.2"); jedisConnectionFactory.setPort(6379); jedisConnectionFactory.afterPropertiesSet(); RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(jedisConnectionFactory); redisTemplate.setDefaultSerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); RedisRockUtil util = new ProtoRedisRockUtil(redisTemplate, 10 * 1000); String key = "testKey11", uuid = "123456"; final List<Integer> arr = new ArrayList<Integer>(); arr.add(0); for (int i = 0; i < 10; i++) { new Thread(() -> { try { while (arr.size() <= 100000) { String randomId = UUID.randomUUID().toString(); RedisRockUtil lock = new ProtoRedisRockUtil(redisTemplate, 10000); if (lock.lock(key, randomId)) { if (arr.size() > 100000) break; System.out.println("当前持锁线程:" + Thread.currentThread().getId()); int len = arr.size(); int pre = arr.get(len - 1); int current = pre + 1; arr.add(current); System.out.println(current); lock.unlock(key, randomId); } } } catch (Throwable e) { e.printStackTrace(); } }).start(); } Thread.sleep(5 * 60 * 1000); int len = arr.size(); for (int i = 0; i < arr.size() - 1; i++) { int next = arr.get(i + 1); int curr = arr.get(i); if (next - curr != 1) { System.out.println(i + "出错!:" + curr + "," + next); break; } } System.out.println("检查 " + len + " 个数据,无错误"); }
测试结果,没有出现变量污染的问题,加锁解锁保证了线程操作 ArrayList 以及打印操作的原子性: