原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/11605319.html
Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才可以基于Zookeeper的以下两个特性实现分布式锁功能。
- 顺序临时节点:Zookeeper提供一个多层级的节点命名空间(节点称为Znode),每个节点都用一个以斜杠(/)分隔的路径来表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。一般可以组合这几类节点来创建所需要的节点,例如,创建一个持久节点作为父节点,在父节点下面创建临时节点,并标记该临时节点为有序性。
- Watch机制:Zookeeper还提供了另外一个重要的特性,Watcher(事件监听器)。ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知给用户。
熟悉了Zookeeper的这两个特性之后,就可以看看Zookeeper是如何实现分布式锁的了。
首先,需要建立一个父节点,节点类型为持久节点(PERSISTENT) ,每当需要访问共享资源时,就会在父节点下建立相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),并且以临时节点名称+父节点名称+顺序号组成特定的名字。
在建立子节点后,对父节点下面的所有以临时节点名称name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,如果是最小节点,则获得锁。
如果不是最小节点,则阻塞等待锁,并且获得该节点的上一顺序节点,为其注册监听事件,等待节点对应的操作获得锁。
当调用完共享资源后,删除该节点,关闭zk,进而可以触发监听事件,释放该锁。
以上实现的分布式锁是严格按照顺序访问的并发锁。一般还可以直接引用Curator框架来实现Zookeeper分布式锁,代码如下:
Maven Dependency
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
DistributedLock.java
1 package org.fool.spring.util; 2 3 import java.util.concurrent.TimeUnit; 4 5 public interface DistributedLock { 6 7 void lock() throws Exception; 8 9 boolean tryLock(long time, TimeUnit unit) throws Exception; 10 11 void unlock() throws Exception; 12 13 boolean isAcquiredInThisProcess(); 14 }
ZkDistributedLock.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 5 6 import java.util.concurrent.TimeUnit; 7 8 public class ZkDistributedLock implements DistributedLock { 9 10 private InterProcessMutex mutex; 11 12 ZkDistributedLock(ZkClient zkClient, String lockPath) { 13 CuratorFramework client = zkClient.getClient(); 14 this.mutex = new InterProcessMutex(client, lockPath); 15 } 16 17 @Override 18 public void lock() throws Exception { 19 this.mutex.acquire(); 20 } 21 22 @Override 23 public boolean tryLock(long time, TimeUnit unit) throws Exception { 24 return this.mutex.acquire(time, unit); 25 } 26 27 @Override 28 public void unlock() throws Exception { 29 this.mutex.release(); 30 } 31 32 @Override 33 public boolean isAcquiredInThisProcess() { 34 return this.mutex.isAcquiredInThisProcess(); 35 } 36 37 }
ZkClient.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 5 public class ZkClient { 6 7 private final CuratorFramework client; 8 9 ZkClient(CuratorFramework client) { 10 this.client = client; 11 } 12 13 CuratorFramework getClient() { 14 return this.client; 15 } 16 17 /** 18 * start the client 19 */ 20 public void start() { 21 this.client.start(); 22 } 23 24 /** 25 * close the client 26 */ 27 public void close() { 28 this.client.close(); 29 } 30 31 }
DistributedLocks.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.RetryPolicy; 4 import org.apache.curator.framework.CuratorFramework; 5 import org.apache.curator.framework.CuratorFrameworkFactory; 6 import org.apache.curator.retry.ExponentialBackoffRetry; 7 8 public final class DistributedLocks { 9 10 private DistributedLocks() { 11 } 12 13 private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000; 14 private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000; 15 private static final int BASE_SLEEP_TIME_MS = 1000; 16 private static final int MAX_RETRIES = 3; 17 18 /** 19 * Define the default retry policy 20 */ 21 private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES); 22 23 /** 24 * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout. 25 */ 26 public static ZkClient newZkClient(String connectString) { 27 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS, 28 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY); 29 return new ZkClient(client); 30 } 31 32 /** 33 * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout 34 */ 35 public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) { 36 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, 37 connectionTimeoutMs, DEFAULT_RETRY_POLICY); 38 return new ZkClient(client); 39 } 40 41 /** 42 * Create a new DistributedLock with ZkClient and lock path. 43 */ 44 public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) { 45 return new ZkDistributedLock(zkClient, lockPath); 46 } 47 }
TestZkDistributedLock.java
1 package org.fool.spring.test; 2 3 import org.fool.spring.util.DistributedLock; 4 import org.fool.spring.util.DistributedLocks; 5 import org.fool.spring.util.ZkClient; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import java.util.concurrent.TimeUnit; 10 11 public class TestZkDistributedLock { 12 13 private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class); 14 15 private static final String lockPath = "/curator/test"; 16 17 public static void main(String[] args) throws Exception { 18 ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181"); 19 zkClient.start(); 20 21 DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath); 22 23 boolean isAcquired = lock.isAcquiredInThisProcess(); 24 logger.info("==========lock acquired: " + isAcquired + "=========="); 25 26 if (lock.tryLock(3, TimeUnit.SECONDS)) { 27 try { 28 isAcquired = lock.isAcquiredInThisProcess(); 29 logger.info("==========lock acquired: " + isAcquired + "=========="); 30 31 // mock to do business logic 32 Thread.sleep(60000); 33 } finally { 34 lock.unlock(); 35 logger.info("==========release the lock !!!=========="); 36 } 37 } else { 38 logger.info("==========failed to get the lock !!!=========="); 39 } 40 41 zkClient.close(); 42 } 43 }
Test
执行TestZkDistributedLock,模拟业务执行占用60s时间
在60s内,再次执行TestZkDistributedLock,可以看到尝试获取锁失败
打开zk client,查看执行期间内的顺序临时节点的变化情况
Summary
Zookeeper实现的分布式锁
优点
- Zookeeper是集群实现,可以避免单点问题,且能保证每次操作都可以有效地释放锁,这是因为一旦应用服务挂掉了,临时节点会因为session连接断开而自动删除掉。
缺点
- 由于频繁地创建和删除结点,加上大量的Watch事件,对Zookeeper集群来说,压力非常大。且从性能上来说,与Redis实现的分布式锁相比,还是存在一定的差距。