zoukankan      html  css  js  c++  java
  • zookeeper 实现分布式锁

     https://blog.csdn.net/qiangcuo6087/article/details/79067136

    实现互斥锁

    package com.zookeeper.lock;
    
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    
    public class BaseDistributedLock {
    
        private final ZkClient client;
        private final String path;
    
        // zookeeper中locker节点的路径
        private final String basePath;
        private final String lockName;
        private static final Integer MAX_RETRY_COUNT = 10;
    
        public BaseDistributedLock(ZkClient client, String path, String lockName) {
    
            this.client = client;
            this.basePath = path;
            this.path = path.concat("/").concat(lockName);
            this.lockName = lockName;
    
        }
    
        private void deleteOurPath(String ourPath) throws Exception {
            client.delete(ourPath);
        }
    
        private String createLockNode(ZkClient client, String path)
                throws Exception {
    
            return client.createEphemeralSequential(path, null);
        }
    
        private boolean waitToLock(long startMillis, Long millisToWait,
                String ourPath) throws Exception {
    
            boolean haveTheLock = false;
            boolean doDelete = false;
    
            try {
    
                while (!haveTheLock) {
                    // 获取lock节点下的所有节点
                    List children = getSortedChildren();
                    String sequenceNodeName = ourPath
                            .substring(basePath.length() + 1);
    
                    // 获取当前节点的在所有节点列表中的位置
                    int ourIndex = children.indexOf(sequenceNodeName);
                    // 节点位置小于0,说明没有找到节点
                    if (ourIndex < 0) {
                        throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                    }
    
                    // 节点位置大于0说明还有其他节点在当前的节点前面,就需要等待其他的节点都释放
                    boolean isGetTheLock = ourIndex == 0;
                    String pathToWatch = (String) (isGetTheLock ? null : children
                            .get(ourIndex - 1));
    
                    if (isGetTheLock) {
    
                        haveTheLock = true;
    
                    } else {
    
                        String previousSequencePath = basePath.concat("/").concat(
                                pathToWatch);
                        final CountDownLatch latch = new CountDownLatch(1);
                        final IZkDataListener previousListener = new IZkDataListener() {
    
                            public void handleDataDeleted(String dataPath)
                                    throws Exception {
                                latch.countDown();
                            }
    
                            public void handleDataChange(String dataPath,
                                    Object data) throws Exception {
                                // ignore
                            }
                        };
    
                        try {
                            // 如果节点不存在会出现异常
                            client.subscribeDataChanges(previousSequencePath,
                                    previousListener);
    
                            if (millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait <= 0) {
                                    doDelete = true; // timed out - delete our node
                                    break;
                                }
    
                                latch.await(millisToWait, TimeUnit.MICROSECONDS);
                            } else {
                                latch.await();
                            }
                        } catch (ZkNoNodeException e) {
                            // ignore
                        } finally {
                            client.unsubscribeDataChanges(previousSequencePath,
                                    previousListener);
                        }
    
                    }
                }
            } catch (Exception e) {
                // 发生异常需要删除节点
                doDelete = true;
                throw e;
            } finally {
                // 如果需要删除节点
                if (doDelete) {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    
        private String getLockNodeNumber(String str, String lockName) {
            int index = str.lastIndexOf(lockName);
            if (index >= 0) {
                index += lockName.length();
                return index <= str.length() ? str.substring(index) : "";
            }
            return str;
        }
    
        List<String> getSortedChildren() throws Exception {
            try {
    
                List<String> children = client.getChildren(basePath);
                Collections.sort(children, new Comparator<String>() {
                    public int compare(String lhs, String rhs) {
                        return getLockNodeNumber(lhs, lockName).compareTo(
                                getLockNodeNumber(rhs, lockName));
                    }
                });
                return children;
    
            } catch (ZkNoNodeException e) {
    
                client.createPersistent(basePath, true);
                return getSortedChildren();
    
            }
        }
    
        protected void releaseLock(String lockPath) throws Exception {
            deleteOurPath(lockPath);
    
        }
    
        protected String attemptLock(long time, TimeUnit unit) throws Exception {
    
            final long startMillis = System.currentTimeMillis();
            final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    
            String ourPath = null;
            boolean hasTheLock = false;
            boolean isDone = false;
            int retryCount = 0;
    
            // 网络闪断需要重试一试
            while (!isDone) {
                isDone = true;
    
                try {
                    ourPath = createLockNode(client, path);
                    hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
                } catch (ZkNoNodeException e) {
                    if (retryCount++ < MAX_RETRY_COUNT) {
                        isDone = false;
                    } else {
                        throw e;
                    }
                }
            }
            if (hasTheLock) {
                return ourPath;
            }
    
            return null;
        }
    
    }
    View Code

    接口类

    package com.zookeeper.lock;
    
    import java.util.concurrent.TimeUnit;
    
    public interface DistributedLock {
    
        /** 获取锁,如果没有得到就等待 */
        public void acquire() throws Exception;
    
        /**
         * 获取锁,直到超时
         * 
         * @param time
         *            超时时间
         * @param unit
         *            参数的单位
         * @return 是否获取到锁
         * @throws Exception
         */
        public boolean acquire(long time, TimeUnit unit) throws Exception;
    
        /**
         * 释放锁
         * 
         * @throws Exception
         */
        public void release() throws Exception;
    
    }
    View Code

    测试类

    package com.zookeeper.lock;
    
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
    
    import coom.zookeeperdemo.lock.SimpleDistributedLockMutex;
    
    public class TestDistributedLock {
    
        public static void main(String[] args) {
    
            final ZkClient zkClientExt1 = new ZkClient("192.168.1.105:2181",
                    5000, 5000, new BytesPushThroughSerializer());
            final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(
                    zkClientExt1, "/Mutex");
    
            final ZkClient zkClientExt2 = new ZkClient("192.168.1.105:2181",
                    5000, 5000, new BytesPushThroughSerializer());
            final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(
                    zkClientExt2, "/Mutex");
    
            try {
                mutex1.acquire();
                System.out.println("Client1 locked");
                Thread client2Thd = new Thread(new Runnable() {
    
                    public void run() {
                        try {
                            mutex2.acquire();
                            System.out.println("Client2 locked");
                            mutex2.release();
                            System.out.println("Client2 released lock");
    
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                client2Thd.start();
                Thread.sleep(5000);
                mutex1.release();
                System.out.println("Client1 released lock");
    
                client2Thd.join();
    
            } catch (Exception e) {
    
                e.printStackTrace();
            }
    
        }
    
    }
    View Code

    原文不知道地址了

  • 相关阅读:
    为什么电影里的黑客都不屑用鼠标? (转)
    专注做好一件事(转) focus---这个世界上最成功的人,他们在某一领域获得成功之后,可通过经营杠杆进入任何他们想要涉足的领域。而这都得依赖于他们曾极致的专注在做好一件事情上。
    世间万物都是遵循某种类似的规律,谁先把握了这些规律,谁就最早成为了强者。
    走的时候不要太急,有时间要停下来想一想当初为什么而走,这样,才会走的更稳,走的更明白。
    Android笔记: Android版本号
    Android笔记:真机调试无法输出Log 信息的问题
    阿里云服务器试用
    Android笔记:利用InputStream和BufferedReader 进行字节流 字符流处理
    Android笔记:java 中的数组
    Android笔记:C memory copy
  • 原文地址:https://www.cnblogs.com/newlangwen/p/10143879.html
Copyright © 2011-2022 走看看