zoukankan      html  css  js  c++  java
  • 基于ZooKeeper的分布式锁实现

    夜深了,先贴代码保存一下,明天再整理。

    WatchCallBack.java

    package com.breeze.lock;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.LockSupport;
    
    public class WatchCallBack implements Watcher, AsyncCallback.Create2Callback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
        ZooKeeper zk ;
        String threadName;
        String pathName;
    
        CountDownLatch cc = new CountDownLatch(1);
        //Watcher要实现的方法
        @Override
        public void process(WatchedEvent event) {
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    zk.getChildren("/",false,this,"test");
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
                case DataWatchRemoved:
                    break;
                case ChildWatchRemoved:
                    break;
                case PersistentWatchRemoved:
                    break;
            }
        }
    
        public void tryLock(){
            try {
                zk.create("/lock",threadName.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL,this,"test");
                cc.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        public void unLock(){
            try {
                zk.delete(pathName,-1);
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
    
        }
    
        //Create2Callback的需实现方法
        @Override
        public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
            if(name != null){
                pathName = name;
                System.out.println(threadName + "create path name: "+name);
                //this 是callback 回调下面的processResult
                zk.getChildren("/",false,this,"test");
    
            }
        }
    
        //Children2Callback 的实现方法 zk.getChildren()
    
        /**
         * 每一个node 只watch 它的前一个node ,这样,zk在通知时,只给第一个通知,减少zk服务器压力
         *
         * @param rc
         * @param path
         * @param ctx
         * @param children
         * @param stat
         */
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            //先排序,第一个放行,表示可以获得锁
            Collections.sort(children);
            int i = children.indexOf(pathName.substring(1));
            if(i == 0){
                //是第一个
                System.out.println(threadName+"i am the first ...");
                cc.countDown();
            }else {
                //不是
                zk.exists("/"+children.get(i-1),this,this,"test");
            }
    
            //可以看到自己前面的node
    //        System.out.println(threadName+"look locks...");
    //        for (String child : children) {
    //            System.out.println(child);
    //        }
    
    
        }
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
        public String getThreadName() {
            return threadName;
        }
    
        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }
    
        public String getPathName() {
            return pathName;
        }
    
        public void setPathName(String pathName) {
            this.pathName = pathName;
        }
    
    
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            //如果不是第一个children的状态回调
    
        }
    }
    

    TestLock.java

    package com.breeze.lock;
    
    import com.breeze.zk.ZKUtils;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    /**
     * Manual integration test for {@link WatchCallBack}: ten threads compete for the lock through
     * one shared ZooKeeper client, and each holds it for about a second before releasing it.
     */
    public class TestLock {

        ZooKeeper zk;

        /** Opens the shared ZooKeeper client before each test. */
        @Before
        public void conn() {
            zk = ZKUtils.getZK();
        }

        /** Closes the shared ZooKeeper client after each test. */
        @After
        public void close() {
            try {
                zk.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        /**
         * Starts ten workers that each acquire the lock, "work" for one second and release it,
         * then waits for all of them to finish.
         */
        @Test
        public void lock() {
            Thread[] workers = new Thread[10];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread() {
                    @Override
                    public void run() {
                        String threadName = Thread.currentThread().getName();
                        WatchCallBack watchCallBack = new WatchCallBack();
                        watchCallBack.setZk(zk);
                        watchCallBack.setThreadName(threadName);
                        // Acquire the lock.
                        watchCallBack.tryLock();
                        // Do the work.
                        System.out.println(threadName + "is working...");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                        // Release the lock.
                        watchCallBack.unLock();
                    }
                };
                workers[i].start();
            }

            // Wait for the workers instead of spinning forever, so the test terminates and
            // close() gets to run.
            try {
                for (Thread worker : workers) {
                    worker.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    
    

    ZKUtils.java

    package com.breeze.zk;
    
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Factory for ZooKeeper clients that waits on a latch handed to the shared
     * {@code DefaultWatch} before returning.
     *
     * <p>NOTE(review): {@code DefaultWatch} is not visible here; this class assumes it counts the
     * latch down once the session is connected. Not safe for concurrent callers, because the
     * watcher and the latch are shared static state.
     */
    public class ZKUtils {
        private static ZooKeeper zk;

        // Connect string; the "/testLock" suffix is a chroot, so "/" in client code maps to it.
        private static String address = "zkserver ip1:2181,zk server ip2:2181/testLock";

        private static DefaultWatch watch = new DefaultWatch();

        private static CountDownLatch cdLatch = new CountDownLatch(1);

        /**
         * Creates a new ZooKeeper client and blocks until the watcher releases the latch.
         *
         * @return the newly created client; if the wait is interrupted, the interrupt flag is
         *         restored and the client is returned although it may not be connected yet
         * @throws IllegalStateException if the client cannot be created
         */
        public static ZooKeeper getZK() {
            // Use a fresh latch per call: a latch that already reached zero would let later
            // calls return without waiting at all.
            cdLatch = new CountDownLatch(1);
            // Hand the latch to the watcher BEFORE the client exists, so an early connection
            // event cannot arrive while the watcher has no latch.
            watch.setCc(cdLatch);
            try {
                zk = new ZooKeeper(address, 1000, watch);
            } catch (IOException e) {
                // Nobody would ever count the latch down; fail fast instead of hanging.
                throw new IllegalStateException("Unable to create ZooKeeper client for " + address, e);
            }
            try {
                cdLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            return zk;
        }
    }
    
    
    
  • 相关阅读:
    司马光 王安石
    辛弃疾
    伯仲叔季
    三国时代
    西汉 东汉 三国(曹魏 蜀汉 东吴)
    数量关系练习题
    为什么不推荐使用外键约束
    Google Map API申请
    Android传感器——加速度传感器
    第三届空间信息智能服务研讨会
  • 原文地址:https://www.cnblogs.com/shengjm/p/13252857.html
Copyright © 2011-2022 走看看