* 实现思路:
* 使用Zookeeper最小节点的方式
* 执行过程:
* 1、创建根节点,在根节点下创建顺序节点
* 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功;
* 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁
代码能跑通,但还要改,先记一下:
1.先创建一个抽象类实现Lock接口:
package com...zookeeper.zkLock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @author com. * @date */ public abstract class AbstractLock implements Lock { /** * 获取锁的重试次数 */ private static final int RE_TRY = 10; @Override public void lock() { int count = RE_TRY; while (!this.tryLock() && count > 0) { count--; } } @Override public void lockInterruptibly() throws InterruptedException { this.lock(); } @Override public Condition newCondition() { return null; } }
2.具体实现代码:
package com..zookeeper.zkLock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author com. * @date */ public class ZookeeperLock extends AbstractLock { private ZooKeeper zooKeeper; /** * 锁根节点 */ private final String lockNamespace; /** * 锁值节点 */ private final String lockKey; /** * 当前节点 */ private String currentNode; /** * 等待的前一个节点 */ private String waitNode; /** * 竞争的节点列表 */ private List<String> lockNodes; /** * 计数器 */ private volatile CountDownLatch countDownLatch; /** * 是否持有锁 */ private volatile boolean locked = false; public ZookeeperLock(String address, int timeout, String lockNamespace, String lockKey) { this.init(address, timeout); this.lockNamespace = lockNamespace; this.lockKey = lockKey; } private void init(String address, int timeout) { try { zooKeeper = new ZooKeeper(address, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("事件类型为:" + event.getType()); System.out.println("事件发生的路径:" + event.getPath()); System.out.println("通知状态为:" +event.getState()); } }); } catch (Exception e) { System.out.println(e.getMessage()+ e); throw new RuntimeException(e.getMessage(), e); } } @Override public boolean tryLock() { while (true) { String lock = lockNamespace + "/" + lockKey; try { // 确保zookeeper连接成功 this.ensureZookeeperConnect(); // 确保根节点存在 ensureNameSpaceExist(lockNamespace); // 创建临时顺序节点,节点目录为/lockNamespace/lockKey_xxx,节点为lockKey_xxx currentNode = zooKeeper.create(lock, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL).replace(lockNamespace + "/", ""); // 取出所有子节点 List<String> childrenList = zooKeeper.getChildren(lockNamespace, false); // 竞争的节点列表 lockNodes = new ArrayList<>(); for (String children : childrenList) { if (children.startsWith(lockKey)) { lockNodes.add(children); } } // 排序 Collections.sort(lockNodes); // System.out.println("排序后的所有子节点--->:" + lockNodes); // 如当前节点为最小节点,则成功获取锁 if (currentNode.equals(lockNodes.get(0))) { locked = true; } System.out.println(Thread.currentThread().getName() + " "+currentNode + " 比较 " + lockNodes.get(0)+ " 为 "+locked + " 创建了临时节点"); return locked; } catch (InterruptedException | KeeperException e) { System.out.println(Thread.currentThread().getName() + " 获得锁异常"); System.out.println(e.getMessage() + e); throw new RuntimeException(e); } } } @Override public void unlock() { try { zooKeeper.delete(lockNamespace + "/" + currentNode, -1); zooKeeper.close(); locked = false; System.out.println(Thread.currentThread().getName() + " 释放锁~~~"); } catch (InterruptedException | KeeperException e) { System.out.println(e.getMessage()+e); } } /** * Zookeeper分布式锁 * 实现思路: * 使用Zookeeper最小节点的方式 * 执行过程: * 1、创建根节点,在根节点下创建顺序节点 * 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功; * 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁 */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //等待锁 try { if (tryLock()) { System.out.println(Thread.currentThread().getName() + " 获得锁~~~"); return locked; } System.out.println(Thread.currentThread().getName() + " 等待锁~~~"); //找到当前节点的前一个节点 waitNode = lockNodes.get(Collections.binarySearch(lockNodes, currentNode) - 1); this.waitLock(time, unit); return locked; } catch (KeeperException e) { System.out.println(e.getMessage()+ e); throw new RuntimeException(e); } } /** * 等待锁 */ private void waitLock(long time, TimeUnit unit) throws KeeperException, InterruptedException { String waitLock = lockNamespace + "/" + waitNode; System.out.println(Thread.currentThread().getName() +" 等待锁 {} 释放 " + waitLock); Stat stat = zooKeeper.exists(waitLock, watchedEvent -> { if (countDownLatch != null) { locked = true; countDownLatch.countDown(); } }); // 前一个节点此刻存在,等待,节点消失则成功获取锁 if (stat != null) { countDownLatch = new CountDownLatch(1); countDownLatch.await(time, unit); countDownLatch = null; } else { // 前一个节点此刻不存在,获得锁 locked = true; } } /** * 确保根节点存在 */ private void ensureNameSpaceExist(String lockNamespace) throws KeeperException, InterruptedException { Stat statS = zooKeeper.exists(lockNamespace, false); if (statS == null) { //如果根节点不存在,创建 zooKeeper.create(lockNamespace, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } /** * 确保zookeeper连接成功,防止出现连接还未完成就执行zookeeper的get/create/exsit操作出现错误KeeperErrorCode = ConnectionLoss */ private void ensureZookeeperConnect() throws InterruptedException { CountDownLatch connectedLatch = new CountDownLatch(1); zooKeeper.register(watchedEvent -> { if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) { connectedLatch.countDown(); } }); // zookeeper连接中则等待 if (ZooKeeper.States.CONNECTING == zooKeeper.getState()) { connectedLatch.await(); } } }
3.测试类:
package com..zookeeper.zkLock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import wfc.service.database.RecordSet; import wfc.service.database.SQL; import java.io.IOException; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; public class Main { private static final int Thread_num = 5; private static final CyclicBarrier cb = new CyclicBarrier(Thread_num); public static void main(String[] args) throws Exception { Thread [] threads = new Thread[Thread_num]; for(int i=0;i<Thread_num;i++){ Thread thread = new Thread(new Runnable() { @Override public void run() { try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } Lock lock = new ZookeeperLock("localhost:2181",5000,"/zkLock","lockKey"); try { if(lock.tryLock(1000, TimeUnit.MILLISECONDS)){ System.out.println(Thread.currentThread().getName() + " 获得锁执行业务"); //减数据库 int count = 1; String st_id = "keys"; String updateSql = "update dang_fj set count = count-? where st_id = ? and count>=1"; Object[] updateObject = new Object[] {count,st_id}; RecordSet updateRs = SQL.execute(updateSql,updateObject); int number = updateRs.TOTAL_RECORD_COUNT; //影响行数 System.out.println("数据库影响行数:"+number); } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }); threads [i] = thread; thread.start(); } for(Thread thread : threads){ thread.join(); } System.out.println("执行结束======"); }