zoukankan      html  css  js  c++  java
  • java实现分布式锁

    1.前言

    大多数互联网系统是分布式部署的,分布式部署解决了高并发高可用的问题,但是由此带来了数据一致性问题。

    当某个资源在多系统之间,被共享操作的时候,为了保证这个资源数据是一致的,那么就必须要求在同一时刻只能被一个客户端操作,不能并发的执行,否者就会出现同一时刻有客户端写,别的客户端在读,两者访问到的数据就不一致了。

    2.我们为什么需要分布式锁

    在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。

    但是到了分布式系统的时代,这种线程之间的锁机制,就没作用了,应用程序会有多份,并且部署在不同的机器上,这些资源已经不是在同一进程的不同线程间共享,而是属于多进程之间共享的资源。

    因此,为了解决这个问题,我们就必须引入「分布式锁」。

    分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。

    分布式锁要满足哪些要求呢?

    排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法获取

    避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)

    高可用:获取或释放锁的机制必须高可用且性能佳

    而且最好是可重入锁。

    3.分布式锁的实现方式有哪些

    目前主流的有三种,从实现的复杂度上来看,从上往下难度依次增加:

    基于数据库实现

    基于Redis实现

    基于ZooKeeper实现

    无论哪种方式,其实都不完美,依旧要根据咱们业务的实际场景来选择。

    方案1 基于数据库实现

    基于数据库来做分布式锁的话,通常有两种做法:

    基于数据库的乐观锁

    基于数据库的悲观锁

    我们先来看一下如何基于「乐观锁」来实现:

    乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。

    当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。

       乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

    异常实现流程

    -- 可能会发生的异常情况
    -- 线程1查询,当前left_count为1,则有记录
    select * from t_bonus where id = 10001 and left_count > 0
    
    -- 线程2查询,当前left_count为1,也有记录
    select * from t_bonus where id = 10001 and left_count > 0
    
    -- 线程1完成领取记录,修改left_count为0,
    update t_bonus set left_count = left_count - 1 where id = 10001
    
    -- 线程2完成领取记录,修改left_count为-1,产生脏数据
    update t_bonus set left_count = left_count - 1 where id = 10001

    通过乐观锁实现

    -- 添加版本号控制字段
    ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;
    
    -- 线程1查询,当前left_count为1,则有记录,当前版本号为1234
    select left_count, version from t_bonus where id = 10001 and left_count > 0
    
    -- 线程2查询,当前left_count为1,有记录,当前版本号为1234
    select left_count, version from t_bonus where id = 10001 and left_count > 0
    
    -- 线程1,更新完成后当前的version为1235,update状态为1,更新成功
    update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
    
    -- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理
    update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

    悲观锁」的实现:

    悲观锁利用数据库的行锁来进行锁定指定行,

    通常用"SELECT * FROM TABLE_NAME WHERE id=id_value FOR UPDATE" 来获取数据。

    如果能获取到数据,则加锁成功, 如果获取失败,说明锁已经被别的程序占用了,自己则获取锁失败。

    /**
         * 消费以后更新银行余额
         * @param bankId 银行卡号
         * @param cost 消费金额
         * @return
         */
        public boolean consume(Long bankId, Integer cost){
            //先锁定银行账户
            BankAccount product = query("SELECT * FROM bank_account WHERE bank_id=#{bankId} FOR UPDATE", bankId);
            if (product.getNumber() > 0) {
                int updateCnt = update("UPDATE tb_product_stock SET number=#{cost} WHERE product_id=#{productId}", cost, bankId);
                if(updateCnt > 0){    //更新库存成功
                    return true;
                }
            }
            return false;
        }
    方案二:基于Redis的分布式锁

    用到的部分redis指令

    SETNX命令(SET if Not eXists)
    语法:SETNX key value
    功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
    Expire命令
    语法:expire(key, expireTime)
    功能:key设置过期时间
    GETSET命令
    语法:GETSET key value
    功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
    GET命令
    语法:GET key
    功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
    DEL命令
    语法:DEL key [KEY …]
    功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。
    第一种:使用redis的setnx()、expire()方法,用于分布式锁
    1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
    2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
    3. 执行完业务代码后,可以通过delete命令删除key。
    这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,
    在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

    第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题   设置了key的过期时间

    1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
    2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
    3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
    4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
    5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

    代码示例

    复制代码
    import cn.com.tpig.cache.redis.RedisService;
    import cn.com.tpig.utils.SpringUtils;
    
    /**
     * Created by IDEA
     * User: shma1664
     * Date: 2016-08-16 14:01
     * Desc: redis分布式锁
     */
    public final class RedisLockUtil {
    
        private static final int defaultExpire = 60;
    
        private RedisLockUtil() {
            //
        }
    
        /**
         * 加锁
         * @param key redis key
         * @param expire 过期时间,单位秒
         * @return true:加锁成功,false,加锁失败
         */
        public static boolean lock(String key, int expire) {
    
            RedisService redisService = SpringUtils.getBean(RedisService.class);
            long status = redisService.setnx(key, "1");
    
            if(status == 1) {
                redisService.expire(key, expire);
                return true;
            }
    
            return false;
        }
    
        public static boolean lock(String key) {
            return lock2(key, defaultExpire);
        }
    
        /**
         * 加锁
         * @param key redis key
         * @param expire 过期时间,单位秒
         * @return true:加锁成功,false,加锁失败
         */
        public static boolean lock2(String key, int expire) {
    
            RedisService redisService = SpringUtils.getBean(RedisService.class);
    
            long value = System.currentTimeMillis() + expire;
            long status = redisService.setnx(key, String.valueOf(value));
    
            if(status == 1) {
                return true;
            }
            long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
            if(oldExpireTime < System.currentTimeMillis()) {
                //超时
                long newExpireTime = System.currentTimeMillis() + expire;
                long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
                if(currentExpireTime == oldExpireTime) {
                    return true;
                }
            }
            return false;
        }
    
        public static void unLock1(String key) {
            RedisService redisService = SpringUtils.getBean(RedisService.class);
            redisService.del(key);
        }
    
        public static void unLock2(String key) {    
            RedisService redisService = SpringUtils.getBean(RedisService.class);    
            long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
            if(oldExpireTime > System.currentTimeMillis()) {        
                redisService.del(key);    
            }
       }
    
    }
    
    public void drawRedPacket(long userId) {
        String key = "draw.redpacket.userid:" + userId;
    
        boolean lock = RedisLockUtil.lock2(key, 60);
        if(lock) {
            try {
                //领取操作
            } finally {
                //释放锁
                RedisLockUtil.unLock(key);
            }
        } else {
            new RuntimeException("重复领取奖励");
        }
    }
    View Code

     第三种  使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题   设置了key的值为过期时间   生产环境验证过

    package com.differ.edibase.plugins.lock.redis;
    
    import com.differ.edibase.infrastructure.component.cache.Cacher;
    import com.differ.edibase.infrastructure.utils.SpringResolveManager;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    import redis.clients.jedis.Transaction;
    
    /**
     * redis实现简单的分布式锁
     *
     * @author 
     * @since 
     */
    @Component
    @Scope("prototype")
    public class RedisLock {
    
        // region 属性
    
        /**
         * 缓存
         */
        @Autowired
        protected Cacher cacher;
        /**
         * 默认等待时间
         */
        private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
        /**
         * 锁键
         */
        private String lockKey = "edi.redis.lock";
        /**
         * 锁超时时间,防止线程在入锁以后,无限的执行等待
         */
        private int expireMsecs = 60 * 1000;
        /**
         * 锁等待时间,防止线程饥饿
         */
        private int timeoutMsecs = 10 * 1000;
    
        private volatile boolean locked = false;
    
        // endregion
    
        // region 构造器
    
        /**
         * 构造器
         */
        public RedisLock() {
        }
    
        /**
         * 构造器
         *
         * @param lockKey 锁键
         */
        public RedisLock(String lockKey) {
            this.lockKey = lockKey + "_lock";
        }
    
        /**
         * 构造器
         *
         * @param lockKey      锁键
         * @param timeoutMsecs 锁等待时间
         */
        public RedisLock(String lockKey, int timeoutMsecs) {
            this(lockKey);
            this.timeoutMsecs = timeoutMsecs;
        }
    
        /**
         * 构造器
         *
         * @param lockKey      锁键
         * @param timeoutMsecs 锁等待时间
         * @param expireMsecs  锁超时时间
         */
        public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) {
            this(lockKey, timeoutMsecs);
            this.expireMsecs = expireMsecs;
        }
    
        // endregion
    
        // region 锁具体方法
    
        /**
         * 获得 lock.
         * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
         * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
         * 执行过程:
         * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
         * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
         *
         * @return true if lock is acquired, false acquire timeouted
         * @throws InterruptedException in case of thread interruption
         */
        public synchronized boolean lock() throws InterruptedException {
            int timeout = timeoutMsecs;
            while (timeout > 0) {
                long expires = System.currentTimeMillis() + expireMsecs + 1;
                // 锁到期时间
                String expiresStr = String.valueOf(expires);
                if (this.cacher.setNx(this.lockKey, expiresStr) == 1) {
                    this.locked = true;
                    return true;
                }
                // redis里的时间
                String currentValueStr = this.cacher.get(this.lockKey);
                // 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                    // 获取上一个锁到期时间,并设置现在的锁到期时间,只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                    String oldValueStr = this.cacher.getSet(this.lockKey, expiresStr);
                    // 防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                    // [分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                        locked = true;
                        return true;
                    }
                }
                timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
                Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
            }
            return false;
        }
    
        /**
         * Acqurired lock release.
         */
        public synchronized void unlock() {
            if (this.locked) {
                this.cacher.delete(this.lockKey);
                this.locked = false;
            }
        }
    
        // endregion
    
        // region 获取RedisLock
    
        /**
         * 获取RedisLock
         *
         * @param lockKey 锁键
         * @return RedisLock
         */
        public static RedisLock get(String lockKey) {
            Object[] obj = new Object[] { lockKey };
            return SpringResolveManager.resolve(RedisLock.class, obj);
        }
    
        /**
         * 获取RedisLock
         *
         * @param lockKey      锁键
         * @param timeoutMsecs 锁等待时间
         * @return RedisLock
         */
        public static RedisLock get(String lockKey, int timeoutMsecs) {
            Object[] obj = new Object[] { lockKey, timeoutMsecs };
            return SpringResolveManager.resolve(RedisLock.class, obj);
        }
    
        /**
         * 获取RedisLock
         *
         * @param lockKey      锁键
         * @param timeoutMsecs 锁等待时间
         * @param expireMsecs  锁超时时间
         * @return RedisLock
         */
        public static RedisLock get(String lockKey, int timeoutMsecs, int expireMsecs) {
            Object[] obj = new Object[] { lockKey, timeoutMsecs, expireMsecs };
            return SpringResolveManager.resolve(RedisLock.class, obj);
        }
    
        // endregion
    
    }
    View Code

    具体使用

            // 加锁获取缓存
            RedisLock redisLock = RedisLock.get(this.key);
            try {
                if (redisLock.lock()) {
                         //做自己的业务
                    }
                }
            } catch (Exception ex) {
                 //记录异常日志
            } finally {
                 //释放锁
                redisLock.unlock();
            }
    View Code

    方案三 :基于Zookeeper的分布式锁

    利用节点名称的唯一性来实现独占锁

        ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。

    ZK具体实现分布式锁,可以看

    https://www.cnblogs.com/lijiasnong/p/9952494.html

  • 相关阅读:
    VS2010/MFC编程入门之十四(对话框:向导对话框的创建及显示)
    VS2010/MFC编程入门之十三(对话框:属性页对话框及相关类的介绍)
    Tomcat架构解析(四)-----Coyote、HTTP、AJP、HTTP2等协议
    Tomcat架构解析(三)-----Engine、host、context解析以及web应用加载
    Tomcat架构解析(二)-----Connector、Tomcat启动过程以及Server的创建过程
    Tomcat架构解析(一)-----Tomcat总体架构
    springboot深入学习(四)-----spring data、事务
    springboot深入学习(三)-----tomcat配置、websocket
    springboot深入学习(二)-----profile配置、运行原理、web开发
    springboot深入学习(一)-----springboot核心、配置文件加载、日志配置
  • 原文地址:https://www.cnblogs.com/hup666/p/13442122.html
Copyright © 2011-2022 走看看