zk实现分布式锁纵观网络各种各样的帖子层出不穷,笔者查阅很多资料发现一个问题,有些文章只写原理并没有具体实现,有些文章虽然写了实现但是并不全面
借这个周末给大家做一个总结,代码拿来就可以用并且每一种实现都经过了测试没有bug。下面我们先从最简单的实现开始介绍:
- 简单的实现
package com.srr.lock; /** * @Description 分布式锁的接口 */ abstract public interface DistributedLock { /** * 获取锁 */ boolean lock(); /** * 解锁 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; /** * 简单的zk分布式做实现策略 * 性能比较低会导致羊群效应 */ public abstract class SimplerZKLockStrategy implements DistributedLock{ /** * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现 * @throws Exception */ @Override public boolean lock() { //获取锁成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"获取锁成功"); return true; }else{ //获取锁失败 //阻塞一直等待 waitLock(); //递归,再次获取锁 return lock(); } } /** * 尝试获取锁,子类实现 */ protected abstract boolean tryLock() ; /** * 等待获取锁,子类实现 */ protected abstract void waitLock(); /** * 解锁:删除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.concurrent.CountDownLatch; /** * 分布式锁简单实现 */ public class SimpleZKLock extends SimplerZKLockStrategy{ private static final String PATH = "/lowPerformance_zklock"; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //创建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR); @Override protected boolean tryLock() { //如果不存在这个节点,则创建持久节点 try{ zkClient.createEphemeral(PATH, "lock"); return true; }catch (Exception e){ return false; } } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //监听前一个节点的变化 zkClient.subscribeDataChanges(PATH, lIZkDataListener); if (zkClient.exists(PATH)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(PATH, lIZkDataListener); } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(PATH); } } @Override public boolean readLock() { return true; } @Override public boolean writeLock() { return true; } } package com.srr.lock; import redis.clients.jedis.Jedis; import java.util.concurrent.CountDownLatch; /** * 测试场景 * count从1加到4 * 使用简单的分布式锁在分布式环境下保证结果正确 */ public class T { volatile int count = 1; public void inc(){ for(int i = 0;i<3;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(5); for(int i = 0;i<5;i++){ new Thread(new Runnable() { @Override public void run() { DistributedLock distributedLock = new SimpleZKLock(); if(lock.lock(distributedLock)){ t.inc(); lock.unlock(distributedLock); countDownLatch.countDown(); } System.out.println("count == "+t.getCount()); } }).start(); } countDownLatch.await(); } }
运行结果:
这种方式实现虽然简单,但是会引发羊群效应,因为每个等待锁的客户端都需要注册监听lock节点的删除事件,如果客户端并发请求很多,那么这将会非常消耗zookeeper集群
的资源,严重的化则会导致zookeeper集群宕机也不是没有可能。
- 高性能实现,解决羊群效应问题
package com.srr.lock; /** * @Description 分布式锁的接口 */ abstract public interface DistributedLock { /** * 获取锁 */ boolean lock(); /** * 解锁 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; public abstract class BlockingZKLockStrategy implements DistributedLock{ /** * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现 * @throws Exception */ @Override public final boolean lock() { //获取锁成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"获取锁成功"); return true; }else{ //获取锁失败 //阻塞一直等待 waitLock(); //递归,再次获取锁 return true; } } /** * 尝试获取锁,子类实现 */ protected abstract boolean tryLock() ; /** * 等待获取锁,子类实现 */ protected abstract void waitLock(); /** * 解锁:删除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; public class BlockingZKLock extends BlockingZKLockStrategy{ private static final String PATH = "/highPerformance_zklock"; //当前节点路径 private String currentPath; //前一个节点的路径 private String beforePath; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //超时时间 public static final int SESSION_TIMEOUT = 30000; //创建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); public BlockingZKLock() { //如果不存在这个节点,则创建持久节点 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override protected boolean tryLock() { //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath //if (null == currentPath || "".equals(currentPath)) { //在path下创建一个临时的顺序节点 currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock"); //} try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //获取所有的临时节点,并排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); beforePath = PATH+"/"+childrens.get(wz-1); } return false; } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //监听前一个节点的变化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(currentPath); } } @Override public boolean readLock() { return true; } @Override public boolean writeLock() { return true; } } package com.srr.lock; import java.util.concurrent.CountDownLatch; /** * 测试场景 * count从1加到4 * 使用高性能的分布式锁在分布式环境下保证结果正确 */ public class T { volatile int count = 1; public void inc(){ for(int i = 0;i<3;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(5); for(int i = 0;i<5;i++){ new Thread(new Runnable() { @Override public void run() { DistributedLock distributedLock = new BlockingZKLock(); if(lock.lock(distributedLock)){ t.inc(); lock.unlock(distributedLock); countDownLatch.countDown(); } System.out.println("count == "+t.getCount()); } }).start(); } countDownLatch.await(); } }
这种实现客户端只需监听它前一个节点的变化,不需要监听所有的节点,从而提高了zookeeper锁的性能。
- 共享锁(S锁)
- 写到这个,看了网络上很多错误的文章实现把排它锁当做共享锁
共享锁正确是实现姿势如下:
package com.srr.lock; /** * @Description 分布式锁的接口 */ abstract public interface DistributedLock { /** * 获取锁 */ boolean lock(); /** * 解锁 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; /** * 共享锁策略 */ abstract public class ZKSharedLockStrategy implements DistributedLock{ @Override public boolean readLock() { //获取锁成功 if (tryReadLock()){ System.out.println(Thread.currentThread().getName()+"获取读锁成功"); return true; }else{ //获取锁失败 //阻塞一直等待 waitLock(); //递归,再次获取锁 return true; } } @Override public boolean writeLock() { //获取锁成功 if (tryWriteLock()){ System.out.println(Thread.currentThread().getName()+"获取写锁成功"); return true; }else{ //获取锁失败 //阻塞一直等待 waitLock(); //递归,再次获取锁 return true; } } /** * 尝试获取锁,子类实现 */ protected abstract boolean tryWriteLock() ; /** * 尝试获取锁,子类实现 */ protected abstract boolean tryReadLock() ; /** * 等待获取锁,子类实现 */ protected abstract void waitLock(); /** * 解锁:删除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 共享锁 */ public class ZKSharedLock extends ZKSharedLockStrategy{ private static final String PATH = "/zk-root-readwrite-lock"; //当前节点路径 private String currentPath; //前一个节点的路径 private String beforePath; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //超时时间 public static final int SESSION_TIMEOUT = 30000; //创建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); public ZKSharedLock() { //如果不存在这个节点,则创建持久节点 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override protected boolean tryWriteLock() { //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下创建一个临时的顺序节点 currentPath = zkClient.createEphemeralSequential(PATH+"/w", "writelock"); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //获取所有的临时节点,并排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); beforePath = PATH+"/"+childrens.get(wz-1); } return false; } @Override protected boolean tryReadLock() { //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下创建一个临时的顺序节点 currentPath = zkClient.createEphemeralSequential(PATH+"/r", "readklock"); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //获取所有的临时节点,并排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else if(isAllReadNodes(childrens)){ return true; }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); for (int i = wz - 1; i > 0; i--) { // 找到了离得最近的一个写节点,那么它的后一个节点要么是一个读节点,要么就是待加锁的节点本身 if (childrens.get(i).indexOf("w") >= 0) { beforePath = PATH + "/" + childrens.get(i); break; } } } return false; } // 判断比自已小的节点是否都是读节点 private boolean isAllReadNodes(List<String> sortNodes) { int pathLength = PATH.length(); int currentIndex = Collections.binarySearch(sortNodes, currentPath.substring(pathLength+1)); for (int i = 0; i < currentIndex - 1; i++) { // 只要有一个写锁,则不能直接获取读锁 if (sortNodes.get(i).indexOf("w") >= 0) { return false; } } return true; } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //监听前一个节点的变化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } @Override public boolean lock() { return false; } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(currentPath); zkClient.close(); } } } package com.srr.lock; /** * 锁工具类 */ public class Lock { /** * 获取锁 */ boolean lock(DistributedLock lock) { return lock.lock(); }; /** * 获取读锁 */ boolean readlock(DistributedLock lock) { return lock.readLock(); }; /** * 获取读锁 */ boolean writeLock(DistributedLock lock) { return lock.writeLock(); }; /** * 释放锁 */ void unlock(DistributedLock lock) { lock.unlock(); }; } package com.srr.lock; import java.util.concurrent.CountDownLatch; /** * 测试共享锁 */ public class SharedLockTest { private static volatile int count = 0; public static void main(String[] args) throws Exception { final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(10); new Thread(new Runnable() { @Override public void run() { testWriteLock(8); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(10); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(20); } }).start(); new Thread(new Runnable() { @Override public void run() { testWriteLock(11); } }).start(); new Thread(new Runnable() { @Override public void run() { testWriteLock(30); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(9); } }).start(); countDownLatch.await(); } // 读锁 private static void testReadLock(long sleepTime) { try { Lock lock = new Lock(); DistributedLock dlock = new ZKSharedLock(); lock.readlock(dlock); System.out.println("i get readlock ->" + sleepTime); System.out.println("count = "+ count); Thread.sleep(sleepTime); lock.unlock(dlock); } catch (Exception e) { e.printStackTrace(); } } // 写锁 private static void testWriteLock(long sleepTime) { try { Lock lock = new Lock(); DistributedLock dlock = new ZKSharedLock(); lock.writeLock(dlock); System.out.println("i get writelock ->" + sleepTime); count++; Thread.sleep(sleepTime); lock.unlock(dlock); } catch (Exception e) { e.printStackTrace(); } } }
运行结果:
从结果可以看出读锁和读锁可以共享锁,而写锁必须等待读锁或者写锁释放之后才能获取锁。
最后,zk分布式锁完美解决方案:
- Apache Curator
- Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
- Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.
网上很多文章竟然标题用Curator实现分布式锁,大哥Curator框架本身已经实现了分布式锁而且提供了各种各样的锁api供大家使用,我们不用再基于Curator实现分布式锁,这不是多此一举吗?这里给出一个简单的使用案例,旨在说明意图:
package com.srr.lock; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 测试场景 * count从1加到101 * 使用redis分布式锁在分布式环境下保证结果正确 */ public class CuratorDistributedLockTest { private static final String lockPath = "/curator_lock"; //zk地址和端口 public static final String zookeeperConnectionString = "192.168.32.129:2181"; volatile int count = 1; public void inc(){ for(int i = 0;i<10;i++){ count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(4); for(int i = 0;i<4;i++){ new Thread(new Runnable() { @Override public void run() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(10, 5000); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); InterProcessMutex lock = new InterProcessMutex(client, lockPath); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { try { System.out.println("get the lock"); t.inc(); } finally { lock.release(); System.out.println("unlock the lock"); } } }catch (Exception e){ e.printStackTrace(); } countDownLatch.countDown(); } }).start(); } countDownLatch.await(); System.out.println("total count == "+t.getCount()); } }
运行结果:
如果想更多了解Curator框架,请移步http://curator.apache.org/,官网给出了详细的使用案例及介绍。至此zk实现分布式锁总结完毕!
原创不易,请多多关注!