zoukankan      html  css  js  c++  java
  • Zookeeper 实现分布式锁服务

    第一篇文章, 有错误还请指正

    本篇主要简单介绍基于 zookeeper 3.4的 临时顺序节点, 以一个简单的demo演示分布式锁的获取, 直接贴代码, 代码里边有我的注释.

    修改下server地址和端口可直接跑. 核心在 lock() 方法.

    maven :

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.5</version>
    </dependency>

    过程如下:

    1. 创建临时顺序节点

    2. 获得节点所在父节点所有的子节点

    3. 根据节点顺序排序, 获得最小顺序节点

    4. 和当前节点比较, 如果等于当前节点, 则获得锁

    5. 如果不等于, 则获得当前节点的前一个节点, 并注册监听

    6. 前一节点如果不存在, 则获得锁

    7. 等待前一节点释放锁

    8. 随后关闭客户端,释放锁.

    package com.citi.csl.herman.zookeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    //zoo.cfg
    //clientPort=2181
    //server.1=168.72.230.27:2888:3888
    //server.2=168.72.231.24:2888:3888
    //server.3=10.116.47.97:2888:3888
    class ZKServerConfig {
        // ip + clientPort
        private String zkServer = "168.72.230.27:2181,168.72.231.24:2181,10.116.47.97:2181";
        private int sessionTimeout = 30000;// ms
    
        public ZKServerConfig() {}
        public String getServers() {return zkServer;}
        public int getSessionTimeout() {return sessionTimeout;}
    }
    
    class ZKHelper {
        // wait until session is available.
        // connection is async, so when new Zookeeper() returned, just complete user
        // config, session is in 'CONNECTING' status.
        public static ZooKeeper getZookKeeper(ZKServerConfig config)
                throws IOException, InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper(config.getServers(),
                    config.getSessionTimeout(), new SessionReadyWatcher(
                            countDownLatch));
            countDownLatch.await();
            return zk;
        }
    
    }
    
    // sync helper
    class SessionReadyWatcher implements Watcher {
        CountDownLatch countDownLatch;
    
        SessionReadyWatcher(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
    
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState())
                countDownLatch.countDown();
            if (EventType.NodeDeleted == event.getType())
                countDownLatch.countDown();
        }
    }
    
    public class DistributedLock implements Lock {
    
        // test 
        public static void main(String[] args) throws InterruptedException {
            final String lockName = "distribution-lock";
            final CountDownLatch countDownLatch = new CountDownLatch(10);
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                executorService.submit(new Runnable() {
                    public void run() {
                        try {
                            DistributedLock lock = new DistributedLock(lockName);
                            System.out.println(Thread.currentThread().getName() + " connectted to server.");
                            System.out.println(Thread.currentThread().getName() + " try to get lock.");
                            lock.lock();
                            System.out.println(Thread.currentThread().getName() + " get the lock.");
                            // hold the lock a while
                            Thread.sleep(5000);
                            lock.unlock();
                            System.out.println(Thread.currentThread().getName() + " release the lock.");
                            countDownLatch.countDown();
                        } catch (Exception e) {
                        }
                    }
                });
            }
            //wait all lock release
            countDownLatch.await();
            //shutdown executors
            executorService.shutdown();
        }
    
        private ZooKeeper zk;
        // root lock path
        private String app_root = "/zk-herman";
        // lock resources
        private String lockName = "";
        // current lock name
        private String current_lock;
        // lock name before current lock
        private String pre_lock;
    
        private static final String ZK_PATH_SEPARATOR = "/";
    
        public DistributedLock(String lockName) {
            try {
                // connect to zk
                zk = ZKHelper.getZookKeeper(new ZKServerConfig());
                System.out.println("connected to zk.");
                this.lockName = lockName;
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    
        public void lock() {
            try {
                // create ephemeral sequential node
                current_lock = zk.create(app_root + ZK_PATH_SEPARATOR + lockName,
                        null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(current_lock + " has been set up");
                // get all child nodes
                List<String> subNodes = zk.getChildren(app_root, false);
                List<String> lockObjs = new ArrayList<String>(subNodes.size());
                // get all waiting locks
                for (String subNode : subNodes) {
                    if (subNode.startsWith(lockName))
                        lockObjs.add(subNode);
                }
                // sort
                Collections.sort(lockObjs);
                // if current is minimum, then get the lock
                if (current_lock.equals(app_root + "/" + lockObjs.get(0))) {
                    return;
                }
    
                // if not, get previous waiting lock
                String current_lock_name = current_lock.substring(current_lock
                        .lastIndexOf(ZK_PATH_SEPARATOR)+1);
                pre_lock = lockObjs.get(Collections.binarySearch(lockObjs,
                        current_lock_name) - 1);
    
                // wait previous lock unlock
                CountDownLatch countDownLatch = new CountDownLatch(1);
                // re-check whether previous lock exist, register watcher to listen
                // node change event
                Stat stat = zk.exists(app_root + "/" + pre_lock, new SessionReadyWatcher(
                        countDownLatch));
                // if not exist
                if (stat == null)
                    return;
                // wait previous get lock
                countDownLatch.await();
                return;
    
            } catch (Exception e) {
    
            }
        }
    
        // close connection then session closed, the ephemeral znode be deleted
        // automatically
        public void unlock() {
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public boolean tryLock() {
            return false;
        }
    
        public void lockInterruptibly() throws InterruptedException {
        }
    
        public boolean tryLock(long time, TimeUnit unit)
                throws InterruptedException {
            return false;
        }
    
        public Condition newCondition() {
            return null;
        }
    
    }
  • 相关阅读:
    Javescript基础api实现原理
    React Fiber
    ASP.NET中Form验证登录后反复跳转回登录页面的问题
    跨域部署Silverlight时需要注意的问题
    Windows 8 x64+Ruby 2上安装Sqlite3方法
    .NET 项目在源码控制中程序集的引用问题
    [EntLib]解决The type or namespace name 'Data' does not exist in the namespace 'Microsoft.Practices.EnterpriseLibrary' 的错误
    ASP.NET MVC Url中参数过长引发的问题
    Windows 8 x64 QQ2012/2013beta无法启动屌丝解决方法
    TechEd 2010参会小记
  • 原文地址:https://www.cnblogs.com/hermanlife/p/9494479.html
Copyright © 2011-2022 走看看