分布式锁
为了保证一个方法或属性在高并发情况下的同一时间只能被一个线程访问,在单机部署的情况下,可以使用ReentrantLock或Synchronized进行互斥控制。随着发展,单机部署的系统已经不能满足业务的需要,越来越多的系统进化成分布式集群系统,原本在单机运行的锁控制已经不能实现“一个方法或属性在高并发情况下的同一时间只能被一个线程访问”的控制,要在分布式系统中实现在单机中的控制效果就必须使用分布式锁。
Redis实现方式
加入依赖
<!--redisson分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
配置Redisson bean
@Bean
public Redisson redisson(){
//单机模式
Config config = new Config();
// config.useClusterServers().addNodeAddress("","","");//集群模式
config.useSingleServer().setAddress("redis://192.168.86.126:6379");
return (Redisson) Redisson.create(config);
}
实现
//业务代码加redis分布式锁
String lockKey = "redis-lock-key";
RLock rlock = redisson.getLock(lockKey);
//加锁并设置超时时间,自动续幂
rlock.lock(30, TimeUnit.SECONDS);
try {
//加锁业务代码
……
} finally {
//解锁
rlock.unlock();
}
Zookeeper实现
加入依赖
<!--zookeeper的jar依赖包-->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Lock接口
public interface Lock {
void lock() throws Exception;
void unlock() throws Exception;
}
写一个抽象类AbstractZookeeperLock实现加锁和解锁
public abstract class AbstractZookeeperLock implements Lock {
protected String lock;
protected String zkAddress = "localhost:2181";
protected ZkClient zkClient = new ZkClient(zkAddress);
@Override
public final void lock() throws Exception {
//尝试获取锁
if (tryLock()) {
//拿到锁
System.out.println("获取锁成功...");
} else {
//尝试获取锁未成功,等待获取锁,阻塞
// 如果此处已经不阻塞了,那么可以继续执行下面的代码
waitLock();
//阻塞结束,继续获取锁
lock();
}
}
@Override
public final void unlock() throws Exception {
//临时节点:临时存储,当客户端的链接与zookeeper断开后,临时节点自动删除
//关闭链接就解锁了
if (zkClient != null) {
// zkClient.delete("/path/xxx");
zkClient.close();
System.out.println("解锁成功...");
}
//持久化节点:永久在zookeeper上
}
protected abstract boolean tryLock();
protected abstract void waitLock();
}
具体使用类实现尝试获取锁和等待获取锁
public class ZookeeperDistributedLock extends AbstractZookeeperLock {
private CountDownLatch countDownLatch;
public ZookeeperDistributedLock(String lockName) {
lock = lockName;
}
//尝试获取锁
@Override
protected boolean tryLock() {
try {
//创建一个临时节点
zkClient.createEphemeral(lock);
//获取锁成功
return true;
} catch (Exception e) {
//获取锁失败
return false;
}
}
//等待获取锁
@Override
protected void waitLock() {
//如果已经有线程创建了临时节点,那么其他线程只能等待,不能再创建该临时节点
//那么就监听你这个临时节点,如果该节点被删除了,那我等待结束,就又可以创建临时节点
//订阅数据改变,就是监听参数中指定的那个节点
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
//计数减1
countDownLatch.countDown();
}
}
};
//1、监听
zkClient.subscribeDataChanges(lock, listener);
// 2、判断那个锁的节点是否存在
if (zkClient.exists(lock)) {
countDownLatch = new CountDownLatch(1);
try {
//等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 3、订阅要取消一下
zkClient.unsubscribeDataChanges(lock, listener);
}
}
具体使用
//业务代码加zookeeper分布式锁
ZookeeperDistributedLock lock = new ZookeeperDistributedLock("/lock_nodeName");
try {
lock.lock();
//加锁业务代码
……
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}