zoukankan      html  css  js  c++  java
  • 终极锁实战:单JVM锁+分布式锁

    1.前言

    锁就像一把钥匙,需要加锁的代码就像一个房间。出现互斥操作的典型场景:多人同时想进同一个房间争抢这个房间的钥匙(只有一把),一人抢到钥匙,其他人都等待这个人出来归还钥匙,此时大家再次争抢钥匙循环下去。

    作为终极实战系列,本篇用java语言分析锁的原理(源码剖析)和应用(详细代码),根据锁的作用范围分为:JVM锁和分布式锁。如理解有误之处,还请指出。

    2.单JVM锁(进程级别)

    程序部署在一台服务器上,当容器启动时(例如tomcat),一台JVM就运行起来了。本节分析的锁均只能在单JVM下生效。因为最终锁定的是某个对象,这个对象生存在JVM中,自然锁只能锁单JVM。这一点很重要。如果你的服务只部署一个实例,那么恭喜你,用以下几种锁就可以了。

    1.synchronized同步锁

    2.ReentrantLock重入锁

    3.ReadWriteLock读写锁

    4.StampedLock戳锁

    由于之前已经详细分析过原理+使用,各位直接坐飞机吧:同步中的四种锁synchronized、ReentrantLock、ReadWriteLock、StampedLock

    3.分布式锁(多服务节点,多进程)

    3.1基于数据库锁实现

    场景举例:

    卖商品,先查询库存>0,更新库存-1。

     1.悲观锁:select for update(一致性锁定读)


    查询官方文档如上图,事务内起作用的行锁。能够保证当前session事务所锁定的行不会被其他session所修改(这里的修改指更新或者删除)。对读取的记录加X锁,即排它锁,其他事不能对上锁的行加任何锁。

    BEGIN;(确保以下2步骤在一个事务中:)
    SELECT * FROM tb_product_stock WHERE product_id=1 FOR UPDATE--->product_id有索引,锁行.加锁(注:条件字段必须有索引才能锁行,否则锁表,且最好用explain查看一下是否使用了索引,因为有一些会被优化掉最终没有使用索引
    UPDATE tb_product_stock SET number=number-1 WHERE product_id=1--->更新库存-1.解锁
    COMMIT;

     2.乐观锁:版本控制,选一个字段作为版本控制字段,更新前查询一次,更新时该字段作为更新条件不同业务场景,版本控制字段,可以0 1控制,也可以+1控制,也可以-1控制,这个随意。

    BEGIN;(确保以下2步骤在一个事务中:)
    SELECT number FROM tb_product_stock WHERE product_id=1--》查询库存总数,不加锁
    UPDATE tb_product_stock SET number=number-1 WHERE product_id=1 AND number=第一步查询到的库存数--》number字段作为版本控制字段
    COMMIT; 

    3.2基于缓存实现(redis,memcached)

    原理:

    redisson开源jar包,提供了很多功能,其中就包含分布式锁。是Redis官方推荐的顶级项目,官网飞机票

    核心org.redisson.api.RLock接口封装了分布式锁的获取和释放。源码如下:

     1 @Override
     2     public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
     3         long time = unit.toMillis(waitTime);
     4         long current = System.currentTimeMillis();
     5         final long threadId = Thread.currentThread().getId();
     6         Long ttl = tryAcquire(leaseTime, unit, threadId);//申请锁,返回还剩余的锁过期时间
     7         // lock acquired
     8         if (ttl == null) {
     9             return true;
    10         }
    11         
    12         time -= (System.currentTimeMillis() - current);
    13         if (time <= 0) {
    14             acquireFailed(threadId);
    15             return false;
    16         }
    17         
    18         current = System.currentTimeMillis();
    19         final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    20         if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
    21             if (!subscribeFuture.cancel(false)) {
    22                 subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
    23                     @Override
    24                     public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
    25                         if (subscribeFuture.isSuccess()) {
    26                             unsubscribe(subscribeFuture, threadId);
    27                         }
    28                     }
    29                 });
    30             }
    31             acquireFailed(threadId);
    32             return false;
    33         }
    34 
    35         try {
    36             time -= (System.currentTimeMillis() - current);
    37             if (time <= 0) {
    38                 acquireFailed(threadId);
    39                 return false;
    40             }
    41         
    42             while (true) {
    43                 long currentTime = System.currentTimeMillis();
    44                 ttl = tryAcquire(leaseTime, unit, threadId);
    45                 // lock acquired
    46                 if (ttl == null) {
    47                     return true;
    48                 }
    49 
    50                 time -= (System.currentTimeMillis() - currentTime);
    51                 if (time <= 0) {
    52                     acquireFailed(threadId);
    53                     return false;
    54                 }
    55 
    56                 // waiting for message
    57                 currentTime = System.currentTimeMillis();
    58                 if (ttl >= 0 && ttl < time) {
    59                     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    60                 } else {
    61                     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    62                 }
    63 
    64                 time -= (System.currentTimeMillis() - currentTime);
    65                 if (time <= 0) {
    66                     acquireFailed(threadId);
    67                     return false;
    68                 }
    69             }
    70         } finally {
    71             unsubscribe(subscribeFuture, threadId);
    72         }
    73 //        return get(tryLockAsync(waitTime, leaseTime, unit));
    74     }

    上述方法,调用加锁的逻辑就是在tryAcquire(leaseTime, unit, threadId)中,如下图:

    1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    2     return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue
    3 }
    tryAcquireAsync中commandExecutor.evalWriteAsync就是咱们加锁核心方法了
     1 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
     2         internalLockLeaseTime = unit.toMillis(leaseTime);
     3 
     4         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     5                   "if (redis.call('exists', KEYS[1]) == 0) then " +
     6                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     7                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     8                       "return nil; " +
     9                   "end; " +
    10                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    11                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    12                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    13                       "return nil; " +
    14                   "end; " +
    15                   "return redis.call('pttl', KEYS[1]);",
    16                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    17     }

    如上图,已经到了redis命令了

    加锁:

    • KEYS[1] :需要加锁的key,这里需要是字符串类型。
    • ARGV[1] :锁的超时时间,防止死锁
    • ARGV[2] :锁的唯一标识,(UUID.randomUUID()) + “:” + threadId
     1 // 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1
     2 if (redis.call('exists', KEYS[1]) == 0) 
     3 then  
     4 redis.call('hset', KEYS[1], ARGV[2], 1); //hset key field value 哈希数据结构
     5 redis.call('pexpire', KEYS[1], ARGV[1]); //pexpire key expireTime 设置有效时间 
     6 return nil; 
     7 end; 
     8 // 如果锁重入,需要判断锁的key field 都一直情况下 value 加一
     9 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
    10 then 
    11 redis.call('hincrby', KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加1
    12 redis.call('pexpire', KEYS[1], ARGV[1]);//pexpire key expireTime重新设置超时时间
    13 return nil; 
    14 end; 
    15 // 返回剩余的过期时间
    16 return redis.call('pttl', KEYS[1]);

    以上的方法,当返回空是,说明获取到锁,如果返回一个long数值(pttl 命令的返回值),说明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断和调整。

    不再分析解锁步骤,直接贴上解锁的redis 命令

    解锁:

    – KEYS[1] :需要加锁的key,这里需要是字符串类型。

    – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”

    – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

    – ARGV[2] :锁的超时时间,防止死锁

    – ARGV[3] :锁的唯一标识,(UUID.randomUUID()) + “:” + threadId

     1 // 如果key已经不存在,说明已经被解锁,直接发布(publihs)redis消息
     2 if (redis.call('exists', KEYS[1]) == 0) 
     3 then
     4     redis.call('publish', KEYS[2], ARGV[1]);//publish ChannelName message向信道发送解锁消息
     5     return 1;
     6 end;
     7 // key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。
     8 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
     9 then 
    10     return nil;
    11 end; 
    12 // 将value减1
    13 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //hincrby key filed addValue 减1
    14 // 如果counter>0说明锁在重入,不能删除key
    15 if (counter > 0)  
    16 then
    17     redis.call('pexpire', KEYS[1], ARGV[2]);                             
    18     return 0; 
    19 else 
    20     // 删除key并且publish 解锁消息
    21     redis.call('del', KEYS[1]);                            
    22     redis.call('publish', KEYS[2], ARGV[1]); 
    23     return 1; 
    24 end; 
    25 return nil;

     特点:

    逻辑并不复杂, 实现了可重入功能, 通过pub/sub功能来减少空转,性能极高

    实现了Lock的大部分功能,支持强制解锁

    实战:

    1.创建客户端配置类:

    这里我们最终只用了一种来测试,就是initSingleServerConfig单例模式。

      1 package distributed.lock.redis;
      2 
      3 import org.redisson.config.Config;
      4 
      5 /**
      6  * 
      7  * @ClassName:RedissionConfig
      8  * @Description:自定义RedissionConfig初始化方法
      9  * 支持自定义构造:单例模式,集群模式,主从模式,哨兵模式。
     10  * 注:此处使用spring bean 配置文件保证bean单例,见applicationContext-redis.xml
     11  * 大家也可以用工厂模式自己维护单例:本类生成RedissionConfig,再RedissonClient redisson = Redisson.create(config);这样就可以创建RedissonClient
     12  * @author diandian.zhang
     13  * @date 2017年7月20日下午12:55:50
     14  */
     15 public class RedissionConfig {
     16     private RedissionConfig() {
     17     }
     18 
     19     public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) {
     20         return initSingleServerConfig(redisHost, redisPort, redisPassword, 0);
     21     }
     22 
     23     /**
     24      * 
     25      * @Description 使用单例模式初始化构造Config
     26      * @param redisHost
     27      * @param redisPort
     28      * @param redisPassword
     29      * @param redisDatabase redis db 默认0 (0~15)有redis.conf配置文件中参数来控制数据库总数:database 16.
     30      * @return
     31      * @author diandian.zhang
     32      * @date 2017年7月20日下午12:56:21
     33      * @since JDK1.8
     34      */
     35     public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) {
     36         Config config = new Config();
     37         config.useSingleServer().setAddress(redisHost + ":" + redisPort)
     38         .setPassword(redisPassword)
     39         .setDatabase(redisDatabase);//可以不设置,看业务是否需要隔离
     40         //RedissonClient redisson = Redisson.create(config);
     41         return config;
     42     }
     43     
     44     /**
     45      * 
     46      * @Description 集群模式
     47      * @param masterAddress
     48      * @param nodeAddressArray
     49      * @return
     50      * @author diandian.zhang
     51      * @date 2017年7月20日下午3:29:32
     52      * @since JDK1.8
     53      */
     54     public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) {
     55             String nodeStr = "";
     56         for(String slave:nodeAddressArray){
     57             nodeStr +=","+slave;
     58         }
     59         Config config = new Config();
     60         config.useClusterServers()
     61             .setScanInterval(2000) // cluster state scan interval in milliseconds
     62             .addNodeAddress(nodeStr);
     63        return config;
     64    }
     65     
     66     /**
     67      * 
     68      * @Description 主从模式
     69      * @param masterAddress 一主
     70      * @param slaveAddressArray 多从
     71      * @return
     72      * @author diandian.zhang
     73      * @date 2017年7月20日下午2:29:38
     74      * @since JDK1.8
     75      */
     76     public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) {
     77          String slaveStr = "";
     78          for(String slave:slaveAddressArray){
     79              slaveStr +=","+slave;
     80          }
     81         Config config = new Config();
     82         config.useMasterSlaveServers()
     83         .setMasterAddress(masterAddress)//一主
     84         .addSlaveAddress(slaveStr);//多从"127.0.0.1:26389", "127.0.0.1:26379"
     85         return config;
     86     }
     87     
     88     /**
     89      * 
     90      * @Description 哨兵模式
     91      * @param masterAddress
     92      * @param slaveAddressArray
     93      * @return
     94      * @author diandian.zhang
     95      * @date 2017年7月20日下午3:01:35
     96      * @since JDK1.8
     97      */
     98     public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) {
     99         String sentinelStr = "";
    100         for(String sentinel:sentinelAddressArray){
    101             sentinelStr +=","+sentinel;
    102         }
    103         Config config = new Config();
    104         config.useSentinelServers()
    105         .setMasterName("mymaster")
    106         .addSentinelAddress(sentinelStr);
    107         return config;
    108     }
    109     
    110     
    111 }

    2.分布式锁实现类

      1 package distributed.lock.redis;
      2 
      3 
      4 
      5 import java.text.SimpleDateFormat;
      6 import java.util.Date;
      7 import java.util.concurrent.CountDownLatch;
      8 import java.util.concurrent.TimeUnit;
      9 
     10 import org.redisson.Redisson;
     11 import org.redisson.api.RLock;
     12 import org.redisson.api.RedissonClient;
     13 import org.slf4j.Logger;
     14 import org.slf4j.LoggerFactory;
     15 
     16 
     17 public class RedissonTest {
     18     private static final Logger logger = LoggerFactory.getLogger(RedissonTest.class);
     19     static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     20     //这里可自定义多种模式,单例,集群,主从,哨兵模式。为了简单这里使用单例模式
     21     private static RedissonClient redissonClient = Redisson.create(RedissionConfig.initSingleServerConfig("192.168.50.107", "6379", "password"));
     22     
     23     public static void main(String[] args) {
     24         CountDownLatch latch = new CountDownLatch(3);
     25          // key
     26         String lockKey = "testkey20170802";
     27         try {
     28             Thread t1 = new Thread(() -> {
     29                 doWithLock(lockKey,latch);//函数式编程
     30             }, "t1");
     31             Thread t2 = new Thread(() -> {
     32                 doWithLock(lockKey,latch);
     33             }, "t2");
     34             Thread t3 = new Thread(() -> {
     35                 doWithLock(lockKey,latch);
     36             }, "t3");
     37             //启动线程
     38             t1.start();
     39             t2.start();
     40             t3.start(); 
     41             //等待全部完成
     42             latch.await();
     43             System.out.println("3个线程都解锁完毕,关闭客户端!");
     44             redissonClient.shutdown();
     45         } catch (Exception e) {
     46             e.printStackTrace();
     47         }
     48     }
     49     
     50     /**
     51      * 
     52      * @Description 线程执行函数体
     53      * @param lockKey
     54      * @author diandian.zhang
     55      * @date 2017年8月2日下午3:37:32
     56      * @since JDK1.8
     57      */
     58     private static void doWithLock(String lockKey,CountDownLatch latch) {
     59         try {
     60             System.out.println("进入线程="+Thread.currentThread().getName()+":"+time.format(new Date()));
     61             //获取锁,30秒内获取到返回true,未获取到返回false,60秒过后自动unLock
     62             if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) {
     63                 System.out.println(Thread.currentThread().getName() + " 获取锁成功!,执行需要加锁的任务"+time.format(new Date()));
     64                 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务
     65             //获取锁超时
     66             }else{
     67                 System.out.println(Thread.currentThread().getName() + " 获取锁超时!"+time.format(new Date()));
     68             }
     69         } catch (Exception e) {
     70             e.printStackTrace();
     71         } finally {
     72             try {
     73                 //释放锁
     74                 unLock(lockKey);
     75                 latch.countDown();//完成,计数器减一  
     76             } catch (Exception e) {
     77                 e.printStackTrace();
     78             }
     79         }
     80     }
     81     
     82     /**
     83      * 
     84      * @Description 获取锁,锁waitTime时间内获取到返回true,未获取到返回false,租赁期leaseTime过后unLock(除非手动释放锁)
     85      * @param key
     86      * @param waitTime
     87      * @param leaseTime
     88      * @param timeUnit
     89      * @return
     90      * @author diandian.zhang
     91      * @date 2017年8月2日下午3:24:09
     92      * @since JDK1.8
     93      */
     94     public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) {
     95         try {
     96             //根据key获取锁实例,非公平锁
     97             RLock lock = redissonClient.getLock(key);
     98             //在leaseTime时间内阻塞获取锁,获取锁后持有锁直到leaseTime租期结束(除非手动unLock释放锁)。
     99             return lock.tryLock(waitTime, leaseTime, timeUnit);
    100         } catch (Exception e) {
    101             logger.error("redis获取分布式锁异常;key=" + key + ",waitTime=" + waitTime + ",leaseTime=" + leaseTime +
    102                     ",timeUnit=" + timeUnit, e);
    103             return false;
    104         }
    105     }
    106     
    107     /**
    108      * 
    109      * @Description 释放锁
    110      * @param key
    111      * @author diandian.zhang
    112      * @date 2017年8月2日下午3:25:34
    113      * @since JDK1.8
    114      */
    115     public static void unLock(String key) {
    116         RLock lock = redissonClient.getLock(key);
    117         lock.unlock();
    118         System.out.println(Thread.currentThread().getName() + " 释放锁"+time.format(new Date()));
    119     }
    120 }

    执行结果如下:

     1 进入线程=t3:2017-08-02 16:33:19
     2 进入线程=t1:2017-08-02 16:33:19
     3 进入线程=t2:2017-08-02 16:33:19
     4 t2 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:19--->T2  19秒时获取到锁
     5 t2 释放锁2017-08-02 16:33:21--->T2任务完成,21秒时释放锁
     6 t1 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:21--->T1  21秒时获取到锁
     7 t1 释放锁2017-08-02 16:33:23--->T2任务完成,23秒时释放锁
     8 t3 获取锁成功!,执行需要加锁的任务2017-08-02 16:33:23--->T3  23秒时获取到锁
     9 t3 释放锁2017-08-02 16:33:25--->T2任务完成,25秒时释放锁
    10 3个线程都解锁完毕,关闭客户端!

    如上图,3个线程共消耗25-19=6秒,验证通过,确实互斥锁住了。

    我们用Redis Desktop Manger来看一下redis中数据:

     1 192.168.50.107:0>hgetall "testkey20170802"--->用key查询hash所有的值
     2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30--->T2获取到锁field=uuid:线程号
     3 2) 1                                      --->value=1代表重入次数为1
     4 192.168.50.107:0>hgetall "testkey20170802"--->T2释放锁,T1获取到锁
     5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:29
     6 2) 1
     7 192.168.50.107:0>hgetall "testkey20170802"--->T1释放锁,T3获取到锁
     8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:31
     9 2) 1
    10 192.168.50.107:0>hgetall "testkey20170802"--->最后一次查询时,T3释放锁,已无数据

    3.3基于zookeeper实现

    原理:

    每个客户端(每个JVM内部共用一个客户端实例)对某个方法加锁时,在zookeeper上指定节点的目录下,生成一个唯一的瞬时有序节点。判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。当释放锁的时候,只需将这个瞬时节点删除即可。

    我们使用apache的Curator组件来实现,一般使用Client、Framework、Recipes三个组件。

    curator下,InterProcessMutex可重入互斥公平锁,源码(curator-recipes-2.4.1.jar)注释如下:

    A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is "fair" - each user will get the mutex in the order requested (from ZK's point of view)

    即一个在JVM上工作的可重入互斥锁。使用ZK去持有这把锁。在所有JVM中的进程组,只要使用相同的锁路径将会获得进程间的临界资源。进一步说,这个互斥锁是公平的-因为每个线程将会根据请求顺序获得这个互斥量(对于ZK来说)

    主要方法如下:

    1     // 构造方法
    2     public InterProcessMutex(CuratorFramework client, String path)
    3     public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    4     // 通过acquire获得锁,并提供超时机制:
    5     public void acquire() throws Exception
    6     public boolean acquire(long time, TimeUnit unit) throws Exception
    7     // 撤销锁
    8     public void makeRevocable(RevocationListener<InterProcessMutex> listener)
    9     public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

    我们主要分析核心获取锁acquire方法如下:

     1 @Override
     2     public boolean acquire(long time, TimeUnit unit) throws Exception
     3     {
     4         return internalLock(time, unit);
     5     }
     6 
     7 private boolean internalLock(long time, TimeUnit unit) throws Exception
     8     {
     9         /*
    10            Note on concurrency: a given lockData instance
    11            can be only acted on by a single thread so locking isn't necessary
    12         */
    13 
    14         Thread          currentThread = Thread.currentThread();
    15         //线程安全map:private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();
    16         LockData        lockData = threadData.get(currentThread);
    17         if ( lockData != null )
    18         {
    19             //这里实现了可重入,如果当前线程已经获取锁,计数+1,直接返回true
    20             lockData.lockCount.incrementAndGet();
    21             return true;
    22         }
    23         //获取锁,核心方法
    24         String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    25         if ( lockPath != null )
    26         {   //得到锁,塞进线程安全map
    27             LockData        newLockData = new LockData(currentThread, lockPath);
    28             threadData.put(currentThread, newLockData);
    29             return true;
    30         }
    31 
    32         return false;
    33     }

    核心获取锁的方法attemptLock源码如下:

     1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
     2     {
     3         final long      startMillis = System.currentTimeMillis();
     4         final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
     5         final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
     6         int             retryCount = 0;
     7 
     8         String          ourPath = null;
     9         boolean         hasTheLock = false;
    10         boolean         isDone = false;
    11         while ( !isDone )
    12         {
    13             isDone = true;
    14 
    15             try
    16             {
    17                 if ( localLockNodeBytes != null )
    18                 {   
    19                     ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
    20                 }
    21                 else
    22                 {   //创建瞬时节点(客户端断开连接时删除),节点名追加自增数字
    23                     ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    24                 }
    //自循环等待时间,并判断是否获取到锁
    25 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 26 } 27 catch ( KeeperException.NoNodeException e ) 28 { 29 // gets thrown by StandardLockInternalsDriver when it can't find the lock node 30 // this can happen when the session expires, etc. So, if the retry allows, just try it all again 31 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) 32 { 33 isDone = false; 34 } 35 else 36 { 37 throw e; 38 } 39 } 40 } 41 //获取到锁返回节点path 42 if ( hasTheLock ) 43 { 44 return ourPath; 45 } 46 47 return null; 48 }
    自循环等待时间:
     1  private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
     2     {
     3         boolean     haveTheLock = false;
     4         boolean     doDelete = false;
     5         try
     6         {
     7             if ( revocable.get() != null )
     8             {
     9                 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
    10             }
    11 
    12             while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )//如果状态是开始且未获取到锁
    13             {
    14                 List<String>        children = getSortedChildren();//获取父节点下所有线程的子节点
    15                 String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // 获取当前节点名称
    16                 //核心方法:判断是否获取到锁
    17                 PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    18                 if ( predicateResults.getsTheLock() )//获取到锁,置true,下一次循环退出
    19                 {
    20                     haveTheLock = true;
    21                 }
    22                 else//没有索取到锁
    23                 {
    24                     String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();//这里路径是上一次获取到锁的持有锁路径
    25 
    26                     synchronized(this)//强制加锁
    27                     {
                     //让线程等待,并且watcher当前节点,当节点有变化的之后,则notifyAll当前等待的线程,让它再次进入来争抢锁
    28 Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath); 29 if ( stat != null ) 30 { 31 if ( millisToWait != null ) 32 { 33 millisToWait -= (System.currentTimeMillis() - startMillis); 34 startMillis = System.currentTimeMillis(); 35 if ( millisToWait <= 0 ) 36 { 37 doDelete = true; //等待超时,置状态为true,后面会删除节点 38 break; 39 } 40 //等待指定时间 41 wait(millisToWait); 42 } 43 else 44 { //一直等待 45 wait(); 46 } 47 } 48 } 49 // else it may have been deleted (i.e. lock released). Try to acquire again 50 } 51 } 52 } 53 catch ( Exception e ) 54 { 55 doDelete = true; 56 throw e; 57 } 58 finally 59 { 60 if ( doDelete )//删除path 61 { 62 deleteOurPath(ourPath); 63 } 64 } 65 return haveTheLock; 66 }
     1 @Override           
     2     public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
     3     {
     4         int             ourIndex = children.indexOf(sequenceNodeName);//先根据子节点名获取children(所有子节点升序集合)中的索引
     5         validateOurIndex(sequenceNodeName, ourIndex);//校验如果索引为负值,即不存在该子节点
     6         //maxLeases允许同时租赁的数量,这里源代码写死了1,但这种设计符合将来拓展,修改maxLeases即可满足多租赁
     7         boolean         getsTheLock = ourIndex < maxLeases;//maxLeases=1,所以只有当index=0时才是true,即所有子节点中升序排序第一个最小值,即第一个请求过来的,这就是核心思想所在!
     8         String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//获取到锁返回null,否则未获取到锁,获取上一次的获取到锁的路径。后面会监视这个路径用以唤醒请求线程
     9        
    10         return new PredicateResults(pathToWatch, getsTheLock);
    11     }

    特点:

    1.可避免死锁:zk瞬时节点(Ephemeral Nodes)生命周期和session一致,session结束,节点自动删除。
    2.依赖zk创建节点,涉及文件操作,开销较大。

    实战:

    1.创建客户端client
    2.生成互斥锁InterProcessMutex
    3.开启3个线程去获取锁

     1 package distributed.lock.zk;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 import java.util.concurrent.TimeUnit;
     6 
     7 import org.apache.curator.framework.CuratorFramework;
     8 import org.apache.curator.framework.CuratorFrameworkFactory;
     9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    10 import org.apache.curator.retry.ExponentialBackoffRetry;
    11 import org.apache.curator.retry.RetryNTimes;
    12 import org.jboss.netty.channel.StaticChannelPipeline;
    13 import org.omg.CORBA.PRIVATE_MEMBER;
    14 
    15 /**
    16  * 
    17  * @ClassName:CuratorDistrLockTest
    18  * @Description:Curator包实现zk分布式锁:利用了zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点。
    19  * @author diandian.zhang
    20  * @date 2017年7月11日下午12:43:44
    21  */
    22 
    23 public class CuratorDistrLock {
    24     private static final String ZK_ADDRESS = "192.168.50.253:2181";//zk
    25     private static final String ZK_LOCK_PATH = "/zktest";//path
    26     static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    27     
    28     public static void main(String[] args) {
    29         try {
    30             //创建zk客户端
    31 //            CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000));
    32             CuratorFramework client = CuratorFrameworkFactory.builder()
    33                     .connectString(ZK_ADDRESS)
    34                     .sessionTimeoutMs(5000)
    35                     .retryPolicy(new ExponentialBackoffRetry(1000, 10))
    36                     .build();
    37             //开启
    38             client.start();
    39             System.out.println("zk client start successfully!"+time.format(new Date()));
    40             
    41             Thread t1 = new Thread(() -> {
    42                 doWithLock(client);//函数式编程
    43             }, "t1");
    44             Thread t2 = new Thread(() -> {
    45                 doWithLock(client);
    46             }, "t2");
    47             Thread t3 = new Thread(() -> {
    48                 doWithLock(client);
    49             }, "t3");
    50             //启动线程
    51             t1.start();
    52             t2.start();
    53             t3.start(); 
    54         } catch (Exception e) {
    55             e.printStackTrace();
    56         }
    57     }
    58 
    59     /**
    60      * 
    61      * @Description 线程执行函数体
    62      * @param client
    63      * @param lock
    64      * @author diandian.zhang
    65      * @date 2017年7月12日下午6:00:53
    66      * @since JDK1.8
    67      */
    68     private static void doWithLock(CuratorFramework client) {
    69         //依赖ZK生成的可重入互斥公平锁(按照请求顺序)
    70         InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
    71         try {
    72             System.out.println("进入线程="+Thread.currentThread().getName()+":"+time.format(new Date()));
    73             
    74             //花20秒时间尝试获取锁
    75             if (lock.acquire(20, TimeUnit.SECONDS)) {
    76                 System.out.println(Thread.currentThread().getName() + " 获取锁成功!,执行需要加锁的任务"+time.format(new Date()));
    77                 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务
    78             //获取锁超时
    79             }else{
    80                 System.out.println(Thread.currentThread().getName() + " 获取锁超时!"+time.format(new Date()));
    81             }
    82         } catch (Exception e) {
    83             e.printStackTrace();
    84         } finally {
    85             try {
    86                 //当前线程获取到锁,那么最后需要释放锁(实际上是删除节点)
    87                 if (lock.isAcquiredInThisProcess()) {
    88                     lock.release();
    89                     System.out.println(Thread.currentThread().getName() + " 释放锁"+time.format(new Date()));
    90                 }
    91             } catch (Exception e) {
    92                 e.printStackTrace();
    93             }
    94         }
    95     }
    96 
    97 }

    执行结果:

    zk client start successfully!
    进入线程=t2:2017-07-13 11:13:23
    进入线程=t1:2017-07-13 11:13:23
    进入线程=t3:2017-07-13 11:13:23
    t2 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:23----》起始时间23秒
    t2 释放锁2017-07-13 11:13:25
    t3 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:25----》验证耗时2秒,T2执行完,T3执行
    t3 释放锁2017-07-13 11:13:27
    t1 获取锁成功!,执行需要加锁的任务2017-07-13 11:13:27----》验证耗时2秒,T3执行完,T1执行
    t1 释放锁2017-07-13 11:13:29----》验证耗时2秒,T1执行完,3个任务共耗时=29-23=6秒,验证互斥锁达到目标。

    查看zookeeper节点

    1.客户端连接

    zkCli.sh -server 192.168.50.253:2181

    2.查看节点

    [zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目录
    [dubbo, zktest, zookeeper, test]

    [zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我们创建的子节点
    [_c_034e5f23-abaf-4d4a-856f-c27956db574e-lock-0000000007, _c_63c708f1-2c3c-4e59-9d5b-f0c70c149758-lock-0000000006, _c_1f688cb7-c38c-4ebb-8909-0ba421e484a4-lock-0000000008]

    [zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任务执行完毕最终释放了子节点
    []

    4.总结比较

    一级锁分类

    二级锁分类

    锁名称

    特性

    是否推荐

    单JVM锁

    基于JVM源生synchronized关键字实现

    synchronized同步锁

     适用于低并发的情况,性能稳定 新手推荐
    基于JDK实现,需显示获取锁,释放锁

    ReentrantLock可重入锁

     适用于低、高并发的情况,性能较高  需要指定公平、非公平或condition时使用。

    ReentrantReadWriteLock

    可重入读写锁

     适用于读多写少的情况。性能高。  老司机推荐

    StampedLock戳锁

     JDK8才有,适用于高并发且读远大于写时,支持乐观读,票据校验失败后可升级悲观读锁,性能极高  老司机推荐

    分布式锁

    基于数据库锁实现

    悲观锁:select for update

     sql直接使用,但水很深。设计数据库ACID原理+隔离级别+不同数据库规范
     高端老司机推荐

    乐观锁:版本控制

     自己实现字段版本控制  新手推荐

    基于缓存实现

    org.redisson

     性能极高,支持除了分布式锁外还实现了分布式对象、分布式集合等极端强大的功能  老司机推荐

    基于zookeeper实现

    org.apache.curator zookeeper

     性能较高,除支持分布式锁外,还实现了master选举、节点监听()、分布式队列、Barrier、AtomicLong等计数器  老司机推荐

    =====附Redis命令=======

    1. SETNX key value (SET if Not eXists):当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。详见:SETNX commond
    2. GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。详见:GETSET commond
    3. GET key:返回 key 所关联的字符串值,如果 key 不存在那么返回 nil 。详见:GET Commond
    4. DEL key [KEY …]:删除给定的一个或多个 key ,不存在的 key 会被忽略,返回实际删除的key的个数(integer)。详见:DEL Commond
    5. HSET key field value:给一个key 设置一个{field=value}的组合值,如果key没有就直接赋值并返回1,如果field已有,那么就更新value的值,并返回0.详见:HSET Commond
    6. HEXISTS key field:当key 中存储着field的时候返回1,如果key或者field至少有一个不存在返回0。详见HEXISTS Commond
    7. HINCRBY key field increment:将存储在 key 中的哈希(Hash)对象中的指定字段 field 的值加上增量 increment。如果键 key 不存在,一个保存了哈希对象的新建将被创建。如果字段 field 不存在,在进行当前操作前,其将被创建,且对应的值被置为 0。返回值是增量之后的值。详见:HINCRBY Commond
    8. PEXPIRE key milliseconds:设置存活时间,单位是毫秒。expire操作单位是秒。详见:PEXPIRE Commond
    9. PUBLISH channel message:向channel post一个message内容的消息,返回接收消息的客户端数。详见PUBLISH Commond

    ======参考======

    分布式锁的几种实现方式~

     基于Redis实现分布式锁,Redisson使用及源码分析

  • 相关阅读:
    Promis.then()
    原生JS简单封装JSONP跨域获取数据
    原生JavaScript手写Ajax
    VS Code保存代码自动按eslint格式fix
    html data-xx 及 data()注意事项
    C#委托和事件
    vue devServer proxy 代理无效的问题
    vue .sync的使用
    js中,0的判断
    使用idea启动node项目的问题
  • 原文地址:https://www.cnblogs.com/dennyzhangdd/p/7133653.html
Copyright © 2011-2022 走看看