zoukankan      html  css  js  c++  java
  • ZooKeeper(七)-- ZK原生API实现分布式锁

     一、使用场景

      在分布式应用,往往存在多个进程提供同一服务。这些进程有可能在相同的机器上,也有可能分布在不同的机器上。 如果这些进程共享了一些资源,可能就需要分布式锁来锁定对这些资源的访问。

    二、实现分布式锁结构图

    三、代码实现

      1 package com.xbq.zookeeper.javaApi;
      2 
      3 import java.util.Collections;
      4 import java.util.List;
      5 import java.util.concurrent.CountDownLatch;
      6 import java.util.concurrent.ExecutorService;
      7 import java.util.concurrent.Executors;
      8 import java.util.concurrent.Semaphore;
      9 import java.util.concurrent.TimeUnit;
     10 
     11 import org.apache.zookeeper.CreateMode;
     12 import org.apache.zookeeper.KeeperException;
     13 import org.apache.zookeeper.WatchedEvent;
     14 import org.apache.zookeeper.Watcher;
     15 import org.apache.zookeeper.ZooDefs.Ids;
     16 import org.apache.zookeeper.ZooKeeper;
     17 import org.apache.zookeeper.data.Stat;
     18 
     19 /**
     20  * 使用Zookeeper原生API实现分布式锁
     21  * @author xbq
     22  */
     23 public class ZookeeperLock implements Watcher{
     24 
     25     // 声明zk对象
     26     private ZooKeeper zk = null;
     27     // 此demo使用的集群,所以有多个ip和端口
     28     private static String CONNECT_SERVER = "192.168.242.130:2181,192.168.242.130:2182,192.168.242.130:2183";
     29     // session过期时间
     30     private static int SESSION_TIMEOUT = 3000;
     31     // 根节点
     32     private String root = "/locks";
     33     // 当前等待的节点
     34     private String waitNode;
     35     // 等待的时间
     36     private int waitTime;
     37     // 锁节点
     38     private String myzkNode;
     39     // 计数器
     40     private CountDownLatch latch;
     41     
     42     /**
     43      * 构造函数 初始化
     44      */
     45     public ZookeeperLock(){
     46         try {
     47             zk = new ZooKeeper(CONNECT_SERVER, SESSION_TIMEOUT, this);
     48             // 判断是否存在根节点,不需要监听根节点
     49             Stat stat = zk.exists(root, false);
     50             // 为空,说明不存在
     51             if(stat == null){
     52                 // 添加一个永久节点,即添加一个根节点
     53                 zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     54             }
     55         } catch (Exception e) {
     56             e.printStackTrace();
     57         }
     58     }
     59     
     60     /**
     61      * 尝试获取锁
     62      * @return
     63      */
     64     private boolean tryLock(){
     65         String splitStr = "lock_";  // 格式 lock_000000001
     66         try {
     67             // 创建一个临时有序节点,并赋值给 myzkNode
     68             myzkNode = zk.create(root + "/" + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     69             // 获取根节点下的所有子节点
     70             List<String> children = zk.getChildren(root, false);
     71             // 对子节点 排序
     72             Collections.sort(children);
     73             // 如果刚刚创建的节点 等于  获取最小的一个子节点,则说明 获取到锁
     74             if(myzkNode.equals(root + "/" + children.get(0))){
     75                 return true;
     76             }
     77             // 如果刚刚创建的节点 不等于 获取到的最小的一个子节点,则 监控比自己小的一个节点
     78             // 获取刚刚建立的子节点(不包含根节点的子节点)
     79             String childNode = myzkNode.substring(myzkNode.lastIndexOf("/") + 1);
     80             // 获取比自己小的节点
     81             waitNode = children.get(Collections.binarySearch(children, childNode) - 1);
     82         } catch (KeeperException e) {
     83             e.printStackTrace();
     84         } catch (InterruptedException e) {
     85             e.printStackTrace();
     86         }
     87         return false;
     88     }
     89     
     90     /**
     91      * 等待锁释放
     92      * @param waitNode
     93      * @param waidTime
     94      * @return
     95      * @throws KeeperException
     96      * @throws InterruptedException
     97      */
     98     private boolean waitLock(String waitNode, int waidTime) throws KeeperException, InterruptedException{
     99         // 判断比自己小的节点是否存在,并添加监听
    100         Stat stat = zk.exists(root + "/" + waitNode, true);
    101         // 如果存在 比自己小的节点
    102         if(stat != null){
    103             this.latch = new CountDownLatch(1);
    104             this.latch.await(waidTime, TimeUnit.MILLISECONDS);
    105             this.latch = null;
    106         }
    107         return true;
    108     }
    109     
    110     /**
    111      * 获取锁
    112      */
    113     public void lock(){
    114         // 如果尝试获取锁成功
    115         if(tryLock()){
    116             // 获取 成功
    117             System.out.println("Thread" + Thread.currentThread().getName() + " -- hold lock!");
    118             return;
    119         }
    120         // 等待并获取锁
    121         try {
    122             waitLock(waitNode, waitTime);
    123         } catch (KeeperException e) {
    124             e.printStackTrace();
    125         } catch (InterruptedException e) {
    126             e.printStackTrace();
    127         }
    128     }
    129     
    130     /**
    131      * 解锁
    132      */
    133     public void unLock(){
    134         try {
    135             zk.delete(myzkNode, -1);
    136             zk.close();
    137             System.out.println("Thread" + Thread.currentThread().getName() +" unlock success! 锁节点:" + myzkNode);
    138         } catch (InterruptedException e) {
    139             e.printStackTrace();
    140         } catch (KeeperException e) {
    141             e.printStackTrace();
    142         }
    143     }
    144 
    145     /**
    146      * 删除的时候 触发事件
    147      */
    148     @Override
    149     public void process(WatchedEvent event) {
    150         // 如果计数器不为空的话,释放计数器锁
    151         if(this.latch != null){
    152             this.latch.countDown();
    153         }
    154     }
    155     
    156     /**
    157      * 测试
    158      * @param args
    159      */
    160     public static void main(String[] args) {
    161         // 定义线程池
    162         ExecutorService service = Executors.newCachedThreadPool();
    163         // 只能10个线程同时运行,即模拟并发数为10
    164         final Semaphore semaphore = new Semaphore(10);
    165         // 10个客户端连接
    166         for(int i=0;i<10;i++){
    167             Runnable runnable = new Runnable() {
    168                 @Override
    169                 public void run() {
    170                     try {
    171                         semaphore.acquire();
    172                         ZookeeperLock zkLock = new ZookeeperLock();
    173                         zkLock.lock();
    174                         // 业务操作代码
    175                         Thread.sleep(3000);
    176                         zkLock.unLock();
    177                         semaphore.release();
    178                     } catch (Exception e) {
    179                         e.printStackTrace();
    180                     }
    181                 }
    182             };
    183             service.execute(runnable);
    184         }
    185         service.shutdown();
    186     }
    187 }

    源码下载:https://gitee.com/xbq168/DistributedLockByRedis

  • 相关阅读:
    我的博客的定制代码
    在网站中使用Bing Translator插件翻译文章。
    java 爬虫 WebMagic(四)-Scheduler
    java 爬虫 WebMagic(三)-PipeLine
    java 爬虫 WebMagic(二)-PageProcessor
    java 爬虫 WebMagic(一)-Spider
    java 解析json 万能型
    c# 解析Json 万能型
    常用的Linux 命令
    将文件夹和文件提交到git
  • 原文地址:https://www.cnblogs.com/xbq8080/p/6770077.html
Copyright © 2011-2022 走看看