一、使用场景
在分布式应用,往往存在多个进程提供同一服务。这些进程有可能在相同的机器上,也有可能分布在不同的机器上。 如果这些进程共享了一些资源,可能就需要分布式锁来锁定对这些资源的访问。
二、实现分布式锁结构图
三、代码实现
1 package com.xbq.zookeeper.javaApi; 2 3 import java.util.Collections; 4 import java.util.List; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Semaphore; 9 import java.util.concurrent.TimeUnit; 10 11 import org.apache.zookeeper.CreateMode; 12 import org.apache.zookeeper.KeeperException; 13 import org.apache.zookeeper.WatchedEvent; 14 import org.apache.zookeeper.Watcher; 15 import org.apache.zookeeper.ZooDefs.Ids; 16 import org.apache.zookeeper.ZooKeeper; 17 import org.apache.zookeeper.data.Stat; 18 19 /** 20 * 使用Zookeeper原生API实现分布式锁 21 * @author xbq 22 */ 23 public class ZookeeperLock implements Watcher{ 24 25 // 声明zk对象 26 private ZooKeeper zk = null; 27 // 此demo使用的集群,所以有多个ip和端口 28 private static String CONNECT_SERVER = "192.168.242.130:2181,192.168.242.130:2182,192.168.242.130:2183"; 29 // session过期时间 30 private static int SESSION_TIMEOUT = 3000; 31 // 根节点 32 private String root = "/locks"; 33 // 当前等待的节点 34 private String waitNode; 35 // 等待的时间 36 private int waitTime; 37 // 锁节点 38 private String myzkNode; 39 // 计数器 40 private CountDownLatch latch; 41 42 /** 43 * 构造函数 初始化 44 */ 45 public ZookeeperLock(){ 46 try { 47 zk = new ZooKeeper(CONNECT_SERVER, SESSION_TIMEOUT, this); 48 // 判断是否存在根节点,不需要监听根节点 49 Stat stat = zk.exists(root, false); 50 // 为空,说明不存在 51 if(stat == null){ 52 // 添加一个永久节点,即添加一个根节点 53 zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 54 } 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } 58 } 59 60 /** 61 * 尝试获取锁 62 * @return 63 */ 64 private boolean tryLock(){ 65 String splitStr = "lock_"; // 格式 lock_000000001 66 try { 67 // 创建一个临时有序节点,并赋值给 myzkNode 68 myzkNode = zk.create(root + "/" + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 69 // 获取根节点下的所有子节点 70 List<String> children = zk.getChildren(root, false); 71 // 对子节点 排序 72 Collections.sort(children); 73 // 如果刚刚创建的节点 等于 获取最小的一个子节点,则说明 获取到锁 74 if(myzkNode.equals(root + "/" + children.get(0))){ 75 return true; 76 } 77 // 如果刚刚创建的节点 不等于 获取到的最小的一个子节点,则 监控比自己小的一个节点 78 // 获取刚刚建立的子节点(不包含根节点的子节点) 79 String childNode = myzkNode.substring(myzkNode.lastIndexOf("/") + 1); 80 // 获取比自己小的节点 81 waitNode = children.get(Collections.binarySearch(children, childNode) - 1); 82 } catch (KeeperException e) { 83 e.printStackTrace(); 84 } catch (InterruptedException e) { 85 e.printStackTrace(); 86 } 87 return false; 88 } 89 90 /** 91 * 等待锁释放 92 * @param waitNode 93 * @param waidTime 94 * @return 95 * @throws KeeperException 96 * @throws InterruptedException 97 */ 98 private boolean waitLock(String waitNode, int waidTime) throws KeeperException, InterruptedException{ 99 // 判断比自己小的节点是否存在,并添加监听 100 Stat stat = zk.exists(root + "/" + waitNode, true); 101 // 如果存在 比自己小的节点 102 if(stat != null){ 103 this.latch = new CountDownLatch(1); 104 this.latch.await(waidTime, TimeUnit.MILLISECONDS); 105 this.latch = null; 106 } 107 return true; 108 } 109 110 /** 111 * 获取锁 112 */ 113 public void lock(){ 114 // 如果尝试获取锁成功 115 if(tryLock()){ 116 // 获取 成功 117 System.out.println("Thread" + Thread.currentThread().getName() + " -- hold lock!"); 118 return; 119 } 120 // 等待并获取锁 121 try { 122 waitLock(waitNode, waitTime); 123 } catch (KeeperException e) { 124 e.printStackTrace(); 125 } catch (InterruptedException e) { 126 e.printStackTrace(); 127 } 128 } 129 130 /** 131 * 解锁 132 */ 133 public void unLock(){ 134 try { 135 zk.delete(myzkNode, -1); 136 zk.close(); 137 System.out.println("Thread" + Thread.currentThread().getName() +" unlock success! 锁节点:" + myzkNode); 138 } catch (InterruptedException e) { 139 e.printStackTrace(); 140 } catch (KeeperException e) { 141 e.printStackTrace(); 142 } 143 } 144 145 /** 146 * 删除的时候 触发事件 147 */ 148 @Override 149 public void process(WatchedEvent event) { 150 // 如果计数器不为空的话,释放计数器锁 151 if(this.latch != null){ 152 this.latch.countDown(); 153 } 154 } 155 156 /** 157 * 测试 158 * @param args 159 */ 160 public static void main(String[] args) { 161 // 定义线程池 162 ExecutorService service = Executors.newCachedThreadPool(); 163 // 只能10个线程同时运行,即模拟并发数为10 164 final Semaphore semaphore = new Semaphore(10); 165 // 10个客户端连接 166 for(int i=0;i<10;i++){ 167 Runnable runnable = new Runnable() { 168 @Override 169 public void run() { 170 try { 171 semaphore.acquire(); 172 ZookeeperLock zkLock = new ZookeeperLock(); 173 zkLock.lock(); 174 // 业务操作代码 175 Thread.sleep(3000); 176 zkLock.unLock(); 177 semaphore.release(); 178 } catch (Exception e) { 179 e.printStackTrace(); 180 } 181 } 182 }; 183 service.execute(runnable); 184 } 185 service.shutdown(); 186 } 187 }