zoukankan      html  css  js  c++  java
  • 基于redis的分布式锁

    1.      背景

    在分布式集群系统的开发中,线程锁往往并不能支持全部场景的使用,必须引入新的技术方案分布式锁。

    2.      原理

    实现分布式锁必须依靠第三方存储介质来存储锁的元数据等信息。比如分布式集群要操作某一行数据时,这个数据的流水号是唯一的,那么就把这个流水号作为一把锁的id,当某进程要操作该数据时,先去第三方存储介质中看该锁id是否存在,如果不存在,则将该锁id写入,然后执对该数据的操作。

    当其他进程要访问这个数据时,会先到第三方存储介质中查看有没有这个数据的锁id,有的话就认为这行数据目前已经有其他进程在使用了,就会不断地轮询第三方存储介质看其他进程是否释放掉该锁。

    当进程操作完该数据后,该进程就到第三方存储介质中把该锁id删除掉,这样其他轮询的进程就能得到对该锁的控制。

    3.      实现

    实现分布式锁有三种方式。

    1)数据库乐观锁

    2)基于Redis的分布式锁

    3)基于ZooKeeper的分布式锁

    本文将介绍第二种方式,基于Redis实现分布式锁。

    4.      指标

    为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件。

    1)互斥性。在任意时刻,只有一个客户端能持有锁。

    2)不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    3)具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

    4)加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

    5.      代码

    5.1依赖

    首先要通过Maven引入Jedis开源组件,在pom.xml文件加入代码。

    <dependency>

        <groupId>redis.clients</groupId>

        <artifactId>jedis</artifactId>

        <version>2.9.0</version>

    </dependency>

    5.2加锁

    redis有个事务锁,命令如下,它的含义是将一个value设置到一个key中,如果不存在将会赋值并且设置超时时间为30秒,如何这个key已经存在了,则不进行设置。

    SET key value NX PX 30000

    这个事务锁很好的解决了两个单独的命令,一个设置set key value nx,即该key不存在的话将对其进行设置,另一个是expire key seconds,设置该key的超时时间。

    可以想一下,这两个命令用程序单独使用会存在什么问题。如果一个set key的命令设置了key,然后程序异常了,expire时间没有设置,那么这个key会一直锁住。如果一个set key时出现了异常,但expire执行正常。与此同时,另一个进程进行set key,代码才执行了一部分,key就过期了,别的线程也进入了锁,从而不能达到互斥的效果。

    @Override

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

           String lockValue = UUID.randomUUID().toString();

           Thread currentThread = Thread.currentThread();

           Jedis jedis = this.getRedisClient();

           String result = jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time);

           if (LOCK_SUCCESS.equals(result)) {

                  lockContex.set(lockValue);

                  exclusiveOwnerThread.set(currentThread);

                  return true;

           } else if (exclusiveOwnerThread.get() == currentThread) {//当前线程已经持有了锁,可重入

                  return true;

           } else {

                  return false;

           }

    }

    可以看到,加锁就一行代码:jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time),这个set()方法一共有五个形参:

    第一个为key,使用key来当锁,因为key是唯一的。

    第二个为value,传的是lockValue,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为lockValue,就知道这把锁是哪个进程加的了,在解锁的时候就可以有依据。lockValue可以使用UUID.randomUUID().toString()方法生成。

    第三个为nx,这个参数填的是NX,意思是SET IF NOT EXIST,即当key不存在时,进行set操作;若key已经存在,则不做任何操作;

    第四个为px,这个参数传的是PX,意思是要给这个key加一个过期的设置,具体时间由第五个参数决定。

    第五个为time,与第四个参数相呼应,代表key的过期时间。

    总的来说,执行上面的set()方法就只会导致两种结果。当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。已有锁存在,查看持锁的是不是当前线程,如果不是,不做任何操作。

    心细的童鞋会发现加锁代码满足可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。

    最后,因为将value赋值为lockValue,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。由于只考虑Redis单机部署的场景,所以容错性暂不考虑。

    5.3解锁

    @Override

    public void unlock() {

           String script;

           try {

                  Jedis jedis = this.getRedisClient();

                  script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

                  if (StringUtils.isBlank(lockContex.get())) {

                         throw new RuntimeException("can not find redis key");

                  }

                  // 执行脚本,并向脚本传递参数

                  Object result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(lockContex.get()));

                  if (RELEASE_SUCCESS.equals(result)) {

                         lockContex.remove();

                  } else {

                         throw new RuntimeException("redis lock release failed");

                  }

           } catch (Exception ex) {

                  throw new RuntimeException(ex.getMessage());

           }

    }

    可以看到,解锁只需要两行代码就搞定了。第一行代码,写了一个简单的Lua脚本代码。第二行代码,将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为lockValue。eval()方法是将Lua代码交给Redis服务端执行。

    那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与lockValue相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。那么为什么执行eval()方法可以确保原子性,源于Redis的特性,下面是官网对eval命令的部分解释:

    简单来说,就是在eval命令执行Lua代码的时候,将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

    6.      结语

    至此基于redis的分布式锁已经介绍完成,最后附上全部代码。

    import org.apache.commons.lang3.StringUtils;

    import org.tonny.properties.PropertiesUtil;

    import redis.clients.jedis.Jedis;

    import java.util.Collections;

    import java.util.UUID;

    import java.util.concurrent.TimeUnit;

    import java.util.concurrent.locks.Condition;

    import java.util.concurrent.locks.Lock;

    public class RedisDistributedLock implements Lock {

        private final static long time = 500;

        private final static String LOCK_KEY = "redis_lock_key";

        private static final String LOCK_SUCCESS = "OK";

        private static final String SET_IF_NOT_EXIST = "NX";

        private static final String SET_WITH_EXPIRE_TIME = "PX";

        private static final Long RELEASE_SUCCESS = 1L;

        /**

         * 存储唯一标识

         */

        private ThreadLocal<String> lockContex = new ThreadLocal<>();

        private ThreadLocal<Thread> exclusiveOwnerThread = new ThreadLocal<>();

        @Override

        public void lock() {

            while (!tryLock()) {

                try {

                    Thread.sleep(100);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

        @Override

        public void lockInterruptibly() throws InterruptedException {

            if (Thread.interrupted()) {

                throw new InterruptedException();

            }

            while (!tryLock()) {

                Thread.sleep(100);

            }

        }

        @Override

        public boolean tryLock() {

            try {

                return tryLock(time, TimeUnit.MILLISECONDS);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            return false;

        }

        @Override

        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

            String lockValue = UUID.randomUUID().toString();

            Thread currentThread = Thread.currentThread();

            Jedis jedis = this.getRedisClient();

            String result = jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time);

            jedis.close();

            if (LOCK_SUCCESS.equals(result)) {

                lockContex.set(lockValue);

                exclusiveOwnerThread.set(currentThread);

                return true;

            } else if (exclusiveOwnerThread.get() == currentThread) {//当前线程已经持有了锁,可重入

                return true;

            } else {

                return false;

            }

        }

        @Override

        public void unlock() {

            String script;

            try {

                Jedis jedis = this.getRedisClient();

                script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

                if (StringUtils.isBlank(lockContex.get())) {

                    throw new RuntimeException("can not find redis key");

                }

                // 执行脚本,并向脚本传递参数

                Object result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(lockContex.get()));

                jedis.close();

                if (RELEASE_SUCCESS.equals(result)) {

                    lockContex.remove();

                } else {

                    throw new RuntimeException("redis lock release failed");

                }

            } catch (Exception ex) {

                throw new RuntimeException(ex.getMessage());

            }

        }

        @Override

        public Condition newCondition() {

            return null;

        }

        private Jedis getRedisClient() {

            PropertiesUtil.readProperties("redis-config/redis.properties");

            String host = PropertiesUtil.getProperty("redis.host");

            String port = PropertiesUtil.getProperty("redis.port");

            return new Jedis(host, Integer.parseInt(port));

        }

        public static void main(String[] args) {

            Lock lock = new RedisDistributedLock();

            lock.lock();

            try {

                //TimeUnit.SECONDS.sleep(20);

                System.out.println("operation");

            } catch (Exception e) {

                e.printStackTrace();

            } finally {

                lock.unlock();

            }

        }

    }

  • 相关阅读:
    Python pip配置国内源
    【VLC】VLC命令行参数
    发个在owasp上演讲web应用防火墙的ppt
    Tips of Linux C programming
    linux程序调试
    scrapy结合webkit抓取js生成的页面
    Using Internet Explorer from .NET
    http长连接200万尝试及调优
    nginx url解码引发的waf漏洞
    poj 2513
  • 原文地址:https://www.cnblogs.com/supertonny/p/11305103.html
Copyright © 2011-2022 走看看