zoukankan      html  css  js  c++  java
  • 7、zookeeper应用场景-分布式锁

    分布式锁

      实现原理:有序节点+watch监听机制实现

    分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具Zookeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如果实现排他锁

    设计思路

    1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001

    2. 客户端取得/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功

    3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock_000000002,那么则监听Lock_000000001

    4. 当前一位锁节点(Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)的逻辑

    5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

    6. zookeeper是有工具包的(这里采用手写)
    // 线程测试类
    public class ThreadTest {
        public static void delayOperation(){
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        static interface Runable{
            void run();
        }
        public static void run(Runable runable,int threadNum){
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 30,
                    0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            for (int i = 0; i < threadNum; i++) {
                threadPoolExecutor.execute(runable::run);
            }
            threadPoolExecutor.shutdown();
        }
    
        public static void main(String[] args) {
    //        DistributedLock distributedLock = new DistributedLock();
    //        distributedLock.acquireLock();
    //        delayOperation();
    //        distributedLock.releaseLock();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // 每秒打印信息
            run(() -> {
                for (int i = 0; i < 999999999; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String format = dateTimeFormatter.format(LocalDateTime.now());
                    System.out.println(format);
                }
            },1);
            // 线程测试
            run(() -> {
                DistributedLock distributedLock = new DistributedLock();
                distributedLock.acquireLock();
                delayOperation();
                distributedLock.releaseLock();
            },30);
        }
    }
    public class DistributedLock {
        private String IP = "192.168.133.133:2181";
        private final String ROOT_LOCK = "/Root_Lock";
        private final String LOCK_PREFIX = "/Lock_";
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private final byte[] DATA = new byte[0];
    
        private ZooKeeper zookeeper;
        private String path;
    
        private void init(){
            // 初始化
            try {
                zookeeper = new ZooKeeper(IP, 200000, w -> {
                    if(w.getState() == Watcher.Event.KeeperState.SyncConnected){
                        System.out.println("连接成功");
                    }
                    countDownLatch.countDown();
                });
                countDownLatch.await();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        // 暴露的外部方法,主逻辑
        public void acquireLock(){
            init();
            createLock();
            attemptLock();
        }
    
        // 暴露的外部方法,主逻辑
        public void releaseLock(){
            try {
                zookeeper.delete(path,-1);
                System.out.println("锁释放了" + path);
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
        private void createLock(){
            try {
                // 创建一个目录节点
                Stat root = zookeeper.exists(ROOT_LOCK, false);
                if(root == null)
                    zookeeper.create(ROOT_LOCK, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                // 目录下创建子节点
                path = zookeeper.create(ROOT_LOCK + LOCK_PREFIX, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        private Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Event.EventType.NodeDeleted){
                    synchronized (this){
                        this.notifyAll();
                    }
                }
            }
        };
    
        private void attemptLock(){
            try {
                // 获取正在排队的节点,由于是zookeeper生成的临时节点,不会出错,这里不能加监视器
                // 因为添加了监视器后,任何子节点的变化都会触发监视器
                List<String> nodes = zookeeper.getChildren(ROOT_LOCK,false);
                nodes.sort(String::compareTo);
                // 获取自身节点的排名
                int ranking = nodes.indexOf(path.substring(ROOT_LOCK.length() + 1));
                // 已经是最靠前的节点了,获取锁
                if(ranking == 0){
                    return;
                }else {
                    // 并不是靠前的锁,监视自身节点的前一个节点
                    Stat status = zookeeper.exists(ROOT_LOCK+"/"+nodes.get(ranking - 1), watcher);
                    // 有可能这这个判断的瞬间,0号完成了操作(此时我们应该判断成功自旋才对),但是上面的status变量已经获取了值并且不为空,1号沉睡
                    // 但是,请注意自行测试,虽然1号表面上沉睡了,但是实际上watcher.wait()是瞬间唤醒的
                    if(status == null){
                        attemptLock();
                    }else {
                        synchronized (watcher){
                            watcher.wait();
                        }
                        attemptLock();
                    }
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    51nod1331 狭窄的通道
    noip2016 提高组
    noip2016 普及组
    noip车站分级 拓扑排序
    【NOIP1999】邮票面值设计 dfs+dp
    视频智能分析平台EasyCVR衍生版视频管理平台网页导航栏activeNav的背景照片异常的处理方式
    TSINGSEE青犀视频智能分析平台EasyCVR中性版本如何自定义平台内信息的变更?
    人脸识别/车牌识别视频智能分析系统EasyCVR通过接口GetApiV1Devices调用获取设备信息不成功原因分析
    超低延迟直播系统Webrtc编译android报错The installation of the Chrome OS default fonts failed问题
    超低延时安防直播系统webrtc-client测试推送多路视频流关闭其中一路后所有推流都关闭问题解决
  • 原文地址:https://www.cnblogs.com/lemon-flm/p/14605388.html
Copyright © 2011-2022 走看看