zoukankan      html  css  js  c++  java
  • Java分布式锁三种实现方案

    方案一:数据库乐观锁

    乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

    异常实现流程
    -- 可能会发生的异常情况
    -- 线程1查询,当前left_count为1,则有记录
    select * from t_bonus where id = 10001 and left_count > 0
    
    -- 线程2查询,当前left_count为1,也有记录
    select * from t_bonus where id = 10001 and left_count > 0
    
    -- 线程1完成领取记录,修改left_count为0,
    update t_bonus set left_count = left_count - 1 where id = 10001
    
    -- 线程2完成领取记录,修改left_count为-1,产生脏数据
    update t_bonus set left_count = left_count - 1 where id = 10001
    通过乐观锁实现
    -- 添加版本号控制字段
    ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;
    
    -- 线程1查询,当前left_count为1,则有记录,当前版本号为1234
    select left_count, version from t_bonus where id = 10001 and left_count > 0
    
    -- 线程2查询,当前left_count为1,有记录,当前版本号为1234
    select left_count, version from t_bonus where id = 10001 and left_count > 0
    
    -- 线程1,更新完成后当前的version为1235,update状态为1,更新成功
    update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
    
    -- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理
    update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
    方案二:基于Redis的分布式锁

    SETNX命令(SET if Not eXists)
    语法:SETNX key value
    功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
    Expire命令
    语法:expire(key, expireTime)
    功能:key设置过期时间
    GETSET命令
    语法:GETSET key value
    功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
    GET命令
    语法:GET key
    功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
    DEL命令
    语法:DEL key [KEY …]
    功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

    第一种:使用redis的setnx()、expire()方法,用于分布式锁
    1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
    2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
    3. 执行完业务代码后,可以通过delete命令删除key。

    这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

    第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题
    1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
    2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
    3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
    4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
    5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
    import cn.com.tpig.cache.redis.RedisService;
    import cn.com.tpig.utils.SpringUtils;
    
    /**
     * Created by IDEA
     * User: shma1664
     * Date: 2016-08-16 14:01
     * Desc: redis分布式锁
     */
    public final class RedisLockUtil {
    
        private static final int defaultExpire = 60;
    
        private RedisLockUtil() {
            //
        }
    
        /**
         * 加锁
         * @param key redis key
         * @param expire 过期时间,单位秒
         * @return true:加锁成功,false,加锁失败
         */
        public static boolean lock(String key, int expire) {
    
            RedisService redisService = SpringUtils.getBean(RedisService.class);
            long status = redisService.setnx(key, "1");
    
            if(status == 1) {
                redisService.expire(key, expire);
                return true;
            }
    
            return false;
        }
    
        public static boolean lock(String key) {
            return lock2(key, defaultExpire);
        }
    
        /**
         * 加锁
         * @param key redis key
         * @param expire 过期时间,单位秒
         * @return true:加锁成功,false,加锁失败
         */
        public static boolean lock2(String key, int expire) {
    
            RedisService redisService = SpringUtils.getBean(RedisService.class);
    
            long value = System.currentTimeMillis() + expire;
            long status = redisService.setnx(key, String.valueOf(value));
    
            if(status == 1) {
                return true;
            }
            long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
            if(oldExpireTime < System.currentTimeMillis()) {
                //超时
                long newExpireTime = System.currentTimeMillis() + expire;
                long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
                if(currentExpireTime == oldExpireTime) {
                    return true;
                }
            }
            return false;
        }
    
        public static void unLock1(String key) {
            RedisService redisService = SpringUtils.getBean(RedisService.class);
            redisService.del(key);
        }
    
        public static void unLock2(String key) {    
            RedisService redisService = SpringUtils.getBean(RedisService.class);    
            long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
            if(oldExpireTime > System.currentTimeMillis()) {        
                redisService.del(key);    
            }
       }
    
    }
    
    public void drawRedPacket(long userId) {
        String key = "draw.redpacket.userid:" + userId;
    
        boolean lock = RedisLockUtil.lock2(key, 60);
        if(lock) {
            try {
                //领取操作
            } finally {
                //释放锁
                RedisLockUtil.unLock(key);
            }
        } else {
            new RuntimeException("重复领取奖励");
        }
    }
    Spring AOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * RUNTIME
     * 定义注解
     * 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。
     * @author shma1664
     *
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface RedisLockable {
    
    
        String[] key() default "";
    
        long expiration() default 60;
    }
    import javax.annotation.Resource;
    
    import java.lang.reflect.Method;
    
    import com.autohome.api.dealer.util.cache.RedisClient;
    import com.google.common.base.Joiner;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.Signature;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.springframework.expression.EvaluationContext;
    import org.springframework.expression.Expression;
    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by IDEA
     * User: mashaohua
     * Date: 2016-09-28 18:08
     * Desc:
     */
    @Aspect
    @Component
    public class RedisLockAop {
    
        @Resource
        private RedisClient redisClient;
    
        @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")
        public void pointcut(){}
    
        @Around("pointcut()")
        public Object doAround(ProceedingJoinPoint point) throws Throwable{
            Signature signature = point.getSignature();
            MethodSignature methodSignature = (MethodSignature) signature;
            Method method = methodSignature.getMethod();
            String targetName = point.getTarget().getClass().getName();
            String methodName = point.getSignature().getName();
            Object[] arguments = point.getArgs();
    
            if (method != null && method.isAnnotationPresent(RedisLockable.class)) {
                RedisLockable redisLock = method.getAnnotation(RedisLockable.class);
                long expire = redisLock.expiration();
                String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments);
                boolean isLock = RedisLockUtil.lock2(redisKey, expire);
                if(!isLock) {
                    try {
                        return point.proceed();
                    } finally {
                        unLock2(redisKey);
                    }
                } else {
                    throw new RuntimeException("您的操作太频繁,请稍后再试");
                }
            }
    
            return point.proceed();
        }
    
        private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {
    
            StringBuilder sb = new StringBuilder();
            sb.append("lock.").append(targetName).append(".").append(methodName);
    
            if(keys != null) {
                String keyStr = Joiner.on(".").skipNulls().join(keys);
                String[] parameters = ReflectParamNames.getNames(targetName, methodName);
                ExpressionParser parser = new SpelExpressionParser();
                Expression expression = parser.parseExpression(keyStr);
                EvaluationContext context = new StandardEvaluationContext();
                int length = parameters.length;
                if (length > 0) {
                    for (int i = 0; i < length; i++) {
                        context.setVariable(parameters[i], arguments[i]);
                    }
                }
                String keysValue = expression.getValue(context, String.class);
                sb.append("#").append(keysValue);
            }
            return sb.toString();
        }
    <!-- https://mvnrepository.com/artifact/javassist/javassist -->
    <dependency>
        <groupId>org.javassist</groupId>
        <artifactId>javassist</artifactId>
        <version>3.18.1-GA</version>
    </dependency>
    import javassist.*;
    import javassist.bytecode.CodeAttribute;
    import javassist.bytecode.LocalVariableAttribute;
    import javassist.bytecode.MethodInfo;
    import org.apache.log4j.Logger;
    
    /**
     * Created by IDEA
     * User: mashaohua
     * Date: 2016-09-28 18:39
     * Desc:
     */
    public class ReflectParamNames {
        private static Logger log = Logger.getLogger(ReflectParamNames.class);
        private  static ClassPool pool = ClassPool.getDefault();
    
        static{
            ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
            pool.insertClassPath(classPath);
        }
    
        public static String[] getNames(String className,String methodName) {
            CtClass cc = null;
            try {
                cc = pool.get(className);
                CtMethod cm = cc.getDeclaredMethod(methodName);
                // 使用javaassist的反射方法获取方法的参数名
                MethodInfo methodInfo = cm.getMethodInfo();
                CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
                LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
                if (attr == null) return new String[0];
    
                int begin = 0;
    
                String[] paramNames = new String[cm.getParameterTypes().length];
                int count = 0;
                int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;
    
                for (int i = 0; i < attr.tableLength(); i++){
                    //  为什么 加这个判断,发现在windows 跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的
                    if (attr.variableName(i).equals("this")){
                        begin = i;
                        break;
                    }
                }
    
                for (int i = begin+1; i <= begin+paramNames.length; i++){
                    paramNames[count] = attr.variableName(i);
                    count++;
                }
                return paramNames;
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                try {
                    if(cc != null) cc.detach();
                } catch (Exception e2) {
                    log.error(e2.getMessage());
                }
    
    
            }
            return new String[0];
        }
    }
    在需要使用分布式锁的地方添加注解
    /**
     * 抽奖接口
     * 添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式
     * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId
     * @param orderId 订单id
     * @return 抽中的奖品信息
     */
    @RedisLockable(key = {"#orderId"}, expiration = 120)
    @Override
    public BonusConvertBean drawBonus(Integer orderId) throws BonusException{
        // 业务逻辑
    }
    第三种方案:基于Zookeeper的分布式锁
    利用节点名称的唯一性来实现独占锁

    ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。

    package com.shma.example.zookeeper.lock;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * Created by IDEA
     * User: mashaohua
     * Date: 2016-09-30 16:09
     * Desc:
     */
    public class ZookeeperLock implements Lock, Watcher {
        private ZooKeeper zk;
        private String root = "/locks";//
        private String lockName;//竞争资源的标志
        private String myZnode;//当前锁
        private int sessionTimeout = 30000;
        private List<Exception> exception = new ArrayList<Exception>();
    
        /**
         * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
         * @param config 127.0.0.1:2181
         * @param lockName 竞争资源标志,lockName中不能包含单词lock
         */
        public ZookeeperLock(String config, String lockName){
            this.lockName = lockName;
            // 创建一个与服务器的连接
            try {
                zk = new ZooKeeper(config, sessionTimeout, this);
                Stat stat = zk.exists(root, false);
                if(stat == null){
                    // 创建根节点
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                exception.add(e);
            } catch (KeeperException e) {
                exception.add(e);
            } catch (InterruptedException e) {
                exception.add(e);
            }
        }
    
        @Override
        public void lock() {
            if(exception.size() > 0){
                throw new LockException(exception.get(0));
            }
            if(!tryLock()) {
                throw new LockException("您的操作太频繁,请稍后再试");
            }
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
        @Override
        public boolean tryLock() {
            try {
                myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return true;
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return tryLock();
        }
    
        @Override
        public void unlock() {
            try {
                zk.delete(myZnode, -1);
                myZnode = null;
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            //
        }
    
    }
    
    ZookeeperLock lock = null;
    try {
        lock = new ZookeeperLock("127.0.0.1:2182","test1");
        lock.lock();
        //业务逻辑处理
    } catch (LockException e) {
        throw e;
    } finally {
        if(lock != null)
            lock.unlock();
    }
    利用临时顺序节点控制时序实现

    /lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。
    算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。
    对于解锁操作,只需要将自身创建的节点删除即可。

    package com.shma.example.zookeeper.lock;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * Created by IDEA
     * User: mashaohua
     * Date: 2016-09-30 16:09
     * Desc:
     */
    public class DistributedLock implements Lock, Watcher{
        private ZooKeeper zk;
        private String root = "/locks";//
        private String lockName;//竞争资源的标志
        private String waitNode;//等待前一个锁
        private String myZnode;//当前锁
        private CountDownLatch latch;//计数器
        private int sessionTimeout = 30000;
        private List<Exception> exception = new ArrayList<Exception>();
    
        /**
         * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
         * @param config 127.0.0.1:2181
         * @param lockName 竞争资源标志,lockName中不能包含单词lock
         */
        public DistributedLock(String config, String lockName){
            this.lockName = lockName;
            // 创建一个与服务器的连接
            try {
                zk = new ZooKeeper(config, sessionTimeout, this);
                Stat stat = zk.exists(root, false);
                if(stat == null){
                    // 创建根节点
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                exception.add(e);
            } catch (KeeperException e) {
                exception.add(e);
            } catch (InterruptedException e) {
                exception.add(e);
            }
        }
    
        /**
         * zookeeper节点的监视器
         */
        public void process(WatchedEvent event) {
            if(this.latch != null) {
                this.latch.countDown();
            }
        }
    
        public void lock() {
            if(exception.size() > 0){
                throw new LockException(exception.get(0));
            }
            try {
                if(this.tryLock()){
                    System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                    return;
                }
                else{
                    waitForLock(waitNode, sessionTimeout);//等待锁
                }
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            }
        }
    
        public boolean tryLock() {
            try {
                String splitStr = "_lock_";
                if(lockName.contains(splitStr))
                    throw new LockException("lockName can not contains \u000B");
                //创建临时子节点
                myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(myZnode + " is created ");
                //取出所有子节点
                List<String> subNodes = zk.getChildren(root, false);
                //取出所有lockName的锁
                List<String> lockObjNodes = new ArrayList<String>();
                for (String node : subNodes) {
                    String _node = node.split(splitStr)[0];
                    if(_node.equals(lockName)){
                        lockObjNodes.add(node);
                    }
                }
                Collections.sort(lockObjNodes);
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                    //如果是最小的节点,则表示取得锁
                    return true;
                }
                //如果不是最小的节点,找到比自己小1的节点
                String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
                waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            }
            return false;
        }
    
        public boolean tryLock(long time, TimeUnit unit) {
            try {
                if(this.tryLock()){
                    return true;
                }
                return waitForLock(waitNode,time);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
            Stat stat = zk.exists(root + "/" + lower,true);
            //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if(stat != null){
                System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
                this.latch = new CountDownLatch(1);
                this.latch.await(waitTime, TimeUnit.MILLISECONDS);
                this.latch = null;
            }
            return true;
        }
    
        public void unlock() {
            try {
                System.out.println("unlock " + myZnode);
                zk.delete(myZnode,-1);
                myZnode = null;
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
        public Condition newCondition() {
            return null;
        }
    
        public class LockException extends RuntimeException {
            private static final long serialVersionUID = 1L;
            public LockException(String e){
                super(e);
            }
            public LockException(Exception e){
                super(e);
            }
        }
    
    }

  • 相关阅读:
    Android开发中常见的错误
    使用命令行的方式向GitHub中上传本地项目
    jmeter保存测试结果到文件
    转 Jmeter参数化的4种方法
    键盘各键对应的编码值(key)
    CacheHelper对缓存的控制
    Oracle存储过程
    Oricle中SQL语法
    python学习资料百度网盘分享
    一些网站学习的链接
  • 原文地址:https://www.cnblogs.com/lijiasnong/p/9952494.html
Copyright © 2011-2022 走看看