1、引入maven包
<dependencies>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
2、创建zookeeper分布式锁
import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DisClient { final String disLock = "/distrilock"; String beforeNodePath; String currentNodePath; CountDownLatch countDownLatch = null; String threadNname; //获取到zkClient private ZkClient zkClient = new ZkClient("192.168.3.120:2181"); // 把抢锁过程为量部分,一部分是创建节点,比较序号,另一部分是等待锁 // 完整获取锁方法 public DisClient() { threadNname = Thread.currentThread().getName(); synchronized (DisClient.class) { if (!zkClient.exists(disLock)) { zkClient.createPersistent(disLock); } } } public void deleteLock() { if (zkClient != null && currentNodePath != null && currentNodePath.equals("") == false) { zkClient.delete(currentNodePath); System.out.println(threadNname + "释放锁"); } else { System.out.println(threadNname + "释放锁,失败" + currentNodePath); } } //这个方法就是抢锁的入口,需要不停迭代使用 public void getDisLock() { //第一步:肯定是去创建临时节点,如果创建后的节点的第一个节点就是自己, // 说明锁就抢到了 if (tryGetDisLock()) { //说明获取到锁 System.out.println(threadNname + ":获取到了锁,成功"); } else { System.out.println(threadNname + ": 获取到了锁,失败,等待"); waitForLock(); //递归获取锁 getDisLock(); } } // 尝试获取锁 public Boolean tryGetDisLock() { //判断节点释放创建过,如果第一次调用,则去创建 if (currentNodePath == null || currentNodePath.equals("")) { // 创建临时节点成功,返回 节点的路径 currentNodePath = zkClient.createEphemeralSequential(disLock + "/", "lock"); } //获取到 /distrilock下面所有的子节点 final List childs = zkClient.getChildren(disLock); //对节点信息进行排序 Collections.sort(childs);//默认是升序 String minNode = childs.get(0).toString(); //如果此时自己是最早的节点,就抢到锁了,可以执行自己的业务逻辑了 if (currentNodePath.equals(disLock + "/" + minNode)) { return true; } else { //说明最小节点不是自己创建的,要监控自己当前节点序号前一个节点 final int i = Collections.binarySearch(childs, currentNodePath.substring((disLock + "/").length())); beforeNodePath = disLock + "/" + childs.get(i - 1); } return false; } /** * 等待之前节点释放锁,如何判断锁被释放, * 需要唤醒线程继续尝试 tryGetDisLock */ public void waitForLock() { //注册一个监听器 IZkDataListener izk = new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { } public void handleDataDeleted(String s) throws Exception { System.out.println(threadNname + " 监控到了," + beforeNodePath + "节点发生变化了"); countDownLatch.countDown(); //把值减1变为0,唤醒之前await线程 } }; // 监控前一个节点 zkClient.subscribeDataChanges(beforeNodePath, izk); //在监听的通知没来之前,该线程应该处于等待 if (zkClient.exists(beforeNodePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); //阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforeNodePath, izk); } }
3、创建测试类
public class DisLockTest { public static void main(String[] args) { //使用3个线程模拟分布式环境 for (int i = 0; i < 10; i++) { new Thread(new DisLockRunnable()).start(); } } static class DisLockRunnable implements Runnable { public void run() { //每个线程就是去抢锁 final DisClient client = new DisClient(); client.getDisLock(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } client.deleteLock(); } } }