zoukankan      html  css  js  c++  java
  • 分布式锁

    分布式锁

    非分布式下使用锁


    1. 乐观锁

    • 利用版本号来检测数据是否发生变化,从而判断是否能进行更新
    • JAVA 使用比较交换机制-CAS(Compare And Swap)机制实现
    • i++非线程安全,使用原子类AtomicInteger
    • JDK1.5开始提供原子类,使用CAS机制,乐观锁,并发包java.util.concurrent

    2. 悲观锁

    • 执行前加锁,执行后解锁
    • JAVA 使用 synchronized 与 ReentrantLock 实现

    3. 公平锁

    • 线程有序获取锁,以队列的方式实现
    • ReentrantLock中的fairSync实现,通过构造方法传入true

    4. 非公平锁

    • 线程无序获取锁
    • ReentrantLock中的NonfairSync实现,默认都为非公平锁

    5. 超卖现象

    解决方式(单体项目)

    • 通过数据库解决,使用update的行锁解决并发
    update product set count = count - 1 where count > 0 and id = '1'
    
    update product set count = count - 1 where id = '1'
    # 后续再针对库存进行校验,若为负数,抛出异常
    
    • 使用synchronized, 有一种情况为事务未提交锁已释放,需要手动控制事务
    // 获取事务
    TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
    // 提交事务
    platformTransactionManager.commit(transaction);
    // 回滚事务
    platformTransactionManager.rollback(transaction);
    
    • synchronized块
        // 对象锁
        synchronized(this){}
    
        // 类锁
        synchronized(Product.class){}
    
    • 使用ReentrantLock
      • 建议使用 try-finally 的方式,防止异常未释放锁
    Lock lock = new ReentrantLock();
    lock.lock();
    try{
        ...
    }finally{
        lock.unlock();
    }
    

    分布式锁


    1. 使用数据库的悲观锁

    查询语句后加入 for update

    # 查看mysql是否自动提交
    SELECT @@autocommit; 
    # 1 自动提交 0 不自动提交
    SET @@autocommit = 0;
    
    SELECT * FROM order WHERE id = '1' FOR UPDATE
    
    • 优点:简单方便,易于理解,易于操作
    • 缺点:并发量大,对数据库压力较大
    • 建议:作为锁的数据库与业务数据库分开

    2. 基于Redis的SetNX实现分布式锁

    2.1 实现原理

    • 获取锁的命令
      -- SET resource_name my_random_value NX PX 30000
      • resource_name:资源名称,可根据不同的业务区分不同的锁(相当于key)
      • my_random_value:随机值,每个线程的随机值都不同(UUID),用于释放锁的校验
      • NX:key不存在时设置成功,key存在则设置不成功,原子性操作
      • PX:自动失效时间,出现异常情况,锁可以过期失效。防止释放锁异常
    • 利用NX的原子性,多个线程并发时,只有一个线程可以设置成功
    • 设置成功即获得锁,可以执行后续的业务处理
    • 如果出现异常,过了锁的有效期,自动释放
    • 释放锁采用Redis的delete命令
    • 释放锁时校验之前设置的随机数,相同才能释放,防止释放了别人的锁
    • 释放锁使用LUA脚本
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    2.2 实现

    • java实现代码
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.RedisStringCommands;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.script.RedisScript;
    import org.springframework.data.redis.core.types.Expiration;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;
    
    /**
    * 实现AutoCloseable接口,可用JDK1.7的try(xx)中的自动关闭特性
    */
    @Slf4j
    public class RedisLock implements AutoCloseable {
    
        private RedisTemplate redisTemplate;
        private String key;
        private String value;
        //单位:秒
        private int expireTime;
    
        public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
            this.redisTemplate = redisTemplate;
            this.key = key;
            this.expireTime=expireTime;
            this.value = UUID.randomUUID().toString();
        }
    
        /**
         * 获取分布式锁
         * @return
         */
        public boolean getLock(){
            RedisCallback<Boolean> redisCallback = connection -> {
                //设置NX
                RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
                //设置过期时间
                Expiration expiration = Expiration.seconds(expireTime);
                //序列化key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                //序列化value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
                //执行setnx操作
                Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
                return result;
            };
    
            //获取分布式锁
            Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
            return lock;
        }
    
        public boolean unLock() {
            String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
    " +
                    "    return redis.call("del",KEYS[1])
    " +
                    "else
    " +
                    "    return 0
    " +
                    "end";
            RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
            List<String> keys = Arrays.asList(key);
    
            Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
            log.info("释放锁的结果:"+result);
            return result;
        }
    
    
        @Override
        public void close() throws Exception {
            unLock();
        }
    }
    

    3. 使用zookeeper实现分布式锁

    zk节点

    • 持久节点
    • 瞬时节点:不可有子节点,会话结束自动消失,zk重启也消失

    3.1 原理

    观察器

    • 监听3个方法:getData, getChildren, exists
    • 节点数据变化,通知客户端
    • 只通知一次

    分布式锁原理

    • 利用zk的瞬时节点的有序性特点
    • 多线程并发创建有序瞬时节点,得到序列号
    • 序号最小获得锁
    • 其他线程监听前一个序列
    • 前一个线程执行完成,删除序列,后一个线程获得通知,可执行
    • 节点创建时,已决定了线程的执行顺序

    3.2 实现

    maven

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.14</version>
            </dependency>
    

    获取锁与释放锁

    import lombok.extern.slf4j.Slf4j;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    @Slf4j
    public class ZkLock implements AutoCloseable, Watcher {
    
        private ZooKeeper zooKeeper;
        private String znode;
    
        public ZkLock() throws IOException {
            this.zooKeeper = new ZooKeeper("localhost:2181",
                    10000,this);
        }
    
        public boolean getLock(String businessCode) {
            try {
                //创建业务 根节点
                Stat stat = zooKeeper.exists("/" + businessCode, false);
                if (stat==null){
                    zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
    
                //创建瞬时有序节点  /order/order_00000001
                znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
    
                //获取业务节点下 所有的子节点
                List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
                //子节点排序
                Collections.sort(childrenNodes);
                //获取序号最小的(第一个)子节点
                String firstNode = childrenNodes.get(0);
                //如果创建的节点是第一个子节点,则获得锁
                if (znode.endsWith(firstNode)){
                    return true;
                }
                //不是第一个子节点,则监听前一个节点
                String lastNode = firstNode;
                for (String node:childrenNodes){
                    if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                        break;
                    }else {
                        lastNode = node;
                    }
                }
                synchronized (this){
                    wait();
                }
                return true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public void close() throws Exception {
            zooKeeper.delete(znode,-1);
            zooKeeper.close();
        }
    
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted){
                synchronized (this){
                    notify();
                }
            }
        }
    }
    

    4. 使用Curator客户端实现分布式锁

    资料查看 官网

    maven

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.2.0</version>
            </dependency>
    

    实现流程:

    1. 获取连接
    2. 执行start方法
    3. 获取锁
    4. 释放锁
    5. 执行close
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
    client.start();
    InterProcessMutex lock = new InterProcessMutex(client, "/order");
    try {
        if ( lock.acquire(30, TimeUnit.SECONDS) ) {
            try  {
                // 业务
            } finally  {
                lock.release();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    client.close();
    

    先声明bean,再进行使用

        /*
         * 在config中声明bean
         * 加入start与close,可在bean初始化与bean销毁时调用相应的方法
         */
        @Bean(initMethod="start",destroyMethod = "close")
        public CuratorFramework getCuratorFramework() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
            return client;
        }
        
        // 使用
        @Autowired
        private CuratorFramework client;
        
        public void test(){
            InterProcessMutex lock = new InterProcessMutex(client, "/order");
            try{
                if (lock.acquire(30, TimeUnit.SECONDS)){
                    // 业务
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    // 释放锁
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    

    5. Redisson客户端实现分布式锁

    资料 redisson官网

    • maven
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.11.2</version>
        </dependency>
    
    • 实现
            Config config = new Config();
            config.useSingleServer().setAddress("redis://ip:port");
            RedissonClient redisson = Redisson.create(config);
    
            RLock rLock = redisson.getLock("order");
    
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                // 业务
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 释放锁
                rLock.unlock();
            }
    

    使用springboot的方式

    • maven
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.2</version>
        </dependency>
    
    spring:
      redis:
        database: 1
        host: ip
        port: 6379
        password: 123
    
    • 实现
        @Autowired
        private RedissonClient redisson;
    
    
        public void test() {
            RLock rLock = redisson.getLock("order");
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                // 业务
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 解锁
                rLock.unlock();
            }
        }
    

    分布式锁实现方式比较


    方式 优点 缺点
    数据库 简单 数据库压力大
    Redis 麻烦,不支持阻塞
    Zk 支持阻塞 原理复杂
    Curator 依赖zk,强一致性
    Redisson 支持阻塞
  • 相关阅读:
    Swift # Apple Pay集成
    GitHub Top 100 简介
    一些常用算法
    CocoaPods 建立私有仓库
    安装 CocoaPods & Alcatraz
    iOS程序 # 启动过程
    Apache & WebDav 配置(二)
    SVN & Git (二)
    SVN & Git (一)
    poj 3169 Layout (差分约束)
  • 原文地址:https://www.cnblogs.com/maple92/p/14238699.html
Copyright © 2011-2022 走看看