分布式锁产生的场景
分布式锁在需要分布式同步的场景下使用,也就是在分布式系统下才能发挥作用,传统的单台系统使用java提供的锁,分布式场景多实例项目需要多个项目之间同步。
都有哪些实现方式
1 数据库锁:通过在数据库中创建一条记录,根据创建结果来判断是否上锁成功。实现简单但是数据库效率低。
2 redis实现:通过redis缓存中间件实现,比较繁琐,效率高。
2 zookeeper实现:通过临时节点实现,实现简单,失效时间可以控制。
效率:redis > zookeeper > 数据库
redis实现
判断 key 是否存在,如果不存在,设置key,加锁成功,并设置过期是时间,每过一段时间判断是否执行完,没执行完重设过期时间。
如果key存在,就循环判断,直到获取锁。
Redisson
Redisson是redis官方出的分布式锁工具。
基本使用方式如下:
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient client = Redisson.create(config);
RLock lock = client.getLock("abcde");
lock.lock();
try {
...
} finally {
lock.unlock();
}
创建客户端连接
根据给出的配置 创建连接
RedissonClient client = Redisson.create(config);
创建时包括,连接管理器、命令执行器和调度器
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
}
获取锁
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Long ttl = tryAcquire(leaseTime, unit);
// 获取了锁 返回
if (ttl == null) {
return;
}
long threadId = Thread.currentThread().getId();
// 订阅锁消息
Future<RedissonLockEntry> future = subscribe(threadId);
get(future);//
try {
while (true) {
ttl = tryAcquire(leaseTime, unit);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
tryAcquire 获取锁方法
<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command){
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上边一段lua脚本可以看到redis获取锁的逻辑:
假设key:abcde,Lock的id为123456789 线程为thread1 ,有效期为 10
1 如果不存在 key:abcde
2 设置 abcde 然后里面一个键值对 123456789:thread1 值为 1
3 设置 abcde 有效期 10
4 如果存在abcde并且123456789:thread1
5 将123456789:thread1 的值加 1
6 重设过期时间
这里数据结构用的是redis的hash表
抢到锁后有一个线程会一直循环续命默认是10秒执行一次,一个锁默认是30秒
如果没有抢到锁:
subscribe 订阅锁的释放消息
然后循环获取锁
释放锁
public void unlock() {
Boolean opStatus= commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
}
假设 参数1是key:abcde,参数2:redisson_lock__channel__{abcde},参数3:123456789:thread1
1 判断是否有abcde这个key
2 不存在广播解锁消息
3 存在key,判断key是否有123456789:thread1参数
4 存在参数将数值减一
5 判断减一后数字是否大于0
6 大于0 重设过期时间
7 不大于 删除key,并广播解锁消息,其他线程就可以开始抢占锁了