zoukankan      html  css  js  c++  java
  • 基于Zookeeper实现分布式锁

    为什么需要分布式锁

      锁是多线程代码中的概念,只有当多任务访问同一个互斥共享资源时才需要。如下图:

            

      在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下。但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,JVM之间已经无法通过多线程的锁解决同步问题。那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

      如下例:携程、美团、飞猪、去哪儿四个购票网站实际上都没有最终售票权,只有12306铁道总局有火车票,那么四个购票网站都需要购买火车票,那么四个网站必须排队进行同步,否则不同步会造成多售(类似同一进程中多线程间不同步也会造成多售)。这时他们就需要有一个公共的锁管理方案,来保证APP间的购票是同步的。要想购票:

      1、首先服务获取分布式

      2、服务获取分布式锁后,才能去调用12306进行购票。

      3、购票成功后,释放分布式锁,这样其余APP才能获取锁并进行12306购票。

               

    为什么要使用zookeeper

      1、基于mysql实现分布式锁

      基于分布式锁的实现,首先肯定是想单独分离出一台mysql数据库,所有服务要想操作文件(共享资源),那么必须先在mysql数据库中插入一个标志,插入标志的服务就持有了锁,并对文件进行操作,操作完成后,主动删除标志进行锁释放,其与服务会一直查询数据库,看是否标志有被占用,直到没有标志占用时自己才能写入标志获取锁。

      但是这样有这么一个问题,如果服务(jvm1)宕机或者卡顿了,会一直持有锁未释放,这样就造成了死锁,因此就需要有一个监视锁进程时刻监视锁的状态,如果超过一定时间未释放就要进行主动清理锁标记,然后供其与服务继续获取锁。

      如果监视锁字段进程和jvm1同时挂掉,依旧不能解决死锁问题,于是又增加一个监视锁字段进程,这样一个进程挂掉,还有另一个监视锁字段进程可以对锁进行管理。这样又诞生一个新的问题,两个监视进程必须进行同步,否则对于过期的情况管理存在不一致问题。

      因此存在以下问题,并且方案变得很复杂:

      1、监视锁字段进程对于锁的监视时间周期过短,仍旧会造成多售(jvm1还没处理完其持有的锁就被主动销毁,造成多个服务同时持有锁进行操作)。

      2、监视锁字段进程对于锁的监视时间周期过长,会造成整个服务卡顿过长,吞吐低下。

      3、监视锁字段进程间的同步问题。

      4、当一个jvm持有锁的时候,其余服务会一直访问数据库查看锁,会造成其余jvm的资源浪费。

             

      2、基于Redis实现分布式锁

      相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点,Redis就是其中一种。由于Redis可以设置字段的有效期,因此可以实现自动释放超期的锁,不需要多个监视锁字段进程进行锁守护,可以依旧存在上述mysql实现中除了3以外1、2、4中的问题。  

               

      3、基于Zookeeper实现分布式锁

      基于以上两种实现方式,有了基于zookeeper实现分布式锁的方案。由于zookeeper有以下特点:

      1️⃣维护了一个有层次的数据节点,类似文件系统。

      2️⃣有以下数据节点:临时节点、持久节点、临时有序节点(分布式锁实现基于的数据节点)、持久有序节点。

      3️⃣zookeeper可以和client客户端通过心跳的机制保持长连接,如果客户端链接zookeeper创建了一个临时节点,那么这个客户端与zookeeper断开连接后会自动删除。

      4️⃣zookeeper的节点上可以注册上用户事件(自定义),如果节点数据删除等事件都可以触发自定义事件。

      5️⃣zookeeper保持了统一视图,各服务对于状态信息获取满足一致性。

      Zookeeper的每一个节点,都是一个天然的顺序发号器。

      在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

      比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。
              

    如何使用zookeeper实现分布式锁

      大致思想为:每个客户端对某个方法加锁时,在 Zookeeper 上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个临时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。

      1、排它锁

      排他锁,又称写锁或独占锁。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取或更新操作,其他任务事务都不能对这个数据对象进行任何操作,直到T1释放了排他锁。

      排他锁核心是保证当前有且仅有一个事务获得锁,并且锁释放之后,所有正在等待获取锁的事务都能够被通知到。

      Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。可以利用Zookeeper这个特性,实现排他锁。

      1️⃣定义锁:通过Zookeeper上的数据节点来表示一个锁
      2️⃣获取锁:客户端通过调用 create 方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况
      3️⃣释放锁:以下两种情况都可以让锁释放
        当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
        正常执行完业务逻辑,客户端主动删除自己创建的临时节点
      基于Zookeeper实现排他锁流程:

              

      2、共享锁

      共享锁,又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。

      共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。

      1️⃣定义锁:通过Zookeeper上的数据节点来表示一个锁,是一个类似于 /lockpath/[hostname]-请求类型-序号 的临时顺序节点
      2️⃣获取锁:客户端通过调用 create 方法创建表示锁的临时顺序节点,如果是读请求,则创建 /lockpath/[hostname]-R-序号 节点,如果是写请求则创建 /lockpath/[hostname]-W-序号节点
      3️⃣判断读写顺序:大概分为4个步骤
      1)创建完节点后,获取 /lockpath 节点下的所有子节点,并对该节点注册子节点变更的Watcher监听
      2)确定自己的节点序号在所有子节点中的顺序
        3.1)对于读请求:1. 如果没有比自己序号更小的子节点,或者比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑 2. 如果有比自己序号小的子节点有写请求,那么等待
        3.2)对于写请求,如果自己不是序号最小的节点,那么等待
      4)接收到Watcher通知后,重复步骤1)
      4️⃣释放锁:与排他锁逻辑一致

                 

       基于Zookeeper实现共享锁流程:

                

      3、羊群效应

      在实现共享锁的 "判断读写顺序" 的第1个步骤是:创建完节点后,获取 /lockpath 节点下的所有子节点,并对该节点注册子节点变更的Watcher监听。这样的话,任何一次客户端移除共享锁之后,Zookeeper将会发送子节点变更的Watcher通知给所有机器,系统中将有大量的 "Watcher通知" 和 "子节点列表获取" 这个操作重复执行,然后所有节点再判断自己是否是序号最小的节点(写请求)或者判断比自己序号小的子节点是否都是读请求(读请求),从而继续等待下一次通知。

      然而,这些重复操作很多都是 "无用的",实际上每个锁竞争者只需要关注序号比自己小的那个节点是否存在即可。

      当集群规模比较大时,这些 "无用的" 操作不仅会对Zookeeper造成巨大的性能影响和网络冲击,更为严重的是,如果同一时间有多个客户端释放了共享锁,Zookeeper服务器就会在短时间内向其余客户端发送大量的事件通知--这就是所谓的 "羊群效应"。

      改进后的分布式锁实现:

      1️⃣客户端调用 create 方法创建一个类似于 /lockpath/[hostname]-请求类型-序号 的临时顺序节点。

      2️⃣客户端调用 getChildren 方法获取所有已经创建的子节点列表(这里不注册任何Watcher)。

      3️⃣如果无法获取任何共享锁,那么调用 exist 来对比自己小的那个节点注册Watcher
        读请求:向比自己序号小的最后一个写请求节点注册Watcher监听
        写请求:向比自己序号小的最后一个节点注册Watcher监听
      4️⃣等待Watcher监听,继续进入步骤2️⃣
      Zookeeper羊群效应改进前后Watcher监听图:

                

    zookeeper分布式锁示例代码

      分布式锁实现类(锁初始化、创建、获取、等待、释放):

    /**
     * 基于zookeeper的分布式锁
     */
    public class DistributedLock implements Lock, Watcher {
        private ZooKeeper zk = null;
        // 根节点
        private String ROOT_LOCK = "/lock_msb";
        // 竞争的资源
        private String lockName;
        // 等待的前一个锁
        private String WAIT_LOCK;
        // 当前锁
        private String CURRENT_LOCK;
        // 计数器
        private CountDownLatch countDownLatch;
        private int sessionTimeout = 3000000;
        private List<Exception> exceptionList = new ArrayList<Exception>();
    
        /**
         * 配置分布式锁
         * @param config 连接的url
         * @param lockName 竞争资源
         */
        public DistributedLock(String config, String lockName) {
            this.lockName = lockName;
            try {
                // 连接zookeeper
                zk = new ZooKeeper(config, sessionTimeout, this);
                Stat stat = zk.exists(ROOT_LOCK, false);
                if (stat == null) {
                    // 如果根节点不存在,则创建根节点
                    zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        // 节点监视器
        public void process(WatchedEvent event) {
            if (this.countDownLatch != null) {
                this.countDownLatch.countDown();
            }
        }
    
        public void lock() {
            if (exceptionList.size() > 0) {
                throw new LockException(exceptionList.get(0));
            }
            try {
                if (this.tryLock()) {
                    System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
                    return;
                } else {
                    // 等待锁
                    waitForLock(WAIT_LOCK, sessionTimeout);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public boolean tryLock() {
            try {
                String splitStr = "_lock_";
                if (lockName.contains(splitStr)) {
                    throw new LockException("锁名有误");
                }
                // 创建临时有序节点
                CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(CURRENT_LOCK + " 已经创建");
                // 取所有子节点
                List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
                // 取出所有lockName的锁
                List<String> lockObjects = new ArrayList<String>();
                for (String node : subNodes) {
                    String _node = node.split(splitStr)[0];
                    if (_node.equals(lockName)) {
                        lockObjects.add(node);
                    }
                }
                Collections.sort(lockObjects);
                System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
                // 若当前节点为最小节点,则获取锁成功
                if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                    return true;
                }
    
                // 若不是最小节点,则找到自己的前一个节点
                String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
                WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                if (this.tryLock()) {
                    return true;
                }
                return waitForLock(WAIT_LOCK, timeout);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        // 等待锁
        private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
            Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
    
            if (stat != null) {
                System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
                this.countDownLatch = new CountDownLatch(1);
                // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
                this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
                this.countDownLatch = null;
                System.out.println(Thread.currentThread().getName() + " 等到了锁");
            }
            return true;
        }
    
        public void unlock() {
            try {
                System.out.println("释放锁 " + CURRENT_LOCK);
                zk.delete(CURRENT_LOCK, -1);
                CURRENT_LOCK = null;
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public Condition newCondition() {
            return null;
        }
    
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
    
        public class LockException extends RuntimeException {
            private static final long serialVersionUID = 1L;
            public LockException(String e){
                super(e);
            }
            public LockException(Exception e){
                super(e);
            }
        }
    }

      测试类:

    public class Test {
        //100张票
        private Integer n = 100;
    //    private Lock lock = new ReentrantLock();
    
        public void printInfo() {
            System.out.println(Thread.currentThread().getName() +
                    "正在运行,剩余余票:" + --n);
        }
    
        public class TicketThread implements Runnable {
            public void run() {
                Lock lock = new DistributedLock("192.168.150.111:2181,192.168.150.112:2181,192.168.150.113:2181", "zk");
                lock.lock();
                try {
                    if (n > 0) {
                        printInfo();
                    }
                    }finally{
                        lock.unlock();
                    }
                }
        }
    
        public void ticketStart() {
            TicketThread thread = new TicketThread();
            for (int i = 0; i < 30; i++) {
                Thread t = new Thread(thread, "mem" + i);
                t.start();
            }
        }
    
        public static void main(String[] args) {
            new Test().ticketStart();
        }
    }

    基于Curator客户端实现分布式锁

      Apache Curator是一个Zookeeper的开源客户端,它提供了Zookeeper各种应用场景(Recipe,如共享锁服务、master选举、分布式计数器等)的抽象封装,接下来将利用Curator提供的类来实现分布式锁。

      Curator提供的跟分布式锁相关的类有5个,分别是:

    Shared Reentrant Lock 可重入锁
    Shared Lock 共享不可重入锁
    Shared Reentrant Read Write Lock 可重入读写锁
    Shared Semaphore 信号量
    Multi Shared Lock 多锁

      关于错误处理:还是强烈推荐使用ConnectionStateListener处理连接状态的改变。当连接LOST时你不再拥有锁。

      可重入锁

      Shared Reentrant Lock,全局可重入锁,所有客户端都可以请求,同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类 InterProcessMutex 来实现,它的主要方法:

    // 构造方法
    public InterProcessMutex(CuratorFramework client, String path)
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    // 通过acquire获得锁,并提供超时机制:
    public void acquire() throws Exception
    public boolean acquire(long time, TimeUnit unit) throws Exception
    // 撤销锁
    public void makeRevocable(RevocationListener<InterProcessMutex> listener)
    public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

      定义一个 FakeLimitedResource 类来模拟一个共享资源,该资源一次只能被一个线程使用,直到使用结束,下一个线程才能使用,否则会抛出异常。

    public class FakeLimitedResource {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
    
        // 模拟只能单线程操作的资源
        public void use() throws InterruptedException {
            if (!inUse.compareAndSet(false, true)) {
                // 在正确使用锁的情况下,此异常不可能抛出
                throw new IllegalStateException("Needs to be used by one client at a time");
            }
            try {
                Thread.sleep((long) (100 * Math.random()));
            } finally {
                inUse.set(false);
            }
        }
    }

      下面的代码将创建 N 个线程来模拟分布式系统中的节点,系统将通过 InterProcessMutex 来控制对资源的同步使用;每个节点都将发起10次请求,完成 请求锁--访问资源--再次请求锁--释放锁--释放锁 的过程;客户端通过 acquire 请求锁,通过 release 释放锁,获得几把锁就要释放几把锁;这个共享资源一次只能被一个线程使用,如果控制同步失败,将抛异常。

    public class SharedReentrantLockTest {
        private static final String lockPath = "/testZK/sharedreentrantlock";
        private static final Integer clientNums = 5;
        final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的资源
        private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < clientNums; i++) {
                String clientName = "client#" + i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        CuratorFramework client = ZKUtils.getClient();
                        client.start();
                        Random random = new Random();
                        try {
                            final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                            // 每个客户端请求10次共享资源
                            for (int j = 0; j < 10; j++) {
                                if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                    throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥锁");
                                }
                                try {
                                    System.out.println(j + ". " + clientName + " 已获取到互斥锁");
                                    resource.use(); // 使用资源
                                    if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                        throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥锁");
                                    }
                                    System.out.println(j + ". " + clientName + " 已再次获取到互斥锁");
                                    lock.release(); // 申请几次锁就要释放几次锁
                                } finally {
                                    System.out.println(j + ". " + clientName + " 释放互斥锁");
                                    lock.release(); // 总是在finally中释放
                                }
                                Thread.sleep(random.nextInt(100));
                            }
                        } catch (Throwable e) {
                            System.out.println(e.getMessage());
                        } finally {
                            CloseableUtils.closeQuietly(client);
                            System.out.println(clientName + " 客户端关闭!");
                            countDownLatch.countDown();
                        }
                    }
                }).start();
            }
            countDownLatch.await();
            System.out.println("结束!");
        }
    }

      控制台打印日志,可以看到对资源的同步访问控制成功,并且锁是可重入的:

    0. client#3 已获取到互斥锁
    0. client#3 已再次获取到互斥锁
    0. client#3 释放互斥锁
    ... ...
    0. client#4 已获取到互斥锁
    0. client#4 已再次获取到互斥锁
    0. client#4 释放互斥锁
    1. client#1 已获取到互斥锁
    1. client#1 已再次获取到互斥锁
    1. client#1 释放互斥锁
    2. client#1 已获取到互斥锁
    2. client#1 已再次获取到互斥锁
    2. client#1 释放互斥锁
    1. client#4 已获取到互斥锁
    1. client#4 已再次获取到互斥锁
    1. client#4 释放互斥锁
    ... ...
    client#0 客户端关闭!
    8. client#4 已获取到互斥锁
    8. client#4 已再次获取到互斥锁
    8. client#4 释放互斥锁
    9. client#4 已获取到互斥锁
    9. client#4 已再次获取到互斥锁
    9. client#4 释放互斥锁
    client#3 客户端关闭!
    client#4 客户端关闭!
    结束!

      同时在程序运行期间查看Zookeeper节点树,可以发现每一次请求的锁实际上对应一个临时顺序节点。

    [zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
    [leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, 
      _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]

      不可重入锁

      Shared Lock 与 Shared Reentrant Lock 相似,但是不可重入。这个不可重入锁由类 InterProcessSemaphoreMutex 来实现,使用方法和上面的类类似。

      将上面程序中的 InterProcessMutex 换成不可重入锁 InterProcessSemaphoreMutex,如果再运行上面的代码,结果就会发现线程被阻塞在第二个 acquire 上,直到超时,也就是此锁不是可重入的。

      控制台输出日志:

    0. client#2 已获取到互斥锁
    0. client#1 不能得到互斥锁
    0. client#4 不能得到互斥锁
    0. client#0 不能得到互斥锁
    0. client#3 不能得到互斥锁
    client#1 客户端关闭!
    client#4 客户端关闭!
    client#3 客户端关闭!
    client#0 客户端关闭!
    0. client#2 释放互斥锁
    0. client#2 不能再次得到互斥锁
    client#2 客户端关闭!
    结束!

      把第二个获取锁的代码注释,程序才能正常执行:

    0. client#1 已获取到互斥锁
    0. client#1 释放互斥锁
    0. client#2 已获取到互斥锁
    0. client#2 释放互斥锁
    0. client#0 已获取到互斥锁
    0. client#0 释放互斥锁
    0. client#4 已获取到互斥锁
    0. client#4 释放互斥锁
    0. client#3 已获取到互斥锁
    0. client#3 释放互斥锁
    1. client#1 已获取到互斥锁
    1. client#1 释放互斥锁
    1. client#2 已获取到互斥锁
    1. client#2 释放互斥锁
    ....
    ....
    9. client#4 已获取到互斥锁
    9. client#4 释放互斥锁
    9. client#0 已获取到互斥锁
    client#2 客户端关闭!
    9. client#0 释放互斥锁
    9. client#1 已获取到互斥锁
    client#0 客户端关闭!
    client#4 客户端关闭!
    9. client#1 释放互斥锁
    9. client#3 已获取到互斥锁
    client#1 客户端关闭!
    9. client#3 释放互斥锁
    client#3 客户端关闭!
    结束!

      可重入读写锁

      Shared Reentrant Read Write Lock,可重入读写锁,一个读写锁管理一对相关的锁,一个负责读操作,另外一个负责写操作;读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞);此锁是可重入的;一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁,这也意味着写锁可以降级成读锁, 比如 请求写锁 --->读锁 ---->释放写锁;从读锁升级成写锁是不行的。

      可重入读写锁主要由两个类实现:InterProcessReadWriteLock、InterProcessMutex,使用时首先创建一个 InterProcessReadWriteLock 实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是 InterProcessMutex。

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < clientNums; i++) {
            final String clientName = "client#" + i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
                    final InterProcessMutex readLock = lock.readLock();
                    final InterProcessMutex writeLock = lock.writeLock();
    
                    try {
                        // 注意只能先得到写锁再得到读锁,不能反过来!!!
                        if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
                            throw new IllegalStateException(clientName + " 不能得到写锁");
                        }
                        System.out.println(clientName + " 已得到写锁");
                        if (!readLock.acquire(10, TimeUnit.SECONDS)) {
                            throw new IllegalStateException(clientName + " 不能得到读锁");
                        }
                        System.out.println(clientName + " 已得到读锁");
                        try {
                            resource.use(); // 使用资源
                        } finally {
                            System.out.println(clientName + " 释放读写锁");
                            readLock.release();
                            writeLock.release();
                        }
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("结束!");
    }

      控制台打印日志:

    client#1 已得到写锁
    client#1 已得到读锁
    client#1 释放读写锁
    client#2 已得到写锁
    client#2 已得到读锁
    client#2 释放读写锁
    client#0 已得到写锁
    client#0 已得到读锁
    client#0 释放读写锁
    client#4 已得到写锁
    client#4 已得到读锁
    client#4 释放读写锁
    client#3 已得到写锁
    client#3 已得到读锁
    client#3 释放读写锁
    结束!

      信号量

      Shared Semaphore,一个计数的信号量类似JDK的 Semaphore,JDK中 Semaphore 维护的一组许可(permits),而Cubator中称之为租约(Lease)。有两种方式可以决定 semaphore 的最大租约数,第一种方式是由用户给定的 path 决定,第二种方式使用 SharedCountReader 类。如果不使用 SharedCountReader,没有内部代码检查进程是否假定有10个租约而进程B假定有20个租约。 所以所有的实例必须使用相同的 numberOfLeases 值.

      信号量主要实现类有:

    nterProcessSemaphoreV2 - 信号量实现类
    Lease - 租约(单个信号)
    SharedCountReader - 计数器,用于计算最大租约数量

      调用 acquire 会返回一个租约对象,客户端必须在 finally 中 close 这些租约对象,否则这些租约会丢失掉。但是,如果客户端session由于某种原因比如crash丢掉,那么这些客户端持有的租约会自动close,这样其它客户端可以继续使用这些租约。租约还可以通过下面的方式返还:

    public void returnLease(Lease lease)
    public void returnAll(Collection<Lease> leases) 

      注意一次你可以请求多个租约,如果 Semaphore 当前的租约不够,则请求线程会被阻塞。同时还提供了超时的重载方法。

    public Lease acquire() throws Exception
    public Collection<Lease> acquire(int qty) throws Exception
    public Lease acquire(long time, TimeUnit unit) throws Exception
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception

      一个Demo程序如下:

    public class SharedSemaphoreTest {
        private static final int MAX_LEASE = 10;
        private static final String PATH = "/testZK/semaphore";
        private static final FakeLimitedResource resource = new FakeLimitedResource();
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = ZKUtils.getClient();
            client.start();
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("获取租约数量:" + leases.size());
            Lease lease = semaphore.acquire();
            System.out.println("获取单个租约");
            resource.use(); // 使用资源
            // 再次申请获取5个leases,此时leases数量只剩4个,不够,将超时
            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("获取租约,如果超时将为null: " + leases2);
            System.out.println("释放租约");
            semaphore.returnLease(lease);
            // 再次申请获取5个,这次刚好够
            leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("获取租约,如果超时将为null: " + leases2);
            System.out.println("释放集合中的所有租约");
            semaphore.returnAll(leases);
            semaphore.returnAll(leases2);
            client.close();
            System.out.println("结束!");
        }
    }

      控制台打印日志:

    获取租约数量:5
    获取单个租约
    获取租约,如果超时将为null: null
    释放租约
    获取租约,如果超时将为null: [org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, 
      org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9,
      org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb,
      org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d,
      org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57] 释放集合中的所有租约 结束!

      注意:上面所讲的4种锁都是公平锁(fair)。从ZooKeeper的角度看,每个客户端都按照请求的顺序获得锁,相当公平。

    多锁

      Multi Shared Lock 是一个锁的容器。当调用 acquire,所有的锁都会被 acquire,如果请求失败,所有的锁都会被 release。同样调用 release 时所有的锁都被 release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

      主要涉及两个类:

    InterProcessMultiLock - 对所对象实现类
    InterProcessLock - 分布式锁接口类

      它的构造函数需要包含的锁的集合,或者一组 ZooKeeper 的 path,用法和 Shared Lock 相同。

    public InterProcessMultiLock(CuratorFramework client, List<String> paths)
    public InterProcessMultiLock(List<InterProcessLock> locks)

      一个Demo程序如下:

    public class MultiSharedLockTest {
        private static final String lockPath1 = "/testZK/MSLock1";
        private static final String lockPath2 = "/testZK/MSLock2";
        private static final FakeLimitedResource resource = new FakeLimitedResource();
    
        public static void main(String[] args) throws Exception {
            CuratorFramework client = ZKUtils.getClient();
            client.start();
    
            InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入锁
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入锁
            // 组锁,多锁
            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("不能获取多锁");
            }
            System.out.println("已获取多锁");
            System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
            System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
            try {
                resource.use(); // 资源操作
            } finally {
                System.out.println("释放多个锁");
                lock.release(); // 释放多锁
            }
            System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
            System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
            client.close();
            System.out.println("结束!");
        }
    }
  • 相关阅读:
    增加正则项Regularization to Prevent Overfitting
    feature_column、fc.input_layer以及各种类型的column如何转化
    input_fn如何读取数据
    matplotlib.pyplot如何绘制多张子图
    机器学习之杂乱笔记
    Adobe Flash Player
    LSTM/GRU-讲得非常细致
    anaconda python36 tensorflow virtualenv
    畅通工程-HZNU寒假集训
    食物链-HZUN寒假集训
  • 原文地址:https://www.cnblogs.com/jing99/p/11607094.html
Copyright © 2011-2022 走看看