zoukankan      html  css  js  c++  java
  • ZooKeeper 分布式共享锁的实现

    原创播客,如需转载请注明出处。原文地址:http://www.cnblogs.com/crawl/p/8352919.html 

    ----------------------------------------------------------------------------------------------------------------------------------------------------------

    笔记中提供了大量的代码示例,需要说明的是,大部分代码示例都是本人所敲代码并进行测试,不足之处,请大家指正~

    本博客中所有言论仅代表博主本人观点,若有疑惑或者需要本系列分享中的资料工具,敬请联系 qingqing_crawl@163.com

    GitHub:https://github.com/QingqingQi

    -----------------------------------------------------------------------------------------------------------------------------------------------------------

    前言:ZooKeeper 是提供少量数据存储和管理的分布式协调服务。适合存储状态管理信息,可以进行数据的读写,同步,提供对数据节点的监听功能。利用 ZooKeeper 可以实现很多功能,比如:Hadoop2.0,使用 Zookeeper 的事件处理确保整个集群只有一个活跃的 NameNode,存储配置信息等;可以利用 ZooKeeper 感知集群中哪台主机宕机或者下线等等。今天介绍另一个常用的功能,利用 Zookeeper 实现分布式共享锁。

    一、简要介绍

    利用 Zookeeper 实现分布式共享锁,可以做到一次只有指定个数的客户端访问服务器的某些资源。

    二、实现步骤

    利用 Zookeeper 实现分布式共享锁的步骤大致可以分为以下几步:

    1. 客户端上线即向 Zookeeper 注册,创建一把锁

    2. 判断是否只有一个客户端工作,若只有一个客户端工作,此客户端可以处理业务

    3. 获取父节点下注册的所有锁,通过判断自己是否是号码最小的那一把锁,若是则可以处理业务,否则等待

    值的注意的是,在某一客户端获取到锁处理完业务后,必须释放锁

    三、实现代码

    1. 新建一个 DistributedLock 类

    private ZooKeeper zkClient = null;
        
        //连接字符串
        private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
        
        //超时时间
        private static final int sessionTimeout = 2000;
        
        //父节点
        private static final String parentNode = "/locks";
        
        //记录自己创建子节点的路径
        private volatile String thisPath;
        
        public static void main(String[] args) throws Exception {
            //1.获取 ZooKeeper 的客户端连接
            DistributedLock distLock = new DistributedLock();
            distLock.getZKClient();
            
            //2.注册一把锁
            distLock.regiestLock();
            
            //3.监听父节点,判断是否只有自己在线
            distLock.watchParent();
     }

    2. main 方法中定义了三个方法

    1)getZKClient():用来获取 Zookeeper 客户端的连接

    其中 process 方法是当监听节点发生变化时调用,其中获取定义的父节点的所有子节点,然后判断当前节点是否是最小节点,若是则进行业务逻辑处理阶段,并重新注册一把新的锁

    //获取 zk 客户端
        public void getZKClient() throws Exception {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                
                @Override
                public void process(WatchedEvent event) {
                    //判断事件类型,只处理子节点变化事件
                    if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                        try {
                            List<String> childrens = zkClient.getChildren(parentNode, true);
                            //判断自己是否是最小的
                            String thisNode = thisPath.substring((parentNode + "/").length());
                            Collections.sort(childrens);
                            if(childrens.indexOf(thisNode) == 0){
                                //处理业务逻辑
                                dosomething();
                                //重新注册一把新的锁
                                thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    }

    2)main 中的第二个方法是 rediestLock()

    调用 Zookeeper 客户端的 create() 方法,建立一个新的节点

    //注册一把锁
        public void regiestLock() throws Exception {
            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }

    3)第三个是 watchParent() 方法

    在此方法中判断是否只有一个节点在线,若只有自己一个节点,则调用业务处理的方法

    //监听父节点,判断是否只有自己在线
        public void watchParent() throws Exception {
            List<String> childrens = zkClient.getChildren(parentNode, true);
            if (childrens != null && childrens.size() == 1) {
                //只有自己在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
                dosomething();
            } else {
                //不是只有自己在线,说明别人已经获取到锁,等待
                Thread.sleep(Long.MAX_VALUE);
            }
        }

    4)最后一个是自定义的业务逻辑方法

    需要注意的是,当处理完业务逻辑后,必须释放锁

    //业务逻辑方法,注意:需要在最后释放锁
        public void dosomething() throws Exception {
            System.out.println("或得到锁:" + thisPath);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("释放锁:" + thisPath);
                zkClient.delete(thisPath, -1);
            }
        }

    3. 最后贴一下全部代码

    package com.software.bigdata.zkdistlock;
    
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    /**
     * @Description: 分布式共享锁
     * 
     * @author Crawl
     * @date 2018年1月25日 下午5:02:42
     */
    public class DistributedLock {
        
        private ZooKeeper zkClient = null;
        
        //连接字符串
        private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
        
        //超时时间
        private static final int sessionTimeout = 2000;
        
        //父节点
        private static final String parentNode = "/locks";
        
        //记录自己创建子节点的路径
        private volatile String thisPath;
        
        public static void main(String[] args) throws Exception {
            //1.获取 ZooKeeper 的客户端连接
            DistributedLock distLock = new DistributedLock();
            distLock.getZKClient();
            
            //2.注册一把锁
            distLock.regiestLock();
            
            //3.监听父节点,判断是否只有自己在线
            distLock.watchParent();
        }
        
        //业务逻辑方法,注意:需要在最后释放锁
        public void dosomething() throws Exception {
            System.out.println("或得到锁:" + thisPath);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("释放锁:" + thisPath);
                zkClient.delete(thisPath, -1);
            }
        }
        
        //监听父节点,判断是否只有自己在线
        public void watchParent() throws Exception {
            List<String> childrens = zkClient.getChildren(parentNode, true);
            if (childrens != null && childrens.size() == 1) {
                //只有自己在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
                dosomething();
            } else {
                //不是只有自己在线,说明别人已经获取到锁,等待
                Thread.sleep(Long.MAX_VALUE);
            }
        }
        
        //注册一把锁
        public void regiestLock() throws Exception {
            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        
        //获取 zk 客户端
        public void getZKClient() throws Exception {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                
                @Override
                public void process(WatchedEvent event) {
                    //判断事件类型,只处理子节点变化事件
                    if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                        try {
                            List<String> childrens = zkClient.getChildren(parentNode, true);
                            //判断自己是否是最小的
                            String thisNode = thisPath.substring((parentNode + "/").length());
                            Collections.sort(childrens);
                            if(childrens.indexOf(thisNode) == 0){
                                //处理业务逻辑
                                dosomething();
                                //重新注册一把新的锁
                                thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    
    }
  • 相关阅读:
    druid-1.0.13 数据库配置文件密码加密
    PostConstruct注解
    easyui formatter 返回easyui组件
    小师妹问 easyUI mergeCells 行合并后表头和内容对不齐
    Java Split以竖线作为分隔符
    Integer比较值的时候小心使用
    js 关键字 in
    Asp.net中防止用户多次登录的方法
    C#取得站点跟目录
    解读支付宝接口实现步骤
  • 原文地址:https://www.cnblogs.com/crawl/p/8352919.html
Copyright © 2011-2022 走看看