分布式锁
非分布式下使用锁
1. 乐观锁
- 利用版本号来检测数据是否发生变化,从而判断是否能进行更新
- JAVA 使用比较交换机制-CAS(Compare And Swap)机制实现
- i++非线程安全,使用原子类AtomicInteger
- JDK1.5开始提供原子类,使用CAS机制,乐观锁,并发包java.util.concurrent
2. 悲观锁
- 执行前加锁,执行后解锁
- JAVA 使用 synchronized 与 ReentrantLock 实现
3. 公平锁
- 线程有序获取锁,以队列的方式实现
- ReentrantLock中的fairSync实现,通过构造方法传入true
4. 非公平锁
- 线程无序获取锁
- ReentrantLock中的NonfairSync实现,默认都为非公平锁
5. 超卖现象
解决方式(单体项目)
- 通过数据库解决,使用update的行锁解决并发
update product set count = count - 1 where count > 0 and id = '1'
update product set count = count - 1 where id = '1'
# 后续再针对库存进行校验,若为负数,抛出异常
- 使用synchronized, 有一种情况为事务未提交锁已释放,需要手动控制事务
// 获取事务
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
// 提交事务
platformTransactionManager.commit(transaction);
// 回滚事务
platformTransactionManager.rollback(transaction);
- synchronized块
// 对象锁
synchronized(this){}
// 类锁
synchronized(Product.class){}
- 使用ReentrantLock
- 建议使用 try-finally 的方式,防止异常未释放锁
Lock lock = new ReentrantLock();
lock.lock();
try{
...
}finally{
lock.unlock();
}
分布式锁
1. 使用数据库的悲观锁
查询语句后加入 for update
# 查看mysql是否自动提交
SELECT @@autocommit;
# 1 自动提交 0 不自动提交
SET @@autocommit = 0;
SELECT * FROM order WHERE id = '1' FOR UPDATE
- 优点:简单方便,易于理解,易于操作
- 缺点:并发量大,对数据库压力较大
- 建议:作为锁的数据库与业务数据库分开
2. 基于Redis的SetNX实现分布式锁
2.1 实现原理
- 获取锁的命令
-- SET resource_name my_random_value NX PX 30000- resource_name:资源名称,可根据不同的业务区分不同的锁(相当于key)
- my_random_value:随机值,每个线程的随机值都不同(UUID),用于释放锁的校验
- NX:key不存在时设置成功,key存在则设置不成功,原子性操作
- PX:自动失效时间,出现异常情况,锁可以过期失效。防止释放锁异常
- 利用NX的原子性,多个线程并发时,只有一个线程可以设置成功
- 设置成功即获得锁,可以执行后续的业务处理
- 如果出现异常,过了锁的有效期,自动释放
- 释放锁采用Redis的delete命令
- 释放锁时校验之前设置的随机数,相同才能释放,防止释放了别人的锁
- 释放锁使用LUA脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
2.2 实现
- java实现代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
/**
* 实现AutoCloseable接口,可用JDK1.7的try(xx)中的自动关闭特性
*/
@Slf4j
public class RedisLock implements AutoCloseable {
private RedisTemplate redisTemplate;
private String key;
private String value;
//单位:秒
private int expireTime;
public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.expireTime=expireTime;
this.value = UUID.randomUUID().toString();
}
/**
* 获取分布式锁
* @return
*/
public boolean getLock(){
RedisCallback<Boolean> redisCallback = connection -> {
//设置NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
//设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
//序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
//序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
//执行setnx操作
Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
return result;
};
//获取分布式锁
Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
return lock;
}
public boolean unLock() {
String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
" +
" return redis.call("del",KEYS[1])
" +
"else
" +
" return 0
" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
List<String> keys = Arrays.asList(key);
Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
log.info("释放锁的结果:"+result);
return result;
}
@Override
public void close() throws Exception {
unLock();
}
}
3. 使用zookeeper实现分布式锁
zk节点
- 持久节点
- 瞬时节点:不可有子节点,会话结束自动消失,zk重启也消失
3.1 原理
观察器
- 监听3个方法:getData, getChildren, exists
- 节点数据变化,通知客户端
- 只通知一次
分布式锁原理
- 利用zk的瞬时节点的有序性特点
- 多线程并发创建有序瞬时节点,得到序列号
- 序号最小获得锁
- 其他线程监听前一个序列
- 前一个线程执行完成,删除序列,后一个线程获得通知,可执行
- 节点创建时,已决定了线程的执行顺序
3.2 实现
maven
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
获取锁与释放锁
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String znode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("localhost:2181",
10000,this);
}
public boolean getLock(String businessCode) {
try {
//创建业务 根节点
Stat stat = zooKeeper.exists("/" + businessCode, false);
if (stat==null){
zooKeeper.create("/" + businessCode,businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//创建瞬时有序节点 /order/order_00000001
znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//获取业务节点下 所有的子节点
List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
//子节点排序
Collections.sort(childrenNodes);
//获取序号最小的(第一个)子节点
String firstNode = childrenNodes.get(0);
//如果创建的节点是第一个子节点,则获得锁
if (znode.endsWith(firstNode)){
return true;
}
//不是第一个子节点,则监听前一个节点
String lastNode = firstNode;
for (String node:childrenNodes){
if (znode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() throws Exception {
zooKeeper.delete(znode,-1);
zooKeeper.close();
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
4. 使用Curator客户端实现分布式锁
资料查看 官网
maven
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
实现流程:
- 获取连接
- 执行start方法
- 获取锁
- 释放锁
- 执行close
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try {
if ( lock.acquire(30, TimeUnit.SECONDS) ) {
try {
// 业务
} finally {
lock.release();
}
}
} catch (Exception e) {
e.printStackTrace();
}
client.close();
先声明bean,再进行使用
/*
* 在config中声明bean
* 加入start与close,可在bean初始化与bean销毁时调用相应的方法
*/
@Bean(initMethod="start",destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
return client;
}
// 使用
@Autowired
private CuratorFramework client;
public void test(){
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try{
if (lock.acquire(30, TimeUnit.SECONDS)){
// 业务
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
// 释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5. Redisson客户端实现分布式锁
资料 redisson官网
- maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
- 实现
Config config = new Config();
config.useSingleServer().setAddress("redis://ip:port");
RedissonClient redisson = Redisson.create(config);
RLock rLock = redisson.getLock("order");
try {
rLock.lock(30, TimeUnit.SECONDS);
// 业务
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放锁
rLock.unlock();
}
使用springboot的方式
- maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.2</version>
</dependency>
- 配置文件参看 github地址 资料redisson-spring-boot-starter
spring:
redis:
database: 1
host: ip
port: 6379
password: 123
- 实现
@Autowired
private RedissonClient redisson;
public void test() {
RLock rLock = redisson.getLock("order");
try {
rLock.lock(30, TimeUnit.SECONDS);
// 业务
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 解锁
rLock.unlock();
}
}
分布式锁实现方式比较
方式 | 优点 | 缺点 |
---|---|---|
数据库 | 简单 | 数据库压力大 |
Redis | 麻烦,不支持阻塞 | |
Zk | 支持阻塞 | 原理复杂 |
Curator | 依赖zk,强一致性 | |
Redisson | 支持阻塞 |