zoukankan      html  css  js  c++  java
  • ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

    作者:Grey

    原文地址: ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

    前置知识

    完成ZooKeeper集群搭建以及熟悉ZooKeeperAPI基本使用

    需求

    当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

    在单机情况下,可以使用JUC包里面的工具来进行互斥控制。

    但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。

    当多个进程不在同一个系统中,就需要用分布式锁控制多个进程对资源的访问。

    我们可以用ZooKeeper来模拟实现一个简单的分布式锁

    环境准备

    一个zk集权,ip和端口分别为:

    • 192.168.205.145:2181
    • 192.168.205.146:2181
    • 192.168.205.147:2181
    • 192.168.205.148:2181

    定义主方法

    App.java

    public class App {
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    ZkLock lock = new ZkLock();
                    lock.lock(); // 开启锁
                    System.out.println(Thread.currentThread().getName() + " doing work");
                    lock.release(); // 释放锁
                }).start();
            }
            while (true) {
            }
        }
    }
    

    如上,我们设计了一个ZkLock,其中lock方法是锁定资源,release方法是释放资源,我们并发了10个线程并发访问来模拟。

    public class ZkLock implements AsyncCallback.StringCallback, Watcher, AsyncCallback.StatCallback, AsyncCallback.Children2Callback {
        private CountDownLatch latch;
        private ZooKeeper zk;
        private String identify;
        private String lockPath;
        private String pathName;
    
        public ZkLock() {
            identify = Thread.currentThread().getName();
            lockPath = "/lock";
            latch = new CountDownLatch(1);
            zk = ZookeeperConfig.create(ADDRESS + "/testLock");
        }
    
        public void lock() {
            try {
                zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void release() {
            try {
                zk.delete(pathName, -1);
                System.out.println(identify + " over work....");
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (null != name) {
                // 创建成功
                System.out.println(identify + " created " + name);
                pathName = name;
                zk.getChildren("/", false, this, "dasdfas");
            }
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    
            sort(children);
            int i = children.indexOf(pathName.substring(1));
            if (i == 0) {
                // 是第一个,获得锁,可以执行
                System.out.println(identify + " first...");
                try {
                    zk.setData("/", identify.getBytes(UTF_8), -1);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            } else {
                zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
            }
    
        }
    
    
        @Override
        public void process(WatchedEvent event) {
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    zk.getChildren("/", false, this, "sdf");
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
            }
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
    
        }
    }
    

    关于上述代码的说明,我们规定创建的zk目录为/testLock,所以我们可以通过zk客户端在集群中先把/testLock目录建好,后续线程争抢的时候,我们只需要创建序列化的临时节点(以/lock开头),因为是序列化的,所以我们可以设置让第一个创建好节点的线程抢到锁,其他的线程排队等待。

    所以lock方法实现如下:

    zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
    

    lock方法在执行的时候,会有一个回调,即:当节点创建成功后,会判断/testLock节点中有没有已经创建好的且在当前节点之前的节点,有的话,则注册一个一个对于/testLock目录的监听:

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (null != name) {
                // 创建成功
                System.out.println(identify + " created " + name);
                pathName = name;
                zk.getChildren("/", false, this, "dasdfas");
            }
        }
    

    一旦发现/testLock目录下已经有节点了,那么我们拿到/testLock下的所有节点,并排序,取最小的那个节点执行即可:

      @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    
            sort(children);
            int i = children.indexOf(pathName.substring(1));
            if (i == 0) {
                // 是第一个,获得锁,可以执行
                System.out.println(identify + " first...");
                try {
                    zk.setData("/", identify.getBytes(UTF_8), -1);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            } else {
                zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
            }
    
        }
    

    release方法很简单,只需要把当前执行完毕的节点删除即可:

        public void release() {
            try {
                zk.delete(pathName, -1);
                System.out.println(identify + " over work....");
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    

    运行效果

    确保zk中有/testLock这个节点,如果没有,请先创建一个:

    Run App.java

    可以看到控制台输出:

    Thread-5 created /lock0000000000
    Thread-4 created /lock0000000001
    Thread-1 created /lock0000000002
    Thread-9 created /lock0000000003
    Thread-6 created /lock0000000004
    Thread-2 created /lock0000000005
    Thread-3 created /lock0000000006
    Thread-0 created /lock0000000007
    Thread-8 created /lock0000000008
    Thread-7 created /lock0000000009
    Thread-5 first...
    Thread-5 doing work
    Thread-5 over work....
    Thread-4 first...
    Thread-4 doing work
    Thread-4 over work....
    Thread-1 first...
    Thread-1 doing work
    Thread-1 over work....
    Thread-9 first...
    Thread-9 doing work
    Thread-9 over work....
    Thread-6 first...
    Thread-6 doing work
    Thread-6 over work....
    Thread-2 first...
    Thread-2 doing work
    Thread-2 over work....
    Thread-3 first...
    Thread-3 doing work
    Thread-3 over work....
    Thread-0 first...
    Thread-0 doing work
    Thread-0 over work....
    Thread-8 first...
    Thread-8 doing work
    Thread-8 over work....
    Thread-7 first...
    Thread-7 doing work
    Thread-7 over work....
    
    

    完整代码

    Github

  • 相关阅读:
    CodeForces 660D Number of Parallelograms
    【POJ 1082】 Calendar Game
    【POJ 2352】 Stars
    【POJ 2481】 Cows
    【POJ 1733】 Parity Game
    【NOI 2002】 银河英雄传说
    【NOI 2015】 程序自动分析
    【POJ 1704】 Georgia and Bob
    【HDU 2176】 取(m堆)石子游戏
    【SDOI 2016】 排列计数
  • 原文地址:https://www.cnblogs.com/greyzeng/p/14864943.html
Copyright © 2011-2022 走看看