zoukankan      html  css  js  c++  java
  • 利用Zookeeper实现分布式锁

    特别提示:本人博客部分有参考网络其他博客,但均是本人亲手编写过并验证通过。如发现博客有错误,请及时提出以免误导其他人,谢谢!欢迎转载,但记得标明文章出处:http://www.cnblogs.com/mao2080/

    1、分布式锁是什么?

    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

    2、为什么需要分布式锁?

    在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

    3、乐观锁和悲观锁含义

    悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

    乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

    两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

    本段参考了:http://blog.csdn.net/hongchangfirst/article/details/26004335

    4、具体实现

    本文采用zookeeper第三方库curator实现分布式锁。

    1、添加依赖

     1     <!-- Curator对Zookeeper封装 -->
     2         <dependency>
     3             <groupId>org.apache.curator</groupId>
     4             <artifactId>curator-recipes</artifactId>
     5             <version>2.9.1</version>
     6         </dependency>
     7 
     8         <!-- Curator对Zookeeper封装 -->
     9         <dependency>
    10             <groupId>org.apache.curator</groupId>
    11             <artifactId>curator-client</artifactId>
    12             <version>2.9.1</version>
    13         </dependency>
    14         
    15         <!-- Curator对Zookeeper封装 -->
    16         <dependency>
    17             <groupId>org.apache.curator</groupId>
    18             <artifactId>curator-framework</artifactId>
    19             <version>2.9.1</version>
    20         </dependency>

    2、锁的实现

      1 package com.mao;
      2 import org.apache.curator.RetryPolicy;
      3 import org.apache.curator.framework.CuratorFramework;
      4 import org.apache.curator.framework.CuratorFrameworkFactory;
      5 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
      6 import org.apache.curator.retry.ExponentialBackoffRetry;
      7 
      8 /**
      9  * 
     10  * 项目名称:---
     11  * 模块名称:公共服务-锁工具
     12  * 功能描述:锁工具类(基于Zookeeper)
     13  * 创建人: mao2080@sina.com
     14  * 创建时间:2017年4月20日 下午4:35:56
     15  * 修改人: mao2080@sina.com
     16  * 修改时间:2017年4月20日 下午4:35:56
     17  */
     18 public class ZKLockUtil {
     19     
     20     /**Zookeeper注册中心地址*/
     21     private static final String ZOOKEEPER_SERVER;
     22     
     23     /**获取锁-每次尝试间隔(单位:毫秒)*/
     24     private static final int BASE_SLEEP_TIME = 1000;
     25     
     26     /**获取锁-最大尝试次数*/
     27     private static final int MAX_RETRIES = 5;
     28     
     29     /**Curator锁对象*/
     30     private InterProcessMutex lock;
     31     
     32     /**Curator客户端*/
     33     private CuratorFramework client;
     34     
     35     /**分布式锁根节点*/
     36     private static final String ZK_BASE_LOCK_PATH = "/Locks/";
     37     
     38     /**
     39      * 初始化Zookeeper注册中心地址
     40      */
     41     static {
     42         ZOOKEEPER_SERVER = "172.24.20.214";
     43     }
     44     /**
     45      * 
     46      * 描述:构造函数
     47      * @author mao2080@sina.com
     48      * @created 2017年4月20日 下午3:52:53
     49      * @since 
     50      * @param businessType 业务类型
     51      * @param baseSleepTimeMs 获取锁-每次尝试间隔(单位:毫秒)
     52      * @param maxRetries 获取锁-最大尝试次数
     53      * @throws BusinessException 
     54      */
     55     private void initService(BusinessType businessType, int baseSleepTimeMs, int maxRetries) throws Exception {
     56         if(isBlank(ZKLockUtil.ZOOKEEPER_SERVER)){
     57             throw new Exception();
     58         }
     59         if(businessType == null || isBlank(businessType.getCode())){
     60             throw new Exception();
     61         }
     62         try {
     63             RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
     64             this.client = CuratorFrameworkFactory.newClient(ZKLockUtil.ZOOKEEPER_SERVER, retryPolicy);
     65             this.client.start();
     66             this.setLock(new InterProcessMutex(this.client, ZK_BASE_LOCK_PATH.concat(businessType.getCode())));
     67         } catch (Exception e) {
     68             throw new Exception();
     69         }
     70     }
     71     
     72     /**
     73      * 
     74      * 描述:构造函数
     75      * @author mao2080@sina.com
     76      * @created 2017年4月20日 下午3:52:44
     77      * @since 
     78      * @param businessType 业务类型
     79      * @throws BusinessException
     80      */
     81     public ZKLockUtil(BusinessType businessType) throws Exception {
     82         this.initService(businessType, ZKLockUtil.BASE_SLEEP_TIME, ZKLockUtil.MAX_RETRIES);
     83     }
     84     
     85     /**
     86      * 
     87      * 描述:构造函数
     88      * @author mao2080@sina.com
     89      * @created 2017年4月20日 下午3:52:53
     90      * @since 
     91      * @param businessType 业务类型
     92      * @param baseSleepTimeMs 获取锁-每次尝试间隔(单位:毫秒)
     93      * @param maxRetries 获取锁-最大尝试次数
     94      * @throws BusinessException 
     95      */
     96     public ZKLockUtil(BusinessType businessType, int baseSleepTimeMs, int maxRetries) throws Exception {
     97         this.initService(businessType, baseSleepTimeMs, maxRetries);
     98     }
     99     
    100     /**
    101      * 
    102      * 描述:释放资源
    103      * @author mao2080@sina.com
    104      * @created 2017年4月20日 下午3:58:03
    105      * @since
    106      */
    107     public void close(){
    108         try {
    109             this.getLock().release();
    110         } catch (Exception e) {
    111             e.printStackTrace();
    112         }
    113         try {
    114             this.client.close();
    115         } catch (Exception e) {
    116              e.printStackTrace();
    117         }
    118     }
    119     
    120     /**
    121      * 
    122      * 描述:判断对象是否为空
    123      * @author mao2080@sina.com
    124      * @created 2017年3月20日 上午11:33:55
    125      * @since 
    126      * @param obj
    127      * @return
    128      */
    129     public static boolean isBlank(Object obj){
    130         if(obj == null || "".equals(obj.toString())){
    131             return true;
    132         }
    133         return false;
    134     }
    135     
    136     public InterProcessMutex getLock() {
    137         return lock;
    138     }
    139     
    140     public void setLock(InterProcessMutex lock) {
    141         this.lock = lock;
    142     }
    143     
    144     /**
    145      * 
    146      * 项目名称:---
    147      * 模块名称:公共服务-锁工具
    148      * 功能描述:业务类型
    149      * 创建人: mao2080@sina.com
    150      * 创建时间:2017年4月22日 下午12:27:04
    151      * 修改人: mao2080@sina.com
    152      * 修改时间:2017年4月22日 下午12:27:04
    153      */
    154     public enum BusinessType {
    155         
    156         Demo("0001", "Demo"),
    157         
    158         ;
    159         
    160         /**业务类型编码 */
    161         private String code;
    162         
    163         /**业务类型名称 */
    164         private String name;
    165         
    166         /**
    167          * 
    168          * 描述:构建业务类型
    169          * @author mao2080@sina.com
    170          * @created 2017年4月10日 下午3:42:57
    171          * @since 
    172          * @param code 业务类型编码
    173          * @param name 业务类型名称
    174          * @return
    175          */
    176         private BusinessType(String code, String name) {
    177             this.code = code;
    178             this.name = name;
    179         }
    180 
    181         public String getCode() {
    182             return code;
    183         }
    184 
    185         public void setCode(String code) {
    186             this.code = code;
    187         }
    188         
    189         public String getName() {
    190             return name;
    191         }
    192 
    193         public void setName(String name) {
    194             this.name = name;
    195         }
    196         
    197         public static void main(String[] args) {
    198             System.out.println(BusinessType.Demo.code);
    199         }
    200 
    201     }
    202 
    203 }

    3、使用场景

      1 package com.mao;
      2 
      3 import java.util.concurrent.CountDownLatch;
      4 import java.util.concurrent.ExecutorService;
      5 import java.util.concurrent.Executors;
      6 import java.util.concurrent.TimeUnit;
      7 
      8 import com.mao.ZKLockUtil.BusinessType;
      9 
     10 /**
     11  * 
     12  * 项目名称:---
     13  * 模块名称:公共服务-锁工具 
     14  * 功能描述:锁工具类 
     15  * 创建人: mao2080@sina.com 
     16  * 创建时间:2017年4月20日下午4:35:56 
     17  * 修改人: mao2080@sina.com 
     18  * 修改时间:2017年4月20日 下午4:35:56
     19  */
     20 public class ZKLockUtilTest {
     21 
     22     /**
     23      * 
     24      * 描述:具体使用样例
     25      * 
     26      * @author mao2080@sina.com
     27      * @created 2017年4月22日 下午12:29:38
     28      * @since
     29      * @param args
     30      * @throws Exception
     31      */
     32     public static void main1(String[] args) throws Exception {
     33         ZKLockUtil lock = new ZKLockUtil(BusinessType.Demo);
     34         try {
     35             if (lock.getLock().acquire(40, TimeUnit.SECONDS)) {
     36                 System.out.println("get lock success...do work");
     37                 Thread.sleep(20000);
     38             }
     39         } catch (Exception e) {
     40             System.out.println("get lock  fail...," + e.getMessage());
     41             e.printStackTrace();
     42         } finally {
     43             lock.close();
     44         }
     45     }
     46 
     47     /**
     48      * 
     49      * 描述:启动线程模拟并发访问
     50      * 
     51      * @author mao2080@sina.com
     52      * @created 2017年4月22日 下午12:29:13
     53      * @since
     54      * @param args
     55      * @throws InterruptedException
     56      */
     57     public static void main(String[] args) throws InterruptedException {
     58         CountDownLatch latch = new CountDownLatch(5);
     59         ExecutorService exec = Executors.newCachedThreadPool();
     60         for (int i = 0; i < 5; i++) {
     61             exec.submit(new MyLock("client" + i, latch));
     62         }
     63         exec.shutdown();
     64         latch.await();
     65         System.out.println("所有任务执行完毕");
     66     }
     67 
     68     static class MyLock implements Runnable {
     69         
     70         private String name;
     71         
     72         private CountDownLatch latch;
     73 
     74         public MyLock(String name, CountDownLatch latch) {
     75             this.name = name;
     76             this.latch = latch;
     77         }
     78 
     79         public String getName() {
     80             return name;
     81         }
     82 
     83         public void setName(String name) {
     84             this.name = name;
     85         }
     86 
     87         public void run() {
     88             ZKLockUtil locks = null;
     89             try {
     90                 locks = new ZKLockUtil(BusinessType.Demo);
     91             } catch (Exception e1) {
     92                 e1.printStackTrace();
     93                 return;
     94             }
     95             try {
     96                 if (locks.getLock().acquire(40, TimeUnit.SECONDS)) {
     97                     System.out.println("----------" + this.name+ "获得资源----------");
     98                     System.out.println("----------" + this.name+ "正在处理资源----------");
     99                     Thread.sleep(1 * 2000);
    100                     System.out.println("----------" + this.name+ "资源使用完毕----------");
    101                     latch.countDown();
    102                 }
    103             } catch (Exception e) {
    104                 System.out.println("get lock  fail...," + e.getMessage());
    105                 e.printStackTrace();
    106             } finally {
    107                 locks.close();
    108             }
    109         }
    110     }
    111 }

    4、运行结果

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    ----------client1获得资源----------
    ----------client1正在处理资源----------
    ----------client1资源使用完毕----------
    ----------client4获得资源----------
    ----------client4正在处理资源----------
    ----------client4资源使用完毕----------
    ----------client2获得资源----------
    ----------client2正在处理资源----------
    ----------client2资源使用完毕----------
    ----------client0获得资源----------
    ----------client0正在处理资源----------
    ----------client0资源使用完毕----------
    ----------client3获得资源----------
    ----------client3正在处理资源----------
    ----------client3资源使用完毕----------
    所有任务执行完毕

    5、参考博客

    1、http://blog.csdn.net/wuzhilon88/article/details/41121195
    2、http://www.cnblogs.com/LiZhiW/p/4931577.html

    6本文demo下载

  • 相关阅读:
    oracle proc 插入操作性能优化实践
    vmware 虚拟机共享文件夹无法显示问题解决
    oracle启动报错:ORA-03113
    c语言中sprintf()函数中的%使用
    c 的内存分配
    c实现队列
    c实现循环链表
    MantisBT导出Excel文件名显示中文的修改方法
    怎样通过Qt编写C/C++代码查询当前Linux的版本号?
    Kotlin Android Extensions: 与 findViewById 说再见 (KAD 04) -- 更新版
  • 原文地址:https://www.cnblogs.com/mao2080/p/6745025.html
Copyright © 2011-2022 走看看