zoukankan      html  css  js  c++  java
  • ZooKeeper 分布式共享锁的实现

    原创播客,如需转载请注明出处。原文地址:http://www.cnblogs.com/crawl/p/8352919.html 

    ----------------------------------------------------------------------------------------------------------------------------------------------------------

    笔记中提供了大量的代码示例,需要说明的是,大部分代码示例都是本人所敲代码并进行测试,不足之处,请大家指正~

    本博客中所有言论仅代表博主本人观点,若有疑惑或者需要本系列分享中的资料工具,敬请联系 qingqing_crawl@163.com

    GitHub:https://github.com/QingqingQi

    -----------------------------------------------------------------------------------------------------------------------------------------------------------

    前言:ZooKeeper 是提供少量数据存储和管理的分布式协调服务。适合存储状态管理信息,可以进行数据的读写,同步,提供对数据节点的监听功能。利用 ZooKeeper 可以实现很多功能,比如:Hadoop2.0,使用 Zookeeper 的事件处理确保整个集群只有一个活跃的 NameNode,存储配置信息等;可以利用 ZooKeeper 感知集群中哪台主机宕机或者下线等等。今天介绍另一个常用的功能,利用 Zookeeper 实现分布式共享锁。

    一、简要介绍

    利用 Zookeeper 实现分布式共享锁,可以做到一次只有指定个数的客户端访问服务器的某些资源。

    二、实现步骤

    利用 Zookeeper 实现分布式共享锁的步骤大致可以分为以下几步:

    1. 客户端上线即向 Zookeeper 注册,创建一把锁

    2. 判断是否只有一个客户端工作,若只有一个客户端工作,此客户端可以处理业务

    3. 获取父节点下注册的所有锁,通过判断自己是否是号码最小的那一把锁,若是则可以处理业务,否则等待

    值的注意的是,在某一客户端获取到锁处理完业务后,必须释放锁

    三、实现代码

    1. 新建一个 DistributedLock 类

    private ZooKeeper zkClient = null;
        
        //连接字符串
        private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
        
        //超时时间
        private static final int sessionTimeout = 2000;
        
        //父节点
        private static final String parentNode = "/locks";
        
        //记录自己创建子节点的路径
        private volatile String thisPath;
        
        public static void main(String[] args) throws Exception {
            //1.获取 ZooKeeper 的客户端连接
            DistributedLock distLock = new DistributedLock();
            distLock.getZKClient();
            
            //2.注册一把锁
            distLock.regiestLock();
            
            //3.监听父节点,判断是否只有自己在线
            distLock.watchParent();
     }

    2. main 方法中定义了三个方法

    1)getZKClient():用来获取 Zookeeper 客户端的连接

    其中 process 方法是当监听节点发生变化时调用,其中获取定义的父节点的所有子节点,然后判断当前节点是否是最小节点,若是则进行业务逻辑处理阶段,并重新注册一把新的锁

    //获取 zk 客户端
        public void getZKClient() throws Exception {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                
                @Override
                public void process(WatchedEvent event) {
                    //判断事件类型,只处理子节点变化事件
                    if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                        try {
                            List<String> childrens = zkClient.getChildren(parentNode, true);
                            //判断自己是否是最小的
                            String thisNode = thisPath.substring((parentNode + "/").length());
                            Collections.sort(childrens);
                            if(childrens.indexOf(thisNode) == 0){
                                //处理业务逻辑
                                dosomething();
                                //重新注册一把新的锁
                                thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    }

    2)main 中的第二个方法是 rediestLock()

    调用 Zookeeper 客户端的 create() 方法,建立一个新的节点

    //注册一把锁
        public void regiestLock() throws Exception {
            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }

    3)第三个是 watchParent() 方法

    在此方法中判断是否只有一个节点在线,若只有自己一个节点,则调用业务处理的方法

    //监听父节点,判断是否只有自己在线
        public void watchParent() throws Exception {
            List<String> childrens = zkClient.getChildren(parentNode, true);
            if (childrens != null && childrens.size() == 1) {
                //只有自己在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
                dosomething();
            } else {
                //不是只有自己在线,说明别人已经获取到锁,等待
                Thread.sleep(Long.MAX_VALUE);
            }
        }

    4)最后一个是自定义的业务逻辑方法

    需要注意的是,当处理完业务逻辑后,必须释放锁

    //业务逻辑方法,注意:需要在最后释放锁
        public void dosomething() throws Exception {
            System.out.println("或得到锁:" + thisPath);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("释放锁:" + thisPath);
                zkClient.delete(thisPath, -1);
            }
        }

    3. 最后贴一下全部代码

    package com.software.bigdata.zkdistlock;
    
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    /**
     * @Description: 分布式共享锁
     * 
     * @author Crawl
     * @date 2018年1月25日 下午5:02:42
     */
    public class DistributedLock {
        
        private ZooKeeper zkClient = null;
        
        //连接字符串
        private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
        
        //超时时间
        private static final int sessionTimeout = 2000;
        
        //父节点
        private static final String parentNode = "/locks";
        
        //记录自己创建子节点的路径
        private volatile String thisPath;
        
        public static void main(String[] args) throws Exception {
            //1.获取 ZooKeeper 的客户端连接
            DistributedLock distLock = new DistributedLock();
            distLock.getZKClient();
            
            //2.注册一把锁
            distLock.regiestLock();
            
            //3.监听父节点,判断是否只有自己在线
            distLock.watchParent();
        }
        
        //业务逻辑方法,注意:需要在最后释放锁
        public void dosomething() throws Exception {
            System.out.println("或得到锁:" + thisPath);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("释放锁:" + thisPath);
                zkClient.delete(thisPath, -1);
            }
        }
        
        //监听父节点,判断是否只有自己在线
        public void watchParent() throws Exception {
            List<String> childrens = zkClient.getChildren(parentNode, true);
            if (childrens != null && childrens.size() == 1) {
                //只有自己在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
                dosomething();
            } else {
                //不是只有自己在线,说明别人已经获取到锁,等待
                Thread.sleep(Long.MAX_VALUE);
            }
        }
        
        //注册一把锁
        public void regiestLock() throws Exception {
            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        
        //获取 zk 客户端
        public void getZKClient() throws Exception {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                
                @Override
                public void process(WatchedEvent event) {
                    //判断事件类型,只处理子节点变化事件
                    if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                        try {
                            List<String> childrens = zkClient.getChildren(parentNode, true);
                            //判断自己是否是最小的
                            String thisNode = thisPath.substring((parentNode + "/").length());
                            Collections.sort(childrens);
                            if(childrens.indexOf(thisNode) == 0){
                                //处理业务逻辑
                                dosomething();
                                //重新注册一把新的锁
                                thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    
    }
  • 相关阅读:
    第二十一章流 1流的操作 简单
    第二十章友元类与嵌套类 1友元类 简单
    第十九章 19 利用私有继承来实现代码重用 简单
    第二十章友元类与嵌套类 2嵌套类 简单
    第十九章 8链表类Node 简单
    第二十一章流 3用cin输入 简单
    第十九章 10 图书 药品管理系统 简单
    第十九章 11图书 药品管理系统 简单
    第二十一章流 4文件的输入和输出 简单
    第十九章 12 什么时候使用私有继承,什么时候使用包含 简单
  • 原文地址:https://www.cnblogs.com/crawl/p/8352919.html
Copyright © 2011-2022 走看看