zoukankan      html  css  js  c++  java
  • zookeeper — 实现分布式锁

    一.前言

    在之前的文章中介绍过分布式锁的特点和利用Redis实现简单的分布式锁。但是分布式锁的实现还有很多其他方式,但是万变不离其宗,始终遵循一个特点:同一时刻只能有一个操作获取。这篇文章主要介绍如何基于zookeeper实现分布式锁。

    • zookeeper能够作为分布式锁实现的基础
    • 算法流程
    • 实现

    关于分布式锁的相关特性,这里不再赘述,请参考分布式锁


    ### 二.zookeeper能够作为分布式锁实现的基础

    这里回顾下分布式锁的特点:

    • 每次只能一个占用锁;
    • 可以重复进入锁;
    • 只有占用者才可以解锁;
    • 获取锁和释放锁都需要原子
    • 不能产生死锁
    • 尽量满足性能

    zookeeper中有一种临时顺序节点,它具有以下特征:

    • 时效性,当会话结束,节点将自动被删除
    • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取

    利用以上的特点可以满足分布式锁实现的基本要求:

    1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的每次只能一个占用锁,因为只有它一个获取到,所以可以实现重复进入,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁

    2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的

    3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁

    4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能

    三.算法流程

    1.获取锁

    需要获取分布式锁的应用都向zookeeper的/lock/{resouce}目录下注册sequence-前缀的节点,序号最小者获取到操作资源的权限:

    Note:
    这里的resource需要依据竞争的具体资源确定,如竞争账户则可以使用账户号作为resource。

    从图中可以看出,clientA的顺序号最小,由它获取到锁,操作资源。

    算法步骤

    1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
    2. client判断/lock目录下是否存在竞争的资源resouce目录,如果不存在则向其注册/lock/resource的持久节点
    3. client向/lock/resource目录下注册/lock/resource/sequence-前缀的临时顺序节点,并得到顺序号
    4. client获取/lock/resource目录下的所有临时顺序子节点
    5. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
    6. 如果收到监控的临时节点被删除的通知,则再重复4、5步骤,直到获取到锁

    流程图

    2.释放锁

    因为最小的节点只被获取到锁的client持有,所以该锁不可能被其他client释放。同时释放锁只需要将临时顺序节点删除,也是原子性操作。


    ### 三.实现
    /**
     * 基于Zookeeper实现分布式锁
     *
     * @author huaijin
     */
    public class DistributedLockBaseZookeeper implements DistributedLock {
    
        private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);
    
        /**
         * 利用空串作为各个节点存储的数据
         */
        private static final String EMPTY_DATA = "";
    
        /**
         * 分布式锁的根目录
         */
        private static final String LOCK_ROOT = "/lock";
    
        /**
         * zookeeper目录分隔符
         */
        private static final String PATH_SEPARATOR = "/";
    
        /**
         * 临时顺序节点前缀
         */
        private static final String LOCK_NODE_PREFIX = "sequence-";
    
        /**
         * 利用Lock和Condition实现等待通知
         */
        private Lock waitNotifierLock = new ReentrantLock();
        private Condition waitNotifier = waitNotifierLock.newCondition();
    
        /**
         * 操作zookeeper的client
         */
        private ZkClient zkClient;
    
        /**
         * 分布式资源的路径
         */
        private String resourcePath;
    
        /**
         * 锁节点完整前缀
         */
        private String lockNodePrefix;
    
        /**
         * 当前注册的临时顺序节点路径
         */
        private String currentLockNodePath;
    
        public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
            Objects.requireNonNull(zkClient, "zkClient must not be null!");
            if (resource == null || resource.isEmpty()) {
                throw new IllegalArgumentException("resource must not be null!");
            }
            this.zkClient = zkClient;
            this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
            this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;
    
            // 创建分布式锁根目录
            if (!this.zkClient.exists(LOCK_ROOT)) {
                try {
                    this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
                } catch (ZkNodeExistsException e) {
                    // ignore, logging
                    log.warn("The root path for lock already exists.");
                }
            }
    
            // 创建资源目录
            if (!this.zkClient.exists(resourcePath)) {
                try {
                    this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
                } catch (ZkNodeExistsException e) {
                    // ignore, logging
                    log.warn("The resource path for [" + resourcePath + "] already exists.");
                }
            }
        }
    
    
        @Override
        public void lock() throws DistributedLockException {
            if (!acquireLock()) {
                // 如果获取锁不成功,则等待
                waitNotifierLock.lock();
                try {
                    waitNotifier.await();
                } catch (Exception e) {
                    throw new DistributedLockException("Interrupt when waiting notification.");
                } finally {
                    waitNotifierLock.unlock();
                }
            }
        }
    
        @Override
        public void unlock() {
            // 删除自身节点,释放锁
            zkClient.delete(currentLockNodePath);
        }
    
        private boolean acquireLock() throws DistributedLockException {
            // 如果当前未注册临时顺序节点,则注册
            if (this.currentLockNodePath == null) {
                this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
            }
            // 获取顺序号
            long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
            // 获取所有子节点
            List<String> childNodePaths = zkClient.getChildren(resourcePath);
            if (childNodePaths == null || childNodePaths.isEmpty()) {
                throw new DistributedLockException("Not exists child nodes.");
            }
            // 从所有子节点中获取最小子节点的顺序号
            long minSeq = 1000000L;
            int minIndex = -1;
            for (int i = 0; i < childNodePaths.size(); i++) {
                long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
                if (nodeSeq < minSeq) {
                    minSeq = nodeSeq;
                    minIndex = i;
                }
            }
            // 比较自身顺序号与最小序号
            if (lockNodeSeq > minSeq) {
                // 如果存在更小序号,则监控最小序号的子节点
                String minLockNodePath = childNodePaths.get(minIndex);
                zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
                        new ListenerForLockRelease());
                return false;
            }
            // 成功获取锁,返回
            return true;
        }
    
        private long fetchSeqFromNodePath(String nodePath) {
            String seq = nodePath.substring(lockNodePrefix.length());
            return Long.valueOf(seq);
        }
    
        private class ListenerForLockRelease implements IZkDataListener {
    
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
    
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // 如果成功获取锁,则通知,让主线程返回
                if (acquireLock()) {
                    waitNotifierLock.lock();
                    try {
                        waitNotifier.signal();
                    } finally {
                        waitNotifierLock.unlock();
                    }
                }
            }
        }
    }
    
  • 相关阅读:
    android四大组件之contentprovider
    android自定义控件及自定义组合控件
    android四大组件之activity
    监听鼠标滚动事件,如滚动鼠标出现返回顶部按钮
    滚动鼠标出现某一元素
    checked 选择框选中
    原生JS一些操作
    闭合浮动的方法css
    合理提升WEB前端性能
    JS数组操作
  • 原文地址:https://www.cnblogs.com/lxyit/p/10387123.html
Copyright © 2011-2022 走看看