分布式锁实现方案优缺点
不推荐自己编写的分布式锁
推荐Redisson和Curator实现的分布式锁。
一、使用锁解决电商中的超卖问题?
举例:某件商品库存数量10件,结果卖出了20件
二、超卖现象的产生
A和B同时看到这个商品,加入购物车,并同时提交订单,导致了超卖的现象。
三、超卖现象模拟
1)问题模拟。假设当前库存为1
创建订单: 在订单中先获取是否有库存,然后更新库存数量为(原库存数-1),然后插入订单
单元测试:创建线程池,并发执行创建订单。
cyclicBarrier.await的作用: 让线程等待,在某一个时刻,让5个线程同时执行。
测试结果: 产生了5条订单,并且库存为0. 这就是超卖现象的产生。
四、超卖现象的解决
1、方法1: 数据库update行锁 (单体应用)
扣减库存不在程序中进行,而是通过数据库
向数据库传递库存增量,扣减一个库存,增量为-1
在数据库update语句计算库存,通过update行锁解决并发。更新后检查库存是否是负数,如果是负数抛出异常,则整个操作回滚。
2、解决方法二(synchronized和ReentrantLock) (单体应用)
校验库存、扣减库存统一加锁,使之成为原则的操作。并发时,只有获得锁的线程才能校验、扣减库存。
注意: 事务的操作通过代码控制。如果事务通过注解的方式写在方法里,并且这个方法synchronized 修饰。 那么会在同步方法结束后,才提交事务。这样当事务还没提交,另外一个线程进入了同步方法。
代码如下:
public Integer createOrder() throws Exception{ Product product = null; lock.lock(); try { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null){ platformTransactionManager.rollback(transaction1); throw new Exception("购买商品:"+purchaseProductId+"不存在"); } //商品当前库存 Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount); //校验库存 if (purchaseProductNum > currentCount){ platformTransactionManager.rollback(transaction1); throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买"); } productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId()); platformTransactionManager.commit(transaction1); }finally { lock.unlock(); } TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order(); order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum))); ... orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem(); ... orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction); return order.getId(); }
3、基于数据库实现分布式锁
多个进程、多个线程访问共同的组件数据库,
通过select ...... for update 访问同一条数据
for update锁住这条数据,其他线程只能等待
接口代码如下,此程序是分布式部署。比如开启了两个程序,localhost:8080/myproject/singleLock localhost:8081/myproject/singleLock
selectDistributeLock的SQL语句里select ...... for update
测试:分别调用localhost:8080/myproject/singleLock localhost:8081/myproject/singleLock这里这两个接口。当第一个接口执行完后,等待2秒。 第二个请求也获得到了锁。
基于数据库实现分布式锁的优缺点
优点: 简单方便,易于理解、易于操作
缺点: 并发量大时,对数据库压力较大。
建议:作为锁的数据库与业务数据库分开。
4、基于Redis的Setnx实现分布式锁
1) 实现原理:
获取锁的Redis命令 SET resource_name my_random_value NX PX 30000
resource_name: 资源名称,可根据不同的业务区分不同的锁
my_random_value: 随机值,每个线程的随机值都不同,用于释放锁时的校验
NX: key不存在时设置成功,可以存则设置不成功。
PX:自动失效时间,出现异常情况,锁可以过期失效。
利用了NX的原子性,多个线程并发时,只有一个线程可以设置成功。
设置成功即可获得锁,可以执行后续的业务处理。
如果出现异常,过了锁的有效期,锁自动释放。
2) 释放锁原理
释放锁采用Redis的delete命令
释放锁时校验之前设置的随机数,相同才能释放。(为了证明这个值是你这个线程设置的)
释放锁采用LUA脚本
3) Redis分布式锁实战
Reids安装可参考 https://www.cnblogs.com/linlf03/p/9465272.html
创建SpringBoog工程: distribute-lock-demo
增加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency>
配置Redis连接
创建Controller类
@RestController @Slf4j public class RedisLockControler { @Autowired private RedisTemplate redisTemplate; @RequestMapping("redisLock") public String redisLock(){ log.info("进入redisLock 方法"); String key = "redisKey"; String value = UUID.randomUUID().toString() ; RedisCallback redisCallback = redisConnection -> { RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //过期时间30秒 Expiration expiration = Expiration.seconds(30); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); // 序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); if(lock){ log.info("进入了锁!!"); //模拟业务处理,耗时15秒 try { Thread.sleep(15000); }catch (InterruptedException e){ e.printStackTrace(); }finally { String script = "if redis.call("get",KEYS[1]) == ARGV[1] then " + " return redis.call("del",KEYS[1]) " + "else " + " return 0 " + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:" + result); } }else { log.info("没有获取到锁"); } log.info("方法执行完成"); return "方法执行完成"; } }
启动DistributeLockDemoApplication
一个端口为8080,一个端口为8081
8081的端口设置如下
然后分别调用http://localhost:8080/redisLock 和http://localhost:8081/redisLock
说明8080获得到了锁,并且执行完成后释放了锁。
8081执行未获得锁。
5 基于分布式锁解决定时任务(服务集群部署)重复问题
在前面的基础上对分布式锁进行封装
@Slf4j public class RedisLock implements AutoCloseable{ private RedisTemplate redisTemplate; private String key; private String value; //单位:秒 private int expireTime; public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){ this.redisTemplate = redisTemplate; this.key = key; this.expireTime = expireTime; this.value = UUID.randomUUID().toString(); } /** * 获取分布式锁 * @return */ public boolean getLock(){ RedisCallback redisCallback = redisConnection -> { RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); //过期时间30秒 Expiration expiration = Expiration.seconds(30); //序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); // 序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(value); //执行setnx操作 Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption); return result; }; //获取分布式锁 Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } public boolean unLock(){ String script = "if redis.call("get",KEYS[1]) == ARGV[1] then " + " return redis.call("del",KEYS[1]) " + "else " + " return 0 " + "end"; RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果={}", result); return result; } @Override public void close() throws Exception { unLock(); } }
增加定时任务
增加注解EnableScheduling
定时任务为每隔5秒发送短信
@Service @Slf4j public class SchedulerService { @Autowired private RedisTemplate redisTemplate; //每隔5秒钟执行一次 @Scheduled(cron = "0/5 * * * * ?") public void SendSMS(){ try( RedisLock redisLock = new RedisLock(redisTemplate, "autoSMS",30)) { if(redisLock.getLock()){ log.info("向18555558888发送短信"); }else { log.info("没有获取到锁"); } }catch (Exception e){ e.printStackTrace(); } } }
启动DistributeLockDemoApplication
输出结果如下图所示,同一个时间点只有一个服务获取到了锁。
6、基于Zookeeper的分布式锁
1)Zookeeper的安装,可以参考Linux下Zookeeper的下载、安装和启动
2) 原理
重要概念: Zookkeeper的观察器
可以设置观察器的3个方法: getData(), getChildren(), exists();
节点发生变化,发送给客户端
观察器只能监控一次,再监控需重新设置。
实现原理:
利用Zookeeper的瞬时有序节点的特性
多线程并发创建瞬时节点时,得到有序的序列
序号最小的线程获得锁
其他的线程则监听自己序号前一个序号
前一个线程执行完成,删除自己序号的节点
下一个序号的线程得到通知,继续执行
以此类推
创建节点时,已经确定了线程的执行顺序。
3) Zookeeper的分布式锁实践
增加依赖,版本和服务端Zookeeper 3.4.13保持一致。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency>
创建Zookeeperr锁
@Slf4j public class ZkLock implements AutoCloseable, Watcher { private ZooKeeper zooKeeper; private String znode; public ZkLock() throws IOException { this.zooKeeper = new ZooKeeper("47.xx.xx.120:2181", 10000, this); } /** * 获取分布式锁 * @return */ public boolean getLock(String businessCode){ try { //创建业务根节点 Stat stat = zooKeeper.exists("/" + businessCode, false); if(stat == null){ zooKeeper.create("/" +businessCode , businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建瞬时有序的节点. znode的值为/order/order_00000001 znode = zooKeeper.create("/" +businessCode + "/" + businessCode + "_" , businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获得业务节点下所有的子节点 List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false); Collections.sort(childrenNodes); // 获取序号最小的(第一个)子节点 String firstNode = childrenNodes.get(0); // 如果创建的节点是第一个节点,则获得锁 if(znode.endsWith(firstNode)){ return true; } // 不是第一个子节点,则监听前一个节点 String lastNode = firstNode; for(String node: childrenNodes){ if(znode.endsWith(node)){ zooKeeper.exists("/" +businessCode + "/" +lastNode, true); break; }else { lastNode = node; } } //等待监听的节点发生变化 synchronized (this){ wait(); } return true; }catch (Exception e){ e.printStackTrace(); } return false; } @Override public void close() throws Exception { zooKeeper.delete(znode, -1); zooKeeper.close(); log.info("已经释放了锁"); } @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getType() == Event.EventType.NodeDeleted){ synchronized (this){ //唤起线程 notify(); } } } }
增加调用接口
@RestController @Slf4j public class ZookeeperControler { @RequestMapping("zookeeperLock") public String zookeeperLock(){ log.info("进入zookeeperLock 方法"); try(ZkLock zkLock = new ZkLock()) { if(zkLock.getLock("order")){ log.info("获得了锁"); //模拟业务处理,耗时15秒 Thread.sleep(15000); }else { log.info("没有获取到锁"); } }catch (Exception e){ e.printStackTrace(); } log.info("方法执行完成"); return "方法执行完成"; } }
测试接口,分别启动两个服务,端口分别为8080和8081
分别调用http://localhost:8080/zookeeperLock 和 http://localhost:8081/zookeeperLock
7、使用Curator实现分布式锁
Apache Curator是为ZooKeeper开发的一套Java客户端类库,它是一个分布式协调服务。
添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
在DistributeLockDemoApplication类中添加Bean
@Bean(initMethod = "start", destroyMethod = "close") public CuratorFramework getCuratorFramework(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("47.xx.xx.120:2181", retryPolicy); return client; }
ZookeeperControler类中添加方法
@RequestMapping("curatorLock") public String curatorLock(){ log.info("进入curatorLock 方法"); InterProcessMutex lock = new InterProcessMutex(client,"/order"); try { if(lock.acquire(30, TimeUnit.SECONDS)){ log.info("获得了锁!"); Thread.sleep(10000); } }catch (Exception e){ e.printStackTrace(); }finally { try { log.info("释放了锁"); lock.release(); }catch (Exception e){ e.printStackTrace(); } } log.info("方法执行完成"); return "方法执行完成"; }
测试接口,分别启动两个服务,端口分别为8080和8081
分别调用http://localhost:8080/curatorLock 和 http://localhost:8081/curatorLock
8、 redisson分布式锁(API方式)
1) 引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.2</version> </dependenc
2)Redisson API方式
Redisson API方式,缺点:使用繁琐,每次需要配置
@RestController @Slf4j public class RedissonLockController { /** * Redisson API方式,缺点:使用繁琐 * @return */ @RequestMapping("redissonLock") public String redissonLock() { Config config = new Config(); config.useSingleServer().setAddress("redis://47.98.47.120:6379"); RedissonClient redisson = Redisson.create(config); RLock rLock = redisson.getLock("order"); log.info("进入了redissonLock方法"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("获得了锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); }finally { rLock.unlock(); log.info("释放了锁"); } log.info("方法执行完成"); return "方法执行完成"; } }
3) 测试
测试接口,分别启动两个服务,端口分别为8080和8081
分别调用http://localhost:8080/redissonLock 和 http://localhost:8081/redissonLock
9、 spring boot 集成redisson分布式锁
1) 增加依赖
注释redisson依赖,引入redisson-spring-boot-starter依赖
配置文件增加属性
spring.redis.host=47.xx.xx.120
2) 增加方法redissonLock2
@RequestMapping("redissonLock2") public String redissonLock2() { RLock rLock = redisson.getLock("order"); log.info("进入了redissonLock2方法"); try { rLock.lock(30, TimeUnit.SECONDS); log.info("获得了锁"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); }finally { rLock.unlock(); log.info("释放了锁"); } log.info("方法执行完成"); return "方法执行完成"; }