zoukankan      html  css  js  c++  java
  • Redis实现分布式锁(设计模式应用实战)

    笔者看过网络上各种各样使用redis实现分布式锁的代码,要么错误,要么片段化,没有一个完整的例子,借这个周末给大家总结一下redis实现分布式锁的两种机制

    自旋锁和排他锁

    鉴于实现锁的方式不同,那么这里使用策略模式来组织代码

    一、自旋锁

    分布式锁抽象策略接口

    package com.srr.lock;
    
    /**
     * @Description 分布式锁的接口
     */
    abstract  public interface DistributedLock {
        /**
         * 获取锁
         */
        boolean lock();
        /**
         * 解锁
         */
        void unlock();
    }

    自旋锁策略抽象类,使用模板方法模式构建

    package com.srr.lock;
    
    /**
     * 自旋锁策略模板
     */
    public abstract class SpinRedisLockStrategy implements DistributedLock {
    
        private static final Integer retry = 50; //默认重试5次
        private static final Long sleeptime = 100L;
        protected String lockKey;
        protected String requestId;
        protected int expireTime;
    
        private SpinRedisLockStrategy(){}
        public SpinRedisLockStrategy(String lockKey, String requestId, int expireTime){
            this.lockKey=lockKey;
            this.requestId=requestId;
            this.expireTime=expireTime;
        }
        /**
         * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
         */
        @Override
        public boolean lock() {
            Boolean flag = false;
            try {
                for (int i=0;i<retry;i++){
                    flag = tryLock();
                    if(flag){
                        System.out.println(Thread.currentThread().getName()+"获取锁成功");
                        break;
                    }
                    Thread.sleep(sleeptime);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
             return flag;
        }
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryLock() ;
    
        /**
         * 解锁:删除key
         */
        @Override
        public  abstract void unlock();
    }

    自旋锁实现子类

    package com.srr.lock;
    
    import redis.clients.jedis.Jedis;
    
    import java.util.Collections;
    
    /**
     * 自旋锁
     */
    public class SpinRedisLock extends SpinRedisLockStrategy{
    
        private static final Long RELEASE_SUCCESS = 1L;
        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
    
        public SpinRedisLock(String lockKey, String requestId, int expireTime) {
            super(lockKey,requestId, expireTime);
        }
    
        @Override
        protected boolean tryLock() {
            Jedis jedis = new Jedis("localhost", 6379);  //创建客户端,1p和端口号
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
        @Override
        public void unlock() {
            Jedis jedis = new Jedis("localhost", 6379);  //创建客户端,1p和端口号
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                System.out.println("lock is unlock");
            }
        }
    }

    至此,自旋锁方式实现分布式锁就完成了,下面来看排他锁阻塞的方式实现

    二、排他锁

      在实现之前需要大家搞懂一个概念,也就是redis的事件通知:

    /**
    * 键空间通知,所有通知以 keyspace@ 为前缀
    * 键事件通知,所有通知以 keyevent@ 为前缀
    * 所有命令都只在键真的被改动了之后,才会产生通知,比如删除foo会产生
    * 键空间通知
    * “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set”
    * 和键事件通知
    * “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo”
    */

       搞懂概念之后,需要在redis的配置文件redis.conf中将其 notify-keyspace-events "KEA",默认为notify-keyspace-events "",这样才能启动redis的事件监听机制。

       排它锁策略抽象类

       

    package com.srr.lock;
    
    import redis.clients.jedis.Jedis;
    
    /**
     * @Description  阻塞获取锁,模板类
     */
    public abstract class BlockingRedisLockStrategy implements DistributedLock {
    
        protected String lockKey;
        protected String requestId;
        protected int expireTime;
    
        private BlockingRedisLockStrategy(){}
        public BlockingRedisLockStrategy(String lockKey, String requestId,int expireTime){
            this.lockKey=lockKey;
            this.requestId=requestId;
            this.expireTime=expireTime;
        }
        /**
         * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
         * @throws Exception
         */
        @Override
        public final boolean lock() {
            //获取锁成功
            if (tryLock()){
                System.out.println(Thread.currentThread().getName()+"获取锁成功");
                return true;
            }else{  //获取锁失败
                //阻塞一直等待
                waitLock();
                //递归,再次获取锁
                return lock();
            }
        }
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryLock() ;
        /**
         * 等待获取锁,子类实现
         */
        protected abstract void waitLock();
        /**
         * 解锁:删除key
         */
        @Override
        public  abstract void unlock();
    }

    排他锁实现子类

    package com.srr.lock;
    
    import redis.clients.jedis.Jedis;
    
    import java.util.Collections;
    
    /**
     * 排他锁,阻塞
     */
    public class BlockingRedisLock extends BlockingRedisLockStrategy {
    
        private static final Long RELEASE_SUCCESS = 1L;
        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
    
        public BlockingRedisLock(String lockKey, String requestId, int expireTime) {
            super(lockKey,requestId, expireTime);
        }
    
    
        /**
         * 尝试获取分布式锁
         * @return 是否获取成功
         */
        @Override
        public boolean tryLock() {
            Jedis jedis = new Jedis("localhost", 6379);  //创建客户端,1p和端口号
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }
    
        @Override
        public void waitLock() {
            //判断key是否存在
            Jedis jedis = new Jedis("localhost", 6379);  //创建客户端,1p和端口号
            KeyExpiredListener keyExpiredListener = new KeyExpiredListener();
            /**
             * 键空间通知,所有通知以 keyspace@ 为前缀
             * 键事件通知,所有通知以 keyevent@ 为前缀
             * 所有命令都只在键真的被改动了之后,才会产生通知,比如删除foo会产生
             * 键空间通知
             * “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set”
             * 和键事件通知
             * “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo”
             */
            //如果要监听某个key的执行了什么操作,就订阅__ keyspace@0__,监听某种操作动了哪些key,就订阅__ keyevent@0__
            //这里我们需要监听分布式锁的键被删除了,所以要监听删除动作"__keyspace@0__:"+key
            jedis.psubscribe(keyExpiredListener, "__keyspace@0__:"+lockKey);
            System.out.println("over");
        }
    
        /**
         * 释放分布式锁
         * @return 是否释放成功
         */
        @Override
        public void unlock() {
            Jedis jedis = new Jedis("localhost", 6379);  //创建客户端,1p和端口号
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                System.out.println("lock is unlock");
            }
        }
    }

    redis事件监听类

    package com.srr.lock;
    
    
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * redis 事件监听器
     */
    public class KeyDelListener extends JedisPubSub {
        public KeyDelListener(){
    
        }
        // 初始化订阅时候的处理
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
        }
    
        // 取得订阅的消息后的处理
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println("message == "+message);
            this.punsubscribe();
            System.out.println("unsubscribe == "+message);
        }
    }

    到这里排他锁的完整代码就写完了,其实对比一下,两者的区别在于lock的实现方式不同,笔者为了确保代码完整性就全部贴上了。

    代码写完了那么给一个场景测试一下我们的代码有没有问题,请看下面的测试代码:

    这里我们构建一个Lock工具类:

    package com.srr.lock;
    
    /**
     * 锁工具类
     */
    public class Lock {
        /**
         * 获取锁
         */
        boolean lock(DistributedLock lock) {
            return lock.lock();
        };
    
        /**
         * 释放锁
         */
        void unlock(DistributedLock lock) {
            lock.unlock();
        };
    }

    测试类:

    package com.srr.lock;
    
    import redis.clients.jedis.Jedis;
    
    /**
     *  测试场景
     *  count从1加到101
     *  使用redis分布式锁在分布式环境下保证结果正确
     */
    public class T {
    
        volatile int  count = 1;
    
        public void inc(){
            for(int i = 0;i<100;i++){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                count++;
                System.out.println("count == "+count);
            }
        }
    
        public int getCount(){
           return count;
        }
    
        public static void main(String[] args) {
            final T t = new T();
            final Lock lock = new Lock();
            //final RedisLock redisLock = new BlockingRedisLock("","1",100000,jedis);
            final DistributedLock distributedLock = new SpinRedisLock("test","1",100000);
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    if(lock.lock(distributedLock)){
                        t.inc();
                        System.out.println("t1 running");
                        System.out.println("t1 == count == "+ t.getCount());
                        lock.unlock(distributedLock);
                    }
    
                }
            });
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    if(lock.lock(distributedLock)) {
                        t.inc();
                        System.out.println("t2 running");
                        System.out.println("t2 == count == " + t.getCount());
                        lock.unlock(distributedLock);
                    }
    
                }
            });
    
            t1.start();
            t2.start();
        }
    }

    测试结果:

     到这里,全部代码就完成了,如果想使用zookeeper实现分布式锁只需要抽象出一个策略类实现DistributedLock接口即可。是不是很方便呢。

     原创不易,多多关注!

  • 相关阅读:
    深度学习分类网络的发展历史
    杨辉三角
    【了解】贝塞尔曲线
    win10桌面点击事件蓝色边框处理
    try{}catch的隐藏(如何优雅的实现异常块)
    switch的一些思考(seitch与ifelse的区别)
    好看的控制台日志线
    Serializable和Externalizabl的异同
    java排序方式对比
    如何初始化Map,java
  • 原文地址:https://www.cnblogs.com/sx-bj-srr/p/distributedLock.html
Copyright © 2011-2022 走看看