zoukankan      html  css  js  c++  java
  • zookeeper:分布式锁简单实现(JavaApi)

    • 创建会话连接
     1 package com.karat.cn.zookeeperAchieveLock.javaapilock;
     2 
     3 import org.apache.zookeeper.WatchedEvent;
     4 import org.apache.zookeeper.Watcher;
     5 import org.apache.zookeeper.ZooKeeper;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.CountDownLatch;
     9 
    10 /**
    11  * 创建会话
    12  */
    13 public class ZookeeperClient {
    14 
    15     private final static String CONNECTSTRING="47.107.121.215:2181";
    16 
    17     private static int sessionTimeout=5000;
    18 
    19     //获取连接
    20     public static ZooKeeper getInstance() throws IOException, InterruptedException {
    21         final CountDownLatch conectStatus=new CountDownLatch(1);
    22         ZooKeeper zooKeeper=new ZooKeeper(CONNECTSTRING, sessionTimeout, new Watcher() {
    23             public void process(WatchedEvent event) {
    24                 if(event.getState()== Event.KeeperState.SyncConnected){//连接成功状态
    25                     conectStatus.countDown();
    26                 }
    27             }
    28         });
    29         conectStatus.await();//等待
    30         return zooKeeper;
    31     }
    32 
    33     public static int getSessionTimeout() {
    34         return sessionTimeout;
    35     }
    36 }
    View Code
    • 临时节点删除监控
     1 package com.karat.cn.zookeeperAchieveLock.javaapilock;
     2 
     3 import org.apache.zookeeper.WatchedEvent;
     4 import org.apache.zookeeper.Watcher;
     5 
     6 import java.util.concurrent.CountDownLatch;
     7 
     8 /**
     9  *监控
    10  */
    11 public class LockWatcher implements Watcher{
    12 
    13     private CountDownLatch latch;
    14 
    15     public LockWatcher(CountDownLatch latch) {
    16         this.latch = latch;
    17     }
    18 
    19     public void process(WatchedEvent event) {
    20         if(event.getType()== Event.EventType.NodeDeleted){//当前节点是否删除
    21             latch.countDown();
    22         }
    23     }
    24 }
    View Code
    • 上锁与释放锁
      1 package com.karat.cn.zookeeperAchieveLock.javaapilock;
      2 
      3 import org.apache.zookeeper.CreateMode;
      4 import org.apache.zookeeper.KeeperException;
      5 import org.apache.zookeeper.ZooDefs;
      6 import org.apache.zookeeper.ZooKeeper;
      7 
      8 import java.io.IOException;
      9 import java.util.List;
     10 import java.util.Random;
     11 import java.util.SortedSet;
     12 import java.util.TreeSet;
     13 import java.util.concurrent.CountDownLatch;
     14 import java.util.concurrent.TimeUnit;
     15 
     16 /**
     17  * 分布式锁的实现
     18  */
     19 public class DistributeLock {
     20 
     21 
     22     private static final String ROOT_LOCKS="/LOCKS";//根节点
     23 
     24     private ZooKeeper zooKeeper;//zooKeeper实列
     25 
     26     private int sessionTimeout; //会话超时时间
     27 
     28     private String lockID; //记录锁节点id
     29 
     30     private final static byte[] data={1,2}; //节点的数据
     31 
     32     private CountDownLatch countDownLatch=new CountDownLatch(1);//计数器
     33     
     34     //会话连接
     35     public DistributeLock() throws IOException, InterruptedException {
     36         this.zooKeeper=ZookeeperClient.getInstance();
     37         this.sessionTimeout=ZookeeperClient.getSessionTimeout();
     38     }
     39 
     40     //获取锁的方法
     41     public boolean lock(){
     42         try {
     43             //创建一个临时有序节点
     44             lockID=zooKeeper.create(ROOT_LOCKS+"/",data, ZooDefs.Ids.
     45                     OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     46             System.out.println(Thread.currentThread().getName()
     47                     +"->成功创建了lock节点["+lockID+"], 开始去竞争锁");
     48             //获取当前根节点下的所有子节点
     49             List<String> childrenNodes=zooKeeper.getChildren(ROOT_LOCKS,true);//获取根节点下的所有子节点
     50             //排序,从小到大(树节点)
     51             SortedSet<String> sortedSet=new TreeSet<String>();
     52             for(String children:childrenNodes){
     53                 sortedSet.add(ROOT_LOCKS+"/"+children);
     54             }
     55             String first=sortedSet.first(); //拿到最小的节点
     56             if(lockID.equals(first)){//如果刚创建的临时节点就是最小节点,那么就没有其它子节点,当前新建节点获取锁成功
     57                 //表示当前就是最小的节点
     58                 System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockID+"]");
     59                 return true;
     60             }
     61             //当当前创建的临时节点不是最小节点时,说明之前已有创建的临时节点,之前临时节点正在使用锁,等待锁释放
     62             SortedSet<String> lessThanLockId=sortedSet.headSet(lockID);
     63             if(!lessThanLockId.isEmpty()){
     64                 String prevLockID=lessThanLockId.last();//拿到比当前LOCKID这个几点更小的上一个节点
     65                 zooKeeper.exists(prevLockID,new LockWatcher(countDownLatch));//监控是否有删除节点的操作(释放锁)
     66                 countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);//等待锁释放(会话超时时间)
     67                 //上面这段代码意味着如果会话超时或者节点被删除(释放)了
     68                 System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockID+"]");
     69             }
     70             return true;
     71         } catch (KeeperException e) {
     72             e.printStackTrace();
     73         } catch (InterruptedException e) {
     74             e.printStackTrace();
     75         }
     76         return false;
     77     }
     78     //释放锁
     79     public boolean unlock(){
     80         System.out.println(Thread.currentThread().getName()
     81                 +"->开始释放锁:["+lockID+"]");
     82         try {
     83             zooKeeper.delete(lockID,-1);//删除当前节点(释放锁)
     84             System.out.println("节点["+lockID+"]成功被删除");
     85             return true;
     86         } catch (InterruptedException e) {
     87             e.printStackTrace();
     88         } catch (KeeperException e) {
     89             e.printStackTrace();
     90         }
     91         return false;
     92     }
     93 
     94 
     95     public static void main(String[] args) {
     96         final CountDownLatch latch=new CountDownLatch(10);
     97         Random random=new Random();
     98         for(int i=0;i<10;i++){
     99             new Thread(()->{
    100                 DistributeLock lock=null;
    101                 try {
    102                     lock=new DistributeLock();//会话连接
    103                     latch.countDown();//减一
    104                     latch.await();//等待
    105                     lock.lock();//获取锁
    106                     Thread.sleep(random.nextInt(500));//睡眠
    107                 } catch (IOException e) {
    108                     e.printStackTrace();
    109                 } catch (InterruptedException e) {
    110                     e.printStackTrace();
    111                 }finally {
    112                     if(lock!=null){
    113                         lock.unlock();//释放锁
    114                     }
    115                 }
    116             }).start();//启动线程
    117         }
    118     }
    119 }
    View Code

    使用zookeeper实现分布式锁中:当所有请求(线程)去竞争某一资源时,通过创建的节点是否是最小节点来实现锁功能,如果当前线程创建的是最小节点(说明自己是第一个创建的节点),那么代表自己获取了锁,如果当前线程创建的不是最小节点,那么通过监听去了解上一级比自己小的节点是否有删除动作(锁释放),如果监听到上一级比自己小的节点有删除节点操作,那么自己就能够获取到锁。通过api方式实现能够在demo中跑起来,但是用ab并发测试,会报错,可以使用zookeeper客户端Curator中的InterProcessMutex来实现锁功能,并发测试不报错。

  • 相关阅读:
    Mysql查询一段时间记录
    exosip2 的简单使用
    oSIP协议栈浅析
    Oracle 11g New 与分区和存储相关的增强功能
    手动配置S2SH三大框架报错(四)
    手动配置S2SH三大框架报错(三)
    手动配置S2SH三大框架报错(二)
    手动配置S2SH三大框架报错(一)
    链表基本操作的实现
    SVN上传项目报错
  • 原文地址:https://www.cnblogs.com/LJing21/p/10547811.html
Copyright © 2011-2022 走看看