zoukankan      html  css  js  c++  java
  • Zookeeper分布式锁

    原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/11605319.html

    Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才可以基于Zookeeper的以下两个特性实现分布式锁功能。

    • 顺序临时节点:Zookeeper提供一个多层级的节点命名空间(节点称为Znode),每个节点都用一个以斜杠(/)分隔的路径来表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。一般可以组合这几类节点来创建所需要的节点,例如,创建一个持久节点作为父节点,在父节点下面创建临时节点,并标记该临时节点为有序性。
    • Watch机制:Zookeeper还提供了另外一个重要的特性,Watcher(事件监听器)。ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知给用户。

    熟悉了Zookeeper的这两个特性之后,就可以看看Zookeeper是如何实现分布式锁的了。

    首先,需要建立一个父节点,节点类型为持久节点(PERSISTENT) ,每当需要访问共享资源时,就会在父节点下建立相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),并且以临时节点名称+父节点名称+顺序号组成特定的名字。

    在建立子节点后,对父节点下面的所有以临时节点名称name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,如果是最小节点,则获得锁。

    如果不是最小节点,则阻塞等待锁,并且获得该节点的上一顺序节点,为其注册监听事件,等待节点对应的操作获得锁。

    当调用完共享资源后,删除该节点,关闭zk,进而可以触发监听事件,释放该锁。

    以上实现的分布式锁是严格按照顺序访问的并发锁。一般还可以直接引用Curator框架来实现Zookeeper分布式锁,代码如下:

    Maven Dependency

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>

    DistributedLock.java

     1 package org.fool.spring.util;
     2 
     3 import java.util.concurrent.TimeUnit;
     4 
     5 public interface DistributedLock {
     6 
     7     void lock() throws Exception;
     8 
     9     boolean tryLock(long time, TimeUnit unit) throws Exception;
    10 
    11     void unlock() throws Exception;
    12 
    13     boolean isAcquiredInThisProcess();
    14 }

    ZkDistributedLock.java

     1 package org.fool.spring.util;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
     5 
     6 import java.util.concurrent.TimeUnit;
     7 
     8 public class ZkDistributedLock implements DistributedLock {
     9 
    10     private InterProcessMutex mutex;
    11 
    12     ZkDistributedLock(ZkClient zkClient, String lockPath) {
    13         CuratorFramework client = zkClient.getClient();
    14         this.mutex = new InterProcessMutex(client, lockPath);
    15     }
    16 
    17     @Override
    18     public void lock() throws Exception {
    19         this.mutex.acquire();
    20     }
    21 
    22     @Override
    23     public boolean tryLock(long time, TimeUnit unit) throws Exception {
    24         return this.mutex.acquire(time, unit);
    25     }
    26 
    27     @Override
    28     public void unlock() throws Exception {
    29         this.mutex.release();
    30     }
    31 
    32     @Override
    33     public boolean isAcquiredInThisProcess() {
    34         return this.mutex.isAcquiredInThisProcess();
    35     }
    36 
    37 }

    ZkClient.java

     1 package org.fool.spring.util;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 
     5 public class ZkClient {
     6 
     7     private final CuratorFramework client;
     8 
     9     ZkClient(CuratorFramework client) {
    10         this.client = client;
    11     }
    12 
    13     CuratorFramework getClient() {
    14         return this.client;
    15     }
    16 
    17     /**
    18      * start the client
    19      */
    20     public void start() {
    21         this.client.start();
    22     }
    23 
    24     /**
    25      * close the client
    26      */
    27     public void close() {
    28         this.client.close();
    29     }
    30 
    31 }

    DistributedLocks.java

     1 package org.fool.spring.util;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.retry.ExponentialBackoffRetry;
     7 
     8 public final class DistributedLocks {
     9 
    10     private DistributedLocks() {
    11     }
    12 
    13     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
    14     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
    15     private static final int BASE_SLEEP_TIME_MS = 1000;
    16     private static final int MAX_RETRIES = 3;
    17 
    18     /**
    19      * Define the default retry policy
    20      */
    21     private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
    22 
    23     /**
    24      * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout.
    25      */
    26     public static ZkClient newZkClient(String connectString) {
    27         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS,
    28                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY);
    29         return new ZkClient(client);
    30     }
    31 
    32     /**
    33      * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout
    34      */
    35     public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
    36         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs,
    37                 connectionTimeoutMs, DEFAULT_RETRY_POLICY);
    38         return new ZkClient(client);
    39     }
    40 
    41     /**
    42      * Create a new DistributedLock with ZkClient and lock path.
    43      */
    44     public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) {
    45         return new ZkDistributedLock(zkClient, lockPath);
    46     }
    47 }

    TestZkDistributedLock.java

     1 package org.fool.spring.test;
     2 
     3 import org.fool.spring.util.DistributedLock;
     4 import org.fool.spring.util.DistributedLocks;
     5 import org.fool.spring.util.ZkClient;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 import java.util.concurrent.TimeUnit;
    10 
    11 public class TestZkDistributedLock {
    12 
    13     private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class);
    14 
    15     private static final String lockPath = "/curator/test";
    16 
    17     public static void main(String[] args) throws Exception {
    18         ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181");
    19         zkClient.start();
    20 
    21         DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath);
    22 
    23         boolean isAcquired = lock.isAcquiredInThisProcess();
    24         logger.info("==========lock acquired: " + isAcquired + "==========");
    25 
    26         if (lock.tryLock(3, TimeUnit.SECONDS)) {
    27             try {
    28                 isAcquired = lock.isAcquiredInThisProcess();
    29                 logger.info("==========lock acquired: " + isAcquired + "==========");
    30 
    31                 // mock to do business logic
    32                 Thread.sleep(60000);
    33             } finally {
    34                 lock.unlock();
    35                 logger.info("==========release the lock !!!==========");
    36             }
    37         } else {
    38             logger.info("==========failed to get the lock !!!==========");
    39         }
    40 
    41         zkClient.close();
    42     }
    43 }

    Test

    执行TestZkDistributedLock,模拟业务执行占用60s时间

    在60s内,再次执行TestZkDistributedLock,可以看到尝试获取锁失败

    打开zk client,查看执行期间内的顺序临时节点的变化情况

     

    Summary

    Zookeeper实现的分布式锁

    优点

    • Zookeeper是集群实现,可以避免单点问题,且能保证每次操作都可以有效地释放锁,这是因为一旦应用服务挂掉了,临时节点会因为session连接断开而自动删除掉。

    缺点

    • 由于频繁地创建和删除结点,加上大量的Watch事件,对Zookeeper集群来说,压力非常大。且从性能上来说,与Redis实现的分布式锁相比,还是存在一定的差距。

    Reference

    https://time.geekbang.org/column/article/125983

    http://curator.apache.org/

  • 相关阅读:
    线程_Process实例
    线程_multiprocessing异步
    线程_multiprocessing实现文件夹copy器
    线程_GIL最简单的例子
    线程_FIFO队列实现生产者消费者
    线程_apply堵塞式
    正则表达式_合集下(后续还会有补充)
    正则表达式_合集上
    二分法查找
    数据结构_二叉树
  • 原文地址:https://www.cnblogs.com/agilestyle/p/11605319.html
Copyright © 2011-2022 走看看