zoukankan      html  css  js  c++  java
  • 分布式锁简单入门以及三种实现方式介绍(滴滴)

    很多小伙伴在学习Java的时候,总是感觉Java多线程在实际的业务中很少使用,以至于不会花太多的时间去学习,技术债不断累积!等到了一定程度的时候对于与Java多线程相关的东西就很难理解,今天需要探讨的东西也是一样的和Java多线程相关的!做好准备,马上开车!

    学过Java多线程的应该都知道什么是锁,没学过的也不用担心,Java中的锁可以简单的理解为多线程情况下访问临界资源的一种线程同步机制。

    在学习或者使用Java的过程中进程会遇到各种各样的锁的概念:公平锁、非公平锁、自旋锁、可重入锁、偏向锁、轻量级锁、重量级锁、读写锁、互斥锁等。

    蒙了吗?不要紧!即使你这些都不会也不要紧,因为这个和今天要探讨的关系不大,不过如果你作为一个爱学习的小伙伴,这里也给你准备了一份秘籍:《Java多线程核心技术》,一共19篇祝你一臂之力!免费版的不过瘾,当然也有收费版的!

    一、为什么要使用分布式锁

    我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的Java多线程的18般武艺进行处理,并且可以完美的运行,毫无Bug!

    注意这是单机应用,也就是所有的请求都会分配到当前服务器的JVM内部,然后映射为操作系统的线程进行处理!而这个共享变量只是在这个JVM内部的一块内存空间!

    后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:

    上图可以看到,变量A存在JVM1、JVM2、JVM3三个JVM内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象,例如:UserController控制器中的一个整形类型的成员变量),如果不加任何控制的话,变量A同时都会在JVM分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同JVM内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!

    如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!

    为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

    二、分布式锁应该具备哪些条件

    在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:

    1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行; 
    2、高可用的获取锁与释放锁; 
    3、高性能的获取锁与释放锁; 
    4、具备可重入特性; 
    5、具备锁失效机制,防止死锁; 
    6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

    三、分布式锁的三种实现方式

    目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论(阿里问过这个理论,我做了总结,参考:AP原则 (阿里)

    )告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

    在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

    基于数据库实现分布式锁; 
    基于缓存(Redis等)实现分布式锁; 
    基于Zookeeper实现分布式锁;

    尽管有这三种方案,但是不同的业务也要根据自己的情况进行选型,他们之间没有最好只有更适合!

    四、基于数据库的实现方式

    基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

    (1)创建一个表:

    DROP TABLE IF EXISTS `method_lock`;
    CREATE TABLE `method_lock` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
      `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
      `desc` varchar(255) NOT NULL COMMENT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

    (2)想要执行某个方法,就使用这个方法名向表中插入数据:

    INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');

     因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

    (3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:

    delete from method_lock where method_name ='methodName';

    注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!

    使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:

    1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;

    2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;

    3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;

    4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

    5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

    五、基于Redis的实现方式

    1、选用Redis实现分布式锁原因:

    (1)Redis有很高的性能; 
    (2)Redis命令对此支持较好,实现起来比较方便

    2、使用命令介绍:

    (1)SETNX

    SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

    (2)expire

    expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

    (3)delete

    delete key:删除key

    在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

    3、实现思想:

    (1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。

    (2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。

    (3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

    4、 分布式锁的简单实现代码:

    这是我在项目中用的:

     public String getToken() {
            StoreKey key = new StoreKey(KEY_CATEGORY, TOKEN_KEY_PARAMS);
            String token = redisStoreClient.get(key);
            LOGGER.info("get token :{}  from redis", token);
            if (token == null) {
                StoreKey lock = new StoreKey(KEY_CATEGORY, LOCK_KEY_PARAMS);
                //分布式锁,如果没拿到锁,则直接放弃,防止南京侧服务出现问题,影响MQ消费
                if (redisStoreClient.setnx(lock, "lock", LOCK_EXPIRE_SECONDS)) {
                    //双重检验,防止重复获取token
                    token = redisStoreClient.get(key);
                    if (token == null) {
                        try {
                            String userId = mtConfigClient.getValue(NJ_TOKEN_USERID);
                            LOGGER.info("mtConfigClient get userId : {}", userId);
                            token = HttpClientUtils.post("http://" + Conf.GET_TOKEN_URL + userId, "320100", headers);
                            LOGGER.info("get token : {} from http", token);
                            if (StringUtils.isNotBlank(token)) {
                                redisStoreClient.set(key, token, TOKEN_EXPIRE_SECONDS);
                            }
                        } catch (HttpException e) {
                            LOGGER.error("get token errer", e);
                        }
                    }
                    //将分布式锁直接过期
                    redisStoreClient.expire(lock, 0);
                }
            }
            return token;
        }
    /**
     * 分布式锁的简单实现代码
     * Created by liuyang on 2017/4/20.
     */
    public class DistributedLock {
    
        private final JedisPool jedisPool;
    
        public DistributedLock(JedisPool jedisPool) {
            this.jedisPool = jedisPool;
        }
    
        /**
         * 加锁
         * @param lockName       锁的key
         * @param acquireTimeout 获取超时时间
         * @param timeout        锁的超时时间
         * @return 锁标识
         */
        public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
            Jedis conn = null;
            String retIdentifier = null;
            try {
                // 获取连接
                conn = jedisPool.getResource();
                // 随机生成一个value
                String identifier = UUID.randomUUID().toString();
                // 锁名,即key值
                String lockKey = "lock:" + lockName;
                // 超时时间,上锁后超过此时间则自动释放锁
                int lockExpire = (int) (timeout / 1000);
    
                // 获取锁的超时时间,超过这个时间则放弃获取锁
                long end = System.currentTimeMillis() + acquireTimeout;
                while (System.currentTimeMillis() < end) {
                    if (conn.setnx(lockKey, identifier) == 1) {
                        conn.expire(lockKey, lockExpire);
                        // 返回value值,用于释放锁时间确认
                        retIdentifier = identifier;
                        return retIdentifier;
                    }
                    // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                    if (conn.ttl(lockKey) == -1) {
                        conn.expire(lockKey, lockExpire);
                    }
    
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (JedisException e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
            return retIdentifier;
        }
    
        /**
         * 释放锁
         * @param lockName   锁的key
         * @param identifier 释放锁的标识
         * @return
         */
        public boolean releaseLock(String lockName, String identifier) {
            Jedis conn = null;
            String lockKey = "lock:" + lockName;
            boolean retFlag = false;
            try {
                conn = jedisPool.getResource();
                while (true) {
                    // 监视lock,准备开始事务
                    conn.watch(lockKey);
                    // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                    if (identifier.equals(conn.get(lockKey))) {
                        Transaction transaction = conn.multi();
                        transaction.del(lockKey);
                        List<Object> results = transaction.exec();
                        if (results == null) {
                            continue;
                        }
                        retFlag = true;
                    }
                    conn.unwatch();
                    break;
                }
            } catch (JedisException e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
            return retFlag;
        }
    }

    5、测试刚才实现的分布式锁

    例子中使用50个线程模拟秒杀一个商品,使用–运算符来实现商品减少,从结果有序性就可以看出是否为加锁状态。

    模拟秒杀服务,在其中配置了jedis线程池,在初始化的时候传给分布式锁,供其使用。

    /**
     * Created by liuyang on 2017/4/20.
     */
    public class Service {
    
        private static JedisPool pool = null;
    
        private DistributedLock lock = new DistributedLock(pool);
    
        int n = 500;
    
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            // 设置最大连接数
            config.setMaxTotal(200);
            // 设置最大空闲数
            config.setMaxIdle(8);
            // 设置最大等待时间
            config.setMaxWaitMillis(1000 * 100);
            // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
            config.setTestOnBorrow(true);
            pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
        }
    
        public void seckill() {
            // 返回锁的value值,供释放锁时候进行判断
            String identifier = lock.lockWithTimeout("resource", 5000, 1000);
            System.out.println(Thread.currentThread().getName() + "获得了锁");
            System.out.println(--n);
            lock.releaseLock("resource", identifier);
        }
    }

    模拟线程进行秒杀服务:

    public class ThreadA extends Thread {
        private Service service;
    
        public ThreadA(Service service) {
            this.service = service;
        }
    
        @Override
        public void run() {
            service.seckill();
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            Service service = new Service();
            for (int i = 0; i < 50; i++) {
                ThreadA threadA = new ThreadA(service);
                threadA.start();
            }
        }
    }

    结果如下,结果为有序的:

    若注释掉使用锁的部分:

    public void seckill() {
        // 返回锁的value值,供释放锁时候进行判断
        //String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
        System.out.println(Thread.currentThread().getName() + "获得了锁");
        System.out.println(--n);
        //lock.releaseLock("resource", indentifier);
    }

    从结果可以看出,有一些是异步进行的:

    5、基于ZooKeeper的实现方式

    ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

    (1)创建一个目录mylock; 
    (2)线程A想获取锁就在mylock目录下创建临时顺序节点; 
    (3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁; 
    (4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点; 
    (5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

    这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

    优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。

    缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。

    6、总结

    上面的三种实现方式,没有在所有场合都是完美的,所以,应根据不同的应用场景选择最适合的实现方式。

    在分布式环境中,对资源进行上锁有时候是很重要的,比如抢购某一资源,这时候使用分布式锁就可以很好地控制资源。
    当然,在具体使用中,还需要考虑很多因素,比如超时时间的选取,获取锁时间的选取对并发量都有很大的影响,上述实现的分布式锁也只是一种简单的实现,主要是一种思想,以上包括文中的代码可能并不适用于正式的生产环境,只做入门参考!

    参考:分布式锁简单入门以及三种实现方式介绍

  • 相关阅读:
    1058 A+B in Hogwarts (20)
    1036. Boys vs Girls (25)
    1035 Password (20)
    1027 Colors in Mars (20)
    1009. Product of Polynomials (25)
    1006. Sign In and Sign Out
    1005 Spell It Right (20)
    1046 Shortest Distance (20)
    ViewPager页面滑动,滑动到最后一页,再往后滑动则执行一个事件
    IIS7.0上传文件限制的解决方法
  • 原文地址:https://www.cnblogs.com/aspirant/p/9200492.html
Copyright © 2011-2022 走看看