zoukankan      html  css  js  c++  java
  • [分布式]分布式锁

    资料

    Java架构师体系课:跟随千万级项目从0到100全过程高效成长第14周.分布式锁

    不可不说的Java“锁”事 - 美团技术团队

    测试代码: https://gitee.com/qiaoxingxing/spring-cloud-learn/tree/master/distribute-lock

    锁的概念

    定义

    锁(lock)或互斥(mutex)是一种同步机制, 用于多个线程访问同一资源时对资源的的访问进行限制。锁是为了解决多线程情况下的资源竞争的问题。

    乐观锁(CAS) vs 悲观锁

    乐观锁在读取数据的时候不做任何限制,而是在更新数据的时候,进行数据的比较,保证数据的版本一致时再更新数据。
    适用于读操作多, 而写操作少的场景。
    通常增加一个版本号或者最后修改时间, 用于对比;
    比较交换的机制, 简称CAS(Compare And Swap)
    java里的原子类(Atomic)就是这个原理, 比如: AtomicInteger

    悲观锁从读取数据的时候就显示的加锁, 直到数据更新完成, 释放锁为止。在这期间只能有一个线程去操作,其他的线程只能等待。 在JAVA中,悲观锁可以使用synchronized关键字或者ReentrantLock类来实现。

    公平锁 vs 非公平锁

    公平锁: 等待线程按照队列里的顺序依次使用资源;
    非公平锁: 等待线程随机选一个; 大多数用的是这个;

    java中的ReentrantLock实现了这两种锁, 原理参考源码tryAcquire、nonfairTryAcquire

    单体应用锁 vs 分布式锁

    单体应用锁: 只用于同一个JVM里;
    分布式锁: 不同jvm通过第三方组件加锁;

    目前比较流行的分布式锁的解决方案有:

    • Redis
    • Zookeeper
    • 数据库, 通过数据库可以实现分布式锁, 但是在高并发的情况下对数据库压力较大, 所以很少使用。

    java里的synchronized

    两种语法, synchronized方法和synchronized同步代码块:

    public synchronized void someMethod(){
      ....
    }
    
    synchronized (object){ //object通常使用this
      .....
    }
    

    synchronized(this)synchronized(SomeClass.class)的区别: 如果this是单实例没区别, 如果是多实例, 相当于锁了多个对象; SomeClass.class是全局唯一的;

    场景: 超卖现象

    产生原因: 多线程同时访问时, 同时查询库存, 同时减库存, 导致超卖;
    伪代码示例:

    @Transactional
    public Integer createOrder() {
       //查询库存
        count = orderService.queryProduct();
        //检查库存
        if(count < countForSale) {
            throw new exception("库存不足");
        }
        //减库存
        orderService.decreaseProduct(countForSale);
        //创建订单
        orderService.createOrder();
    }
    

    剩余1个库存, 5个进程同时执行下单, 执行后结果:
    现象1: 产生了5个订单, 数据库的库存数量为0;
    原因: 5个线程都查到了1个库存, 都设置库存为0, 所以生成了五个订单, 库存为0;
    解决方法: update增量set count = count -1, 而不是set count = 0

    现象2: 修改了update之后, 还是产生了5个订单, 数据库的库存数量为-4;
    原因: 5个线程都查到了1个库存, 都执行库存-1, 所以生成了五个订单, 库存减了5次变成-4;
    解决方法:
    方法1: 更新库存之后再查一次, 如果库存是负数, 回滚;
    方法2: 加锁, 查询库存、扣减库存、下单打包为一个操作, 同一时间只能一个线程执行;
    下面详细介绍解决方法;

    利用数据库机制解决超卖

    update语句带行锁, 同一时间只能有一个事务(线程)能修改库存, 修改数据后再检查库存, 如果超卖就抛出异常回滚;

    @Transactional
    public Integer createOrder() {
       //查询库存
        count = orderService.queryProduct();
        //检查库存
        if(count < countForSale) {
            throw new exception("库存不足");
        }
        //减库存
        orderService.decreaseProduct(countForSale);
        //再检查库存, 如果库存是负数就回滚; 
        count = orderService.queryProduct();
        if(count < 0){
            throw new exception("库存不足, 下单失败");
        }
        //创建订单
        orderService.createOrder();
    }
    

    单体锁解决超卖

    方法1: 注解控制事务

    createOrder加上synchronized:

    @Transactional
    public synchronized Integer createOrder(Int countForSale) {
    ...
    }
    

    实测出现问题: 5个线程同时请求, 会有两个线程创建订单, 库存数量为-1;

    @Transactional注解理解

    @Transactional的原理是生成一个warp方法, 实际执行的过程等价于执行createOrderWrap:

    public Integer createOrderWrap() {
        try {
            transactionManager.start()  //1
            createOrder() //2
            transactionManager.commit() //3
        } catch (Exception ex){
            transactionManager.rollback()
        }
    }
    
    public synchronized Integer createOrder() {
        //2.1 查询库存
    ...
    }
    

    5个线程同时执行到1, 由于createOrdersynchronized, 只有一个线程A进入createOrder, 扣减库存后执行到3(开始提交事务), 同时线程B进入createOrder, 执行到2.1, 如果线程B的2.1在线程A的3之前完成, 此时查出来的还是未扣减的库存, B误以为还有库存导致重复下单;
    解决思路: 线程A提交事务之后才应该允许线程B进入createOrder方法查询库存, 也就是把事务的开始、提交也放到加锁的代码块里;

    解决方法

    改造:

    @Resource 
    OrderService self;
    
    public synchronized Integer createOrder() {
        return self.createOrderInner()
    }
    
    @Transactional
    public Integer createOrderInner() {
    ...
    }
    

    业务逻辑放到createOrderInner里, createOrderInner加上Transactional注解;
    createOrdersynchronized, 调用createOrderInner;

    此时相当于:

    public synchronized Integer createOrder() {
        return self.createOrderInnerWrap()
    }
    
    public Integer createOrderInnerWrap() {
        try {
            transactionManager.start()  //1
            createOrderInner() //2 
            transactionManager.commit() //3
        } catch (Exception ex){
            transactionManager.rollback()
        }
    }
    
    public Integer createOrderInner() {
    ...
    }
    

    此时1、2、3操作都完成后, 才会允许其他线程进入;

    方法2: 手动控制事务

    不使用事务注解, 用TransactionManager手动控制事务、加锁块;

    PlatformTransactionManager
    TransactionDefinition
    

    基于数据库实现分布式锁(悲观锁)

    创建一个lock表:

    lock_table: 
    - id
    - business_code: 业务编码, 对应不同业务代码的锁; 
    - business_name
    

    获取锁:

    select 1 from lock_table where code = 'xxx' for update
    

    释放锁: 提交事务(如果用@Transactional注解是自动执行的):

    commit;
    

    缺点
    数据库压力大; 业务和锁使用不同数据库;

    redis实现分布式锁

    手动实现:

    原理: Distributed locks with Redis – Redis

    SET resource_name my_random_value NX PX 30000
    The command will set the key only if it does not already exist (NX option), with an expire of 30000 milliseconds (PX option). The key is set to a value “my_random_value”. This value must be unique across all clients and all lock requests.

    只适用于单实例redis;
    能获取锁就执行代码, 获取不到就return;

    Redisson redis的客户端实现

    资料

    官网:
    redisson/redisson: Redisson - Redis Java client with features of In-Memory Data Grid. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Publish / Subscribe, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, MyBatis, RPC, local cache ...

    文档:
    Table of Content · redisson/redisson Wiki
    和spring-boot的集成在: 14. Integration with frameworks

    上手

    redisson/redisson: quick start
    实例:

    private RedissonClient redisson;
    
    public String redissonLock() {
        RLock rLock = redisson.getLock("order");
        log.info("我进入了方法!!");
        try {
            rLock.lock(30, TimeUnit.SECONDS);
            log.info("我获得了锁!!!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            log.info("我释放了锁!!");
            rLock.unlock();
        }
        log.info("方法执行完成!!");
        return "方法执行完成!!";
    }
    

    zookeeper实现分布式锁

    介绍

    ZooKeeper: Because Coordinating Distributed Systems is a Zoo
    ZooKeeper是分布式协同工具/分布式配置工具
    借助zookeeper可以更容易开发分布式系统; 众所周知分布式系统很容易出错, 比如资源竞争、死锁等问题;

    手动实现分布式锁

    ZooKeeper Recipes and Solutions

    基本概念

    数据结构: 树

    • 持久节点
    • 瞬时节点: 重启后消失、有序, 不能有子节点

    观察器:

    • 检测节点的变化, 通知连接的客户端;
    • 观察的方法: getData(), getChildren(), exists()
    • 只能观察一次; 借助第三方组件可以多次观察;

    分布式锁的原理: 利用瞬时节点的有序性
    过程:
    所有线程创建有序瞬时节点, 获得节点的编号, 如果是第一个节点就获得锁, 代码执行完成释放锁时删除节点;
    其他节点观察前一个节点, 前一个节点被删除时会收到通知, 获得锁;
    剩余节点以此类推;
    所以: 创建节点时执行顺序就已经确定了;

    问题: 没有超时时间, 如何避免死锁? 比如某个节点异常退出, 没有释放锁(删除节点); 或者网络异常无法删除节点;

    代码示例

    伪代码:

    //思路1: 回调的方式
    ZookeeperService zkService;
    
    doLock(callback){
        index = zkService.createNode("lock_root")
        if (index == 1) {
            callback.call()
            zkService.deleteNode()
        } else {
            zkService.watch('on_delete',index-1,()=>{
                callback.call()
                zkService.deleteNode()
            })
        }
    }
    
    //业务代码
    callback = ()=>{
        doSomeThing()
    }
    doLock(callback);
    
    //思路2: 线程wait的方法
    getLock(){
        index = zkService.createNode("lock_root")
        if(index ==1){
            return true
        }
        zkService.watch('on_delete',index-1,()=>{
            notify() //通知线程等待结束;
        })
        synchronized(this){
            wait() //线程等待
        }
    }
    
    //业务代码
    getLock()
    doSomeThing()
    

    Curator客户端实现

    Apache Curator
    Distributed Lock

    InterProcessMutex lock = new InterProcessMutex(client, lockPath);
    if ( lock.acquire(maxWait, waitUnit) ) 
    {
        try 
        {
            // do some work inside of the critical section here
        }
        finally
        {
            lock.release();
        }
    }
    
  • 相关阅读:
    Ansible 的初步使用
    HBase 和 Hive 的差别是什么,各自适用在什么场景中?Spark SQL能做什么?
    spark安装配置
    scala安装配置
    Apache Spark 3.0.0重磅发布 —— 重要特性全面解析
    hbase的安装与配置(三台集群分布式)
    Flv的结构分析
    几种直播流媒体协议
    rtmp推送aac没有声音的问题记录
    c++中SetEvent和ResetEvent的使用
  • 原文地址:https://www.cnblogs.com/QIAOXINGXING001/p/15464377.html
Copyright © 2011-2022 走看看