zoukankan      html  css  js  c++  java
  • zookeeper实现分布式锁总结,看这一篇足矣(设计模式应用实战)

    zk实现分布式锁纵观网络各种各样的帖子层出不穷,笔者查阅很多资料发现一个问题,有些文章只写原理并没有具体实现,有些文章虽然写了实现但是并不全面

    借这个周末给大家做一个总结,代码拿来就可以用并且每一种实现都经过了测试没有bug。下面我们先从最简单的实现开始介绍:

    • 简单的实现
    package com.srr.lock;
    
    /**
     * @Description 分布式锁的接口
     */
    abstract  public interface DistributedLock {
        /**
         * 获取锁
         */
        boolean lock();
        /**
         * 解锁
         */
        void unlock();
    
        abstract boolean readLock();
        abstract boolean writeLock();
    }
    
    package com.srr.lock;
    
    /**
     * 简单的zk分布式做实现策略
     * 性能比较低会导致羊群效应
     */
    public abstract class SimplerZKLockStrategy implements DistributedLock{
        /**
         * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
         * @throws Exception
         */
        @Override
        public boolean lock() {
            //获取锁成功
            if (tryLock()){
                System.out.println(Thread.currentThread().getName()+"获取锁成功");
                return true;
            }else{  //获取锁失败
                //阻塞一直等待
                waitLock();
                //递归,再次获取锁
                return lock();
            }
        }
    
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryLock() ;
        /**
         * 等待获取锁,子类实现
         */
        protected abstract void waitLock();
        /**
         * 解锁:删除key
         */
        @Override
        public  abstract void unlock();
    }
    
    package com.srr.lock;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 分布式锁简单实现
     */
    public class SimpleZKLock extends SimplerZKLockStrategy{
        private static final String PATH = "/lowPerformance_zklock";
        private CountDownLatch countDownLatch = null;
        //zk地址和端口
        public static final String ZK_ADDR = "192.168.32.129:2181";
        //创建zk
        protected ZkClient zkClient = new ZkClient(ZK_ADDR);
    
        @Override
        protected boolean tryLock() {
            //如果不存在这个节点,则创建持久节点
            try{
                zkClient.createEphemeral(PATH, "lock");
                return true;
            }catch (Exception e){
                return false;
            }
        }
    
        @Override
        protected void waitLock() {
            IZkDataListener lIZkDataListener = new IZkDataListener() {
    
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (null != countDownLatch){
                        countDownLatch.countDown();
                    }
                    System.out.println("listen lock unlock");
                }
    
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
    
                }
            };
            //监听前一个节点的变化
            zkClient.subscribeDataChanges(PATH, lIZkDataListener);
            if (zkClient.exists(PATH)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            zkClient.unsubscribeDataChanges(PATH, lIZkDataListener);
    
        }
    
        @Override
        public void unlock() {
            if (null != zkClient) {
                System.out.println("lock unclock");
                zkClient.delete(PATH);
            }
        }
    
        @Override
        public boolean readLock() {
            return true;
        }
    
        @Override
        public boolean writeLock() {
            return true;
        }
    }
    
    package com.srr.lock;
    
    import redis.clients.jedis.Jedis;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     *  测试场景
     *  count从1加到4
     *  使用简单的分布式锁在分布式环境下保证结果正确
     */
    public class T {
    
        volatile int  count = 1;
    
        public void inc(){
            for(int i = 0;i<3;i++){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                count++;
                System.out.println("count == "+count);
            }
        }
    
        public int getCount(){
           return count;
        }
    
        public static void main(String[] args) throws InterruptedException {
            final T t = new T();
            final Lock lock = new Lock();
            final CountDownLatch countDownLatch = new CountDownLatch(5);
            for(int i = 0;i<5;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        DistributedLock distributedLock = new SimpleZKLock();
                        if(lock.lock(distributedLock)){
                            t.inc();
                            lock.unlock(distributedLock);
                            countDownLatch.countDown();
                        }
                        System.out.println("count == "+t.getCount());
                    }
                }).start();
            }
            countDownLatch.await();
        }
    }

    运行结果:

     这种方式实现虽然简单,但是会引发羊群效应,因为每个等待锁的客户端都需要注册监听lock节点的删除事件,如果客户端并发请求很多,那么这将会非常消耗zookeeper集群

    的资源,严重的化则会导致zookeeper集群宕机也不是没有可能。

    • 高性能实现,解决羊群效应问题
    package com.srr.lock;
    
    /**
     * @Description 分布式锁的接口
     */
    abstract  public interface DistributedLock {
        /**
         * 获取锁
         */
        boolean lock();
        /**
         * 解锁
         */
        void unlock();
    
        abstract boolean readLock();
        abstract boolean writeLock();
    }
    
    package com.srr.lock;
    
    public abstract class BlockingZKLockStrategy implements DistributedLock{
        /**
         * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
         * @throws Exception
         */
        @Override
        public final boolean lock() {
            //获取锁成功
            if (tryLock()){
                System.out.println(Thread.currentThread().getName()+"获取锁成功");
                return true;
            }else{  //获取锁失败
                //阻塞一直等待
                waitLock();
                //递归,再次获取锁
                return true;
            }
        }
    
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryLock() ;
        /**
         * 等待获取锁,子类实现
         */
        protected abstract void waitLock();
        /**
         * 解锁:删除key
         */
        @Override
        public  abstract void unlock();
    }
    
    package com.srr.lock;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class BlockingZKLock extends BlockingZKLockStrategy{
        private static final String PATH = "/highPerformance_zklock";
        //当前节点路径
        private String currentPath;
        //前一个节点的路径
        private String beforePath;
        private CountDownLatch countDownLatch = null;
        //zk地址和端口
        public static final String ZK_ADDR = "192.168.32.129:2181";
        //超时时间
        public static final int SESSION_TIMEOUT = 30000;
        //创建zk
        protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
    
        public BlockingZKLock() {
            //如果不存在这个节点,则创建持久节点
            if (!zkClient.exists(PATH)) {
                zkClient.createPersistent(PATH);
            }
        }
    
        @Override
        protected boolean tryLock() {
            //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            //if (null == currentPath || "".equals(currentPath)) {
                //在path下创建一个临时的顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
            //}
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //获取所有的临时节点,并排序
            List<String> childrens = zkClient.getChildren(PATH);
    
            Collections.sort(childrens);
    
            if (currentPath.equals(PATH+"/"+childrens.get(0))) {
                return true;
            }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
                int pathLength = PATH.length();
                int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
                beforePath = PATH+"/"+childrens.get(wz-1);
            }
            return false;
    
        }
    
        @Override
        protected void waitLock() {
            IZkDataListener lIZkDataListener = new IZkDataListener() {
    
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (null != countDownLatch){
                        countDownLatch.countDown();
                    }
                    System.out.println("listen lock unlock");
                }
    
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
    
                }
            };
            //监听前一个节点的变化
            zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
            if (zkClient.exists(beforePath)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    
        }
    
        @Override
        public void unlock() {
            if (null != zkClient) {
                System.out.println("lock unclock");
                zkClient.delete(currentPath);
            }
        }
    
        @Override
        public boolean readLock() {
            return true;
        }
    
        @Override
        public boolean writeLock() {
            return true;
        }
    }
    
    package com.srr.lock;
    
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     *  测试场景
     *  count从1加到4
     *  使用高性能的分布式锁在分布式环境下保证结果正确
     */
    public class T {
    
        volatile int  count = 1;
    
        public void inc(){
            for(int i = 0;i<3;i++){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                count++;
                System.out.println("count == "+count);
            }
        }
    
        public int getCount(){
           return count;
        }
    
        public static void main(String[] args) throws InterruptedException {
            final T t = new T();
            final Lock lock = new Lock();
            final CountDownLatch countDownLatch = new CountDownLatch(5);
            for(int i = 0;i<5;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        DistributedLock distributedLock = new BlockingZKLock();
                        if(lock.lock(distributedLock)){
                            t.inc();
                            lock.unlock(distributedLock);
                            countDownLatch.countDown();
                        }
                        System.out.println("count == "+t.getCount());
                    }
                }).start();
            }
            countDownLatch.await();
        }
    }

    这种实现客户端只需监听它前一个节点的变化,不需要监听所有的节点,从而提高了zookeeper锁的性能。

    • 共享锁(S锁)
    • 写到这个,看了网络上很多错误的文章实现把排它锁当做共享锁

    共享锁正确是实现姿势如下:

    package com.srr.lock;
    
    /**
     * @Description 分布式锁的接口
     */
    abstract  public interface DistributedLock {
        /**
         * 获取锁
         */
        boolean lock();
        /**
         * 解锁
         */
        void unlock();
    
        abstract boolean readLock();
        abstract boolean writeLock();
    }
    
    package com.srr.lock;
    
    /**
     * 共享锁策略
     */
    abstract public class ZKSharedLockStrategy implements DistributedLock{
        @Override
        public boolean readLock() {
            //获取锁成功
            if (tryReadLock()){
                System.out.println(Thread.currentThread().getName()+"获取读锁成功");
                return true;
            }else{  //获取锁失败
                //阻塞一直等待
                waitLock();
                //递归,再次获取锁
                return true;
            }
        }
    
        @Override
        public boolean writeLock() {
            //获取锁成功
            if (tryWriteLock()){
                System.out.println(Thread.currentThread().getName()+"获取写锁成功");
                return true;
            }else{  //获取锁失败
                //阻塞一直等待
                waitLock();
                //递归,再次获取锁
                return true;
            }
        }
    
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryWriteLock() ;
    
        /**
         * 尝试获取锁,子类实现
         */
        protected abstract boolean tryReadLock() ;
    
        /**
         * 等待获取锁,子类实现
         */
        protected abstract void waitLock();
    
        /**
         * 解锁:删除key
         */
        @Override
        public  abstract void unlock();
    }
    
    package com.srr.lock;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 共享锁
     */
    public class ZKSharedLock extends ZKSharedLockStrategy{
    
        private static final String PATH = "/zk-root-readwrite-lock";
        //当前节点路径
        private String currentPath;
        //前一个节点的路径
        private String beforePath;
        private CountDownLatch countDownLatch = null;
        //zk地址和端口
        public static final String ZK_ADDR = "192.168.32.129:2181";
        //超时时间
        public static final int SESSION_TIMEOUT = 30000;
        //创建zk
        protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
        public ZKSharedLock() {
            //如果不存在这个节点,则创建持久节点
            if (!zkClient.exists(PATH)) {
                zkClient.createPersistent(PATH);
            }
        }
    
        @Override
        protected boolean tryWriteLock() {
            //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            if (null == currentPath || "".equals(currentPath)) {
            //在path下创建一个临时的顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH+"/w", "writelock");
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //获取所有的临时节点,并排序
            List<String> childrens = zkClient.getChildren(PATH);
            Collections.sort(childrens);
    
            if (currentPath.equals(PATH+"/"+childrens.get(0))) {
                return true;
            }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
                int pathLength = PATH.length();
                int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
                beforePath = PATH+"/"+childrens.get(wz-1);
            }
            return false;
        }
    
        @Override
        protected boolean tryReadLock() {
            //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            if (null == currentPath || "".equals(currentPath)) {
                //在path下创建一个临时的顺序节点
                currentPath = zkClient.createEphemeralSequential(PATH+"/r", "readklock");
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //获取所有的临时节点,并排序
            List<String> childrens = zkClient.getChildren(PATH);
            Collections.sort(childrens);
    
            if (currentPath.equals(PATH+"/"+childrens.get(0))) {
                return true;
            }else if(isAllReadNodes(childrens)){
                return true;
            }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
                int pathLength = PATH.length();
                int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
    
                for (int i = wz - 1; i > 0; i--) {
                    // 找到了离得最近的一个写节点,那么它的后一个节点要么是一个读节点,要么就是待加锁的节点本身
                    if (childrens.get(i).indexOf("w") >= 0) {
                        beforePath = PATH + "/" + childrens.get(i);
                        break;
                    }
                }
            }
            return false;
        }
    
        // 判断比自已小的节点是否都是读节点
        private boolean isAllReadNodes(List<String> sortNodes) {
            int pathLength = PATH.length();
            int currentIndex =  Collections.binarySearch(sortNodes, currentPath.substring(pathLength+1));
            for (int i = 0; i < currentIndex - 1; i++) {
                // 只要有一个写锁,则不能直接获取读锁
                if (sortNodes.get(i).indexOf("w") >= 0) {
                    return false;
                }
            }
    
            return true;
        }
    
        @Override
        protected void waitLock() {
            IZkDataListener lIZkDataListener = new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (null != countDownLatch){
                        countDownLatch.countDown();
                    }
                    System.out.println("listen lock unlock");
                }
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
    
                }
            };
            //监听前一个节点的变化
            zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
            if (zkClient.exists(beforePath)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
        }
    
        @Override
        public boolean lock() {
            return false;
        }
    
        @Override
        public void unlock() {
            if (null != zkClient) {
                System.out.println("lock unclock");
                zkClient.delete(currentPath);
                zkClient.close();
            }
        }
    }
    
    package com.srr.lock;
    
    /**
     * 锁工具类
     */
    public class Lock {
        /**
         * 获取锁
         */
        boolean lock(DistributedLock lock) {
            return lock.lock();
        };
    
        /**
         * 获取读锁
         */
        boolean readlock(DistributedLock lock) {
            return lock.readLock();
        };
    
        /**
         * 获取读锁
         */
        boolean writeLock(DistributedLock lock) {
            return lock.writeLock();
        };
    
        /**
         * 释放锁
         */
        void unlock(DistributedLock lock) {
            lock.unlock();
        };
    }
    
    package com.srr.lock;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 测试共享锁
     */
    public class SharedLockTest {
        private static volatile int count = 0;
        public static void main(String[] args) throws Exception {
            final Lock lock = new Lock();
            final CountDownLatch countDownLatch = new CountDownLatch(10);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testWriteLock(8);
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testReadLock(10);
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testReadLock(20);
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testWriteLock(11);
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testWriteLock(30);
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    testReadLock(9);
                }
            }).start();
    
            countDownLatch.await();
        }
    
        // 读锁
        private static void testReadLock(long sleepTime) {
            try {
                Lock lock = new Lock();
                DistributedLock dlock = new ZKSharedLock();
                lock.readlock(dlock);
                System.out.println("i get readlock ->" + sleepTime);
                System.out.println("count = "+ count);
                Thread.sleep(sleepTime);
                lock.unlock(dlock);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // 写锁
        private static void testWriteLock(long sleepTime) {
            try {
                Lock lock = new Lock();
                DistributedLock dlock = new ZKSharedLock();
                lock.writeLock(dlock);
                System.out.println("i get writelock ->" + sleepTime);
                count++;
                Thread.sleep(sleepTime);
                lock.unlock(dlock);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    运行结果:

     从结果可以看出读锁和读锁可以共享锁,而写锁必须等待读锁或者写锁释放之后才能获取锁。

    最后,zk分布式锁完美解决方案:

    • Apache Curator
    • Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
    • Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.

    网上很多文章竟然标题用Curator实现分布式锁,大哥Curator框架本身已经实现了分布式锁而且提供了各种各样的锁api供大家使用,我们不用再基于Curator实现分布式锁,这不是多此一举吗?这里给出一个简单的使用案例,旨在说明意图:

    package com.srr.lock;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     *  测试场景
     *  count从1加到101
     *  使用redis分布式锁在分布式环境下保证结果正确
     */
    public class CuratorDistributedLockTest {
        private static final String lockPath = "/curator_lock";
        //zk地址和端口
        public static final String zookeeperConnectionString = "192.168.32.129:2181";
    
        volatile int  count = 1;
    
        public void inc(){
            for(int i = 0;i<10;i++){
                count++;
                System.out.println("count == "+count);
            }
        }
    
        public int getCount(){
            return count;
        }
    
        public static void main(String[] args) throws InterruptedException {
            final T t = new T();
            final Lock lock = new Lock();
            final CountDownLatch countDownLatch = new CountDownLatch(4);
            for(int i = 0;i<4;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(10, 5000);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();
                        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                        try {
                            if (lock.acquire(10 * 1000, TimeUnit.SECONDS))
                            {
                                try
                                {
                                    System.out.println("get the lock");
                                    t.inc();
                                }
                                finally
                                {
                                    lock.release();
                                    System.out.println("unlock the lock");
                                }
                            }
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                        countDownLatch.countDown();
                    }
                }).start();
            }
    
            countDownLatch.await();
            System.out.println("total count == "+t.getCount());
        }
    }

    运行结果:

     如果想更多了解Curator框架,请移步http://curator.apache.org/,官网给出了详细的使用案例及介绍。至此zk实现分布式锁总结完毕!

     原创不易,请多多关注!

  • 相关阅读:
    biji001
    公司内部openStack环境信息
    def
    CI调试
    一:Java之面向对象基本概念
    STL_算法_Heap算法(堆排)(精)
    IOS开发-经常使用站点集合
    【iOS开发-47】怎样下载iOS 7.1 Simulator 以及iOS 8离线的Documentation这些文件?
    设计模式简介
    how to deal with &quot;no such file error or diretory&quot; error for a new programmer in QT creator
  • 原文地址:https://www.cnblogs.com/sx-bj-srr/p/zookeeper.html
Copyright © 2011-2022 走看看