zoukankan      html  css  js  c++  java
  • ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

    作者:Grey

    原文地址: ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

    前置知识

    完成ZooKeeper集群搭建以及熟悉ZooKeeperAPI基本使用

    需求

    当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

    在单机情况下,可以使用JUC包里面的工具来进行互斥控制。

    但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。

    当多个进程不在同一个系统中,就需要用分布式锁控制多个进程对资源的访问。

    我们可以用ZooKeeper来模拟实现一个简单的分布式锁

    环境准备

    一个zk集权,ip和端口分别为:

    • 192.168.205.145:2181
    • 192.168.205.146:2181
    • 192.168.205.147:2181
    • 192.168.205.148:2181

    定义主方法

    App.java

    public class App {
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    ZkLock lock = new ZkLock();
                    lock.lock(); // 开启锁
                    System.out.println(Thread.currentThread().getName() + " doing work");
                    lock.release(); // 释放锁
                }).start();
            }
            while (true) {
            }
        }
    }
    

    如上,我们设计了一个ZkLock,其中lock方法是锁定资源,release方法是释放资源,我们并发了10个线程并发访问来模拟。

    public class ZkLock implements AsyncCallback.StringCallback, Watcher, AsyncCallback.StatCallback, AsyncCallback.Children2Callback {
        private CountDownLatch latch;
        private ZooKeeper zk;
        private String identify;
        private String lockPath;
        private String pathName;
    
        public ZkLock() {
            identify = Thread.currentThread().getName();
            lockPath = "/lock";
            latch = new CountDownLatch(1);
            zk = ZookeeperConfig.create(ADDRESS + "/testLock");
        }
    
        public void lock() {
            try {
                zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void release() {
            try {
                zk.delete(pathName, -1);
                System.out.println(identify + " over work....");
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (null != name) {
                // 创建成功
                System.out.println(identify + " created " + name);
                pathName = name;
                zk.getChildren("/", false, this, "dasdfas");
            }
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    
            sort(children);
            int i = children.indexOf(pathName.substring(1));
            if (i == 0) {
                // 是第一个,获得锁,可以执行
                System.out.println(identify + " first...");
                try {
                    zk.setData("/", identify.getBytes(UTF_8), -1);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            } else {
                zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
            }
    
        }
    
    
        @Override
        public void process(WatchedEvent event) {
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    zk.getChildren("/", false, this, "sdf");
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
            }
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
    
        }
    }
    

    关于上述代码的说明,我们规定创建的zk目录为/testLock,所以我们可以通过zk客户端在集群中先把/testLock目录建好,后续线程争抢的时候,我们只需要创建序列化的临时节点(以/lock开头),因为是序列化的,所以我们可以设置让第一个创建好节点的线程抢到锁,其他的线程排队等待。

    所以lock方法实现如下:

    zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
    

    lock方法在执行的时候,会有一个回调,即:当节点创建成功后,会判断/testLock节点中有没有已经创建好的且在当前节点之前的节点,有的话,则注册一个一个对于/testLock目录的监听:

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (null != name) {
                // 创建成功
                System.out.println(identify + " created " + name);
                pathName = name;
                zk.getChildren("/", false, this, "dasdfas");
            }
        }
    

    一旦发现/testLock目录下已经有节点了,那么我们拿到/testLock下的所有节点,并排序,取最小的那个节点执行即可:

      @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    
            sort(children);
            int i = children.indexOf(pathName.substring(1));
            if (i == 0) {
                // 是第一个,获得锁,可以执行
                System.out.println(identify + " first...");
                try {
                    zk.setData("/", identify.getBytes(UTF_8), -1);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            } else {
                zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
            }
    
        }
    

    release方法很简单,只需要把当前执行完毕的节点删除即可:

        public void release() {
            try {
                zk.delete(pathName, -1);
                System.out.println(identify + " over work....");
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    

    运行效果

    确保zk中有/testLock这个节点,如果没有,请先创建一个:

    Run App.java

    可以看到控制台输出:

    Thread-5 created /lock0000000000
    Thread-4 created /lock0000000001
    Thread-1 created /lock0000000002
    Thread-9 created /lock0000000003
    Thread-6 created /lock0000000004
    Thread-2 created /lock0000000005
    Thread-3 created /lock0000000006
    Thread-0 created /lock0000000007
    Thread-8 created /lock0000000008
    Thread-7 created /lock0000000009
    Thread-5 first...
    Thread-5 doing work
    Thread-5 over work....
    Thread-4 first...
    Thread-4 doing work
    Thread-4 over work....
    Thread-1 first...
    Thread-1 doing work
    Thread-1 over work....
    Thread-9 first...
    Thread-9 doing work
    Thread-9 over work....
    Thread-6 first...
    Thread-6 doing work
    Thread-6 over work....
    Thread-2 first...
    Thread-2 doing work
    Thread-2 over work....
    Thread-3 first...
    Thread-3 doing work
    Thread-3 over work....
    Thread-0 first...
    Thread-0 doing work
    Thread-0 over work....
    Thread-8 first...
    Thread-8 doing work
    Thread-8 over work....
    Thread-7 first...
    Thread-7 doing work
    Thread-7 over work....
    
    

    完整代码

    Github

  • 相关阅读:
    【Navicat】查看历史执行的SQL
    什么是webpack模块化构建工具
    靠边的列表如果没有设置margin-left:20px,那么是看不到列表序号的。
    在博客园中复制代码到网页中,有时候会存在异常,如下:
    / WebAPP开发与小程序 / 步骤一 · 4-5 地图搜索与poi结合(2)
    忘记样式属性对应的值时,可以使用以下方法进行操作
    //点击按钮加减音频音量到最小会出现bug什么意思???
    组件化网页开发 3步骤 / 20门课
    position:absolute 按钮左右分布:left:0 和 right:0 以及雪碧图
    查看引入的文件是否成功
  • 原文地址:https://www.cnblogs.com/greyzeng/p/14864943.html
Copyright © 2011-2022 走看看