- 创建会话连接
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 package com.karat.cn.zookeeperAchieveLock.javaapilock; 2 3 import org.apache.zookeeper.WatchedEvent; 4 import org.apache.zookeeper.Watcher; 5 import org.apache.zookeeper.ZooKeeper; 6 7 import java.io.IOException; 8 import java.util.concurrent.CountDownLatch; 9 10 /** 11 * 创建会话 12 */ 13 public class ZookeeperClient { 14 15 private final static String CONNECTSTRING="47.107.121.215:2181"; 16 17 private static int sessionTimeout=5000; 18 19 //获取连接 20 public static ZooKeeper getInstance() throws IOException, InterruptedException { 21 final CountDownLatch conectStatus=new CountDownLatch(1); 22 ZooKeeper zooKeeper=new ZooKeeper(CONNECTSTRING, sessionTimeout, new Watcher() { 23 public void process(WatchedEvent event) { 24 if(event.getState()== Event.KeeperState.SyncConnected){//连接成功状态 25 conectStatus.countDown(); 26 } 27 } 28 }); 29 conectStatus.await();//等待 30 return zooKeeper; 31 } 32 33 public static int getSessionTimeout() { 34 return sessionTimeout; 35 } 36 }
- 临时节点删除监控
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 package com.karat.cn.zookeeperAchieveLock.javaapilock; 2 3 import org.apache.zookeeper.WatchedEvent; 4 import org.apache.zookeeper.Watcher; 5 6 import java.util.concurrent.CountDownLatch; 7 8 /** 9 *监控 10 */ 11 public class LockWatcher implements Watcher{ 12 13 private CountDownLatch latch; 14 15 public LockWatcher(CountDownLatch latch) { 16 this.latch = latch; 17 } 18 19 public void process(WatchedEvent event) { 20 if(event.getType()== Event.EventType.NodeDeleted){//当前节点是否删除 21 latch.countDown(); 22 } 23 } 24 }
- 上锁与释放锁
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 package com.karat.cn.zookeeperAchieveLock.javaapilock; 2 3 import org.apache.zookeeper.CreateMode; 4 import org.apache.zookeeper.KeeperException; 5 import org.apache.zookeeper.ZooDefs; 6 import org.apache.zookeeper.ZooKeeper; 7 8 import java.io.IOException; 9 import java.util.List; 10 import java.util.Random; 11 import java.util.SortedSet; 12 import java.util.TreeSet; 13 import java.util.concurrent.CountDownLatch; 14 import java.util.concurrent.TimeUnit; 15 16 /** 17 * 分布式锁的实现 18 */ 19 public class DistributeLock { 20 21 22 private static final String ROOT_LOCKS="/LOCKS";//根节点 23 24 private ZooKeeper zooKeeper;//zooKeeper实列 25 26 private int sessionTimeout; //会话超时时间 27 28 private String lockID; //记录锁节点id 29 30 private final static byte[] data={1,2}; //节点的数据 31 32 private CountDownLatch countDownLatch=new CountDownLatch(1);//计数器 33 34 //会话连接 35 public DistributeLock() throws IOException, InterruptedException { 36 this.zooKeeper=ZookeeperClient.getInstance(); 37 this.sessionTimeout=ZookeeperClient.getSessionTimeout(); 38 } 39 40 //获取锁的方法 41 public boolean lock(){ 42 try { 43 //创建一个临时有序节点 44 lockID=zooKeeper.create(ROOT_LOCKS+"/",data, ZooDefs.Ids. 45 OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 46 System.out.println(Thread.currentThread().getName() 47 +"->成功创建了lock节点["+lockID+"], 开始去竞争锁"); 48 //获取当前根节点下的所有子节点 49 List<String> childrenNodes=zooKeeper.getChildren(ROOT_LOCKS,true);//获取根节点下的所有子节点 50 //排序,从小到大(树节点) 51 SortedSet<String> sortedSet=new TreeSet<String>(); 52 for(String children:childrenNodes){ 53 sortedSet.add(ROOT_LOCKS+"/"+children); 54 } 55 String first=sortedSet.first(); //拿到最小的节点 56 if(lockID.equals(first)){//如果刚创建的临时节点就是最小节点,那么就没有其它子节点,当前新建节点获取锁成功 57 //表示当前就是最小的节点 58 System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockID+"]"); 59 return true; 60 } 61 //当当前创建的临时节点不是最小节点时,说明之前已有创建的临时节点,之前临时节点正在使用锁,等待锁释放 62 SortedSet<String> lessThanLockId=sortedSet.headSet(lockID); 63 if(!lessThanLockId.isEmpty()){ 64 String prevLockID=lessThanLockId.last();//拿到比当前LOCKID这个几点更小的上一个节点 65 zooKeeper.exists(prevLockID,new LockWatcher(countDownLatch));//监控是否有删除节点的操作(释放锁) 66 countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);//等待锁释放(会话超时时间) 67 //上面这段代码意味着如果会话超时或者节点被删除(释放)了 68 System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockID+"]"); 69 } 70 return true; 71 } catch (KeeperException e) { 72 e.printStackTrace(); 73 } catch (InterruptedException e) { 74 e.printStackTrace(); 75 } 76 return false; 77 } 78 //释放锁 79 public boolean unlock(){ 80 System.out.println(Thread.currentThread().getName() 81 +"->开始释放锁:["+lockID+"]"); 82 try { 83 zooKeeper.delete(lockID,-1);//删除当前节点(释放锁) 84 System.out.println("节点["+lockID+"]成功被删除"); 85 return true; 86 } catch (InterruptedException e) { 87 e.printStackTrace(); 88 } catch (KeeperException e) { 89 e.printStackTrace(); 90 } 91 return false; 92 } 93 94 95 public static void main(String[] args) { 96 final CountDownLatch latch=new CountDownLatch(10); 97 Random random=new Random(); 98 for(int i=0;i<10;i++){ 99 new Thread(()->{ 100 DistributeLock lock=null; 101 try { 102 lock=new DistributeLock();//会话连接 103 latch.countDown();//减一 104 latch.await();//等待 105 lock.lock();//获取锁 106 Thread.sleep(random.nextInt(500));//睡眠 107 } catch (IOException e) { 108 e.printStackTrace(); 109 } catch (InterruptedException e) { 110 e.printStackTrace(); 111 }finally { 112 if(lock!=null){ 113 lock.unlock();//释放锁 114 } 115 } 116 }).start();//启动线程 117 } 118 } 119 }
使用zookeeper实现分布式锁中:当所有请求(线程)去竞争某一资源时,通过创建的节点是否是最小节点来实现锁功能,如果当前线程创建的是最小节点(说明自己是第一个创建的节点),那么代表自己获取了锁,如果当前线程创建的不是最小节点,那么通过监听去了解上一级比自己小的节点是否有删除动作(锁释放),如果监听到上一级比自己小的节点有删除节点操作,那么自己就能够获取到锁。通过api方式实现能够在demo中跑起来,但是用ab并发测试,会报错,可以使用zookeeper客户端Curator中的InterProcessMutex来实现锁功能,并发测试不报错。