zoukankan      html  css  js  c++  java
  • ZooKeeper连接并创建节点以及实现分布式锁操作节点排序输出最小节点Demo

    class LockThread implements Runnable {
        private DistributedLock lock;
        
    
        public LockThread(int threadId,CountDownLatch latch) throws Exception {
            this.lock = new DistributedLock(threadId,latch);
        }
    
        @Override
        public void run() {
            //每一个线程对象启动后都应该创建一个临时的节点信息
            try {
                this.lock.handle();//进行具体的操作处理
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public class TestDistributedLock {
    
        public static void main(String[] args) throws Exception {
            CountDownLatch latch = new CountDownLatch(10);
            for (int i = 0; i < 10; i++) {
                    new Thread(new LockThread(i,latch)).start();;
            }
            //Thread.sleep(Long.MAX_VALUE);//为了保证可以观察到所有的临时节点信息,保证此处先不进行关闭
            latch.await();
            System.out.println("************* 所有的线程对象操作完毕  *************");
        }
    public class DistributedLock {//建立一个描述分布式锁的程序处理类
        public static final String CONNECTION_RUL = "192.168.12.121:2181,192.168.12.122:2181";
    
        public static final int SESSION_TIMEOUT = 2000;//设置连接超时时间
    
        public static final String AUTH_INFO = "zkuser:mldnjava";//进行连接的授权信息
    
        public static final String GROUPNODE = "/mldn-lock";//根节点
    
        public static final String SUNBODE = GROUPNODE + "/lockthread-";//子节点
    
        private CountDownLatch latch = null;
    
        //本操作的主要目的是为了在取得zookeeper连接之后才能进行后续的处理
        private CountDownLatch connectLatch = new CountDownLatch(1);
    
        private ZooKeeper zkClient = null; //建立Zookeeper程序控制类
    
        private String selfPath; //保存每次创建的临时节点信息
    
        private String waitPath; //保存下一个要进行处理的节点
    
        private int threadId = 0;
    
        /** 进行一些初始化操作使用
         * 
         * @param threadId 随意给定一个编号信息 * @param latch 进行线程同步处理
         * @throws Exception */
        public DistributedLock(int threadId, CountDownLatch latch) throws Exception {
            this.threadId = threadId;//保存每一个线程对象自己的ID信息
            this.latch = latch;
            this.connectionZookeeper();//进行节点的连接
        }
    
        public void handle() throws Exception {//具体业务处理
            this.createSubNode();//创建临时节点操作
        }
    
        public void handleSuccess() throws Exception {//表示取得锁之后进行的处理
            if (this.zkClient.exists(this.selfPath, false) == null) {
                return;//如果当前节点不存在
            }
            this.handleCallback();//执行具体的业务操作
            //如果某一个节点操作完毕了,那么应该立即删除掉该节点,否则获得的最小节点永远都是该节点
            this.zkClient.delete(selfPath, -1);
            this.releaseZookeeper();//释放连接
            this.latch.countDown();//进行减减的操作
        }
    
        public void handleCallback() throws Exception {//取得分布式锁之后的目的是要进行具体的操作
            Thread.sleep(200);//实现一个延迟处理
            System.out
                    .println("****** Thread-" + this.threadId + "获得操作权,进行具体的业务操作");
        }
    
        public boolean checkMinPath() throws Exception {//进行最小节点的判断
            List<String> childen = this.zkClient.getChildren(GROUPNODE, false);//取得所有的节点信息
            Collections.sort(childen); //进行所有节点的排序,这样最小的节点就拍到最上面
            int index = childen
                    .indexOf(this.selfPath.substring(GROUPNODE.length() + 1));
            switch (index) {
                case 0: {
                    return true; //已经确定好当前的节点为最小节点
                }
                case -1: {
                    return false; //该节点可能已经消失了
                }
                default: {//表示该节点不属于最小节点,那么应该向后继续排查
                    this.waitPath = GROUPNODE + "/" + childen.get(index - 1);//获得下一个节点
                    try {
                        this.zkClient.getData(waitPath, true, new Stat());//取得下一个节点的数据
                        return false; //本节点不是当前的操作的最小节点
                    } catch (Exception e) {//如果出现了异常,则表示该节点不存在
                        if (this.zkClient.exists(waitPath, false) == null) {
                            return this.checkMinPath();//继续向后检测
                        } else {
                            throw e;
                        }
                    }
                }
            }
        }
    
        public void createSubNode() throws Exception {//每一个线程对象的启动都要求创建一个节点信息
            this.zkClient.create(SUNBODE, ("Thread-" + this.threadId).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("【Thread-" + this.threadId + "、创建新的临时节点】"
                    + this.selfPath);
            //当节点创建完成之后就需要进行最小节点的检测
            if (this.checkMinPath()) {//如果当前的节点为整个项目的最小节点
                this.handleSuccess();//进行锁后的具体操作
            }
        }
    
        public void connectionZookeeper() throws Exception {//连接zookeeper服务
            this.zkClient = new ZooKeeper(CONNECTION_RUL, SESSION_TIMEOUT,
                    new Watcher() {
    
                        @Override
                        public void process(WatchedEvent event) {
                            if (event.getType() == EventType.None) {//第一次连接zookeeper的时候会出现none
                                DistributedLock.this.connectLatch.countDown();//表示已经连接成功
                            } else { //要处理删除节点操作,并且要确定下一个节点是已经准备出来的节点信息
                                if (event.getType() == EventType.NodeDeleted
                                        && event.getPath().equals(
                                                DistributedLock.this.waitPath)) {
                                    try {
                                        if (DistributedLock.this.checkMinPath()) {//如果当前的节点为整个项目的最小节点
                                            DistributedLock.this.handleSuccess();//进行锁后的具体操作
                                        }
                                    } catch (Exception e) {
                                            e.printStackTrace();
                                    }
                                }
                            }
                        }
                    });
            this.zkClient.addAuthInfo("digest", AUTH_INFO.getBytes());//进行授权认证
            if (this.zkClient.exists(GROUPNODE, false) == null) {
                this.zkClient.create(GROUPNODE, "LOCKDEMO".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            this.connectLatch.await();//等待连接后才执行后续的功能
        }
    
        public void releaseZookeeper() {//进行zookeeper的连接释放
            if (this.zkClient != null) {
                try {
                    this.zkClient.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
  • 相关阅读:
    CSS Sprite
    使用float和display:block将内联元素转换成块元素的不同点
    [POJ 1185] 炮兵阵地
    [POJ 1947] Rebuilding Roads
    [HDU 1561] The more, The Better
    [HDU 1011] Starship Troopers
    [POJ 1155] TELE
    [HDU 2196] Computer
    [HDU 1520] Anniversary party
    [HDU 5029] Relief grain
  • 原文地址:https://www.cnblogs.com/feiyangbahu/p/9719998.html
Copyright © 2011-2022 走看看