zoukankan      html  css  js  c++  java
  • zookeeper分布式锁的实现

    * 实现思路:
    * 使用Zookeeper最小节点的方式
    * 执行过程:
    * 1、创建根节点,在根节点下创建顺序节点
    * 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功;
    * 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁

    代码能跑通,但还要改,先记一下:

    1.先创建一个抽象类实现Lock接口:
    package com...zookeeper.zkLock;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * @author com.
     * @date 
     */
    public abstract class AbstractLock implements Lock {
        /**
         * 获取锁的重试次数
         */
        private static final int RE_TRY = 10;
    
        @Override
        public void lock() {
            int count = RE_TRY;
            while (!this.tryLock() && count > 0) {
                count--;
            }
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    2.具体实现代码:
    package com..zookeeper.zkLock;
    
    
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author com.
     * @date 
     */
    
    public class ZookeeperLock extends AbstractLock {
    
        private ZooKeeper zooKeeper;
    
        /**
         * 锁根节点
         */
        private final String lockNamespace;
        /**
         * 锁值节点
         */
        private final String lockKey;
        /**
         * 当前节点
         */
        private String currentNode;
        /**
         * 等待的前一个节点
         */
        private String waitNode;
        /**
         * 竞争的节点列表
         */
        private List<String> lockNodes;
        /**
         * 计数器
         */
        private volatile CountDownLatch countDownLatch;
        /**
         * 是否持有锁
         */
        private volatile boolean locked = false;
    
        public ZookeeperLock(String address, int timeout, String lockNamespace, String lockKey) {
            this.init(address, timeout);
            this.lockNamespace = lockNamespace;
            this.lockKey = lockKey;
        }
    
        private void init(String address, int timeout) {
            try {
                zooKeeper = new ZooKeeper(address, timeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("事件类型为:" + event.getType());
                        System.out.println("事件发生的路径:" + event.getPath());
                        System.out.println("通知状态为:" +event.getState());
    
                    }
                });
            } catch (Exception e) {
                  System.out.println(e.getMessage()+ e);
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    
        @Override
        public boolean tryLock() {
    
            while (true) {
                String lock = lockNamespace + "/" + lockKey;
                try {
                    // 确保zookeeper连接成功
                    this.ensureZookeeperConnect();
                    // 确保根节点存在
                    ensureNameSpaceExist(lockNamespace);
                    // 创建临时顺序节点,节点目录为/lockNamespace/lockKey_xxx,节点为lockKey_xxx
                    currentNode = zooKeeper.create(lock, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.EPHEMERAL_SEQUENTIAL).replace(lockNamespace + "/", "");
                    // 取出所有子节点
                    List<String> childrenList = zooKeeper.getChildren(lockNamespace, false);
                    // 竞争的节点列表
                    lockNodes = new ArrayList<>();
                    for (String children : childrenList) {
                        if (children.startsWith(lockKey)) {
                            lockNodes.add(children);
                        }
                    }
                    // 排序
                    Collections.sort(lockNodes);
    //                System.out.println("排序后的所有子节点--->:" + lockNodes);
                    // 如当前节点为最小节点,则成功获取锁
                    if (currentNode.equals(lockNodes.get(0))) {
                        locked = true;
                    }
                    System.out.println(Thread.currentThread().getName() + "   "+currentNode +  "  比较  " +  lockNodes.get(0)+   "  为  "+locked + "  创建了临时节点");
                    return locked;
                } catch (InterruptedException | KeeperException e) {
                    System.out.println(Thread.currentThread().getName() + "   获得锁异常");
                    System.out.println(e.getMessage() + e);
                    throw new RuntimeException(e);
                }
            }
        }
    
        @Override
        public void unlock() {
            try {
                zooKeeper.delete(lockNamespace + "/" + currentNode, -1);
                zooKeeper.close();
                locked = false;
                System.out.println(Thread.currentThread().getName() + "   释放锁~~~");
            } catch (InterruptedException | KeeperException e) {
                  System.out.println(e.getMessage()+e);
            }
        }
    
        /**
         * Zookeeper分布式锁
         * 实现思路:
         * 使用Zookeeper最小节点的方式
         * 执行过程:
         * 1、创建根节点,在根节点下创建顺序节点
         * 2、如当前创建的节点为根节点的所有子节点中最小的,则获取锁成功;
         * 否则,找到当前节点的前一个节点,watch前一个节点,当前一个节点被删除时获得锁;另外,等待超时也不能获得锁
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            //等待锁
            try {
                if (tryLock()) {
                    System.out.println(Thread.currentThread().getName() + "   获得锁~~~");
                    return locked;
                }
                System.out.println(Thread.currentThread().getName() + "   等待锁~~~");
                //找到当前节点的前一个节点
                waitNode = lockNodes.get(Collections.binarySearch(lockNodes, currentNode) - 1);
                this.waitLock(time, unit);
                return locked;
            } catch (KeeperException e) {
                  System.out.println(e.getMessage()+ e);
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 等待锁
         */
        private void waitLock(long time, TimeUnit unit) throws KeeperException, InterruptedException {
            String waitLock = lockNamespace + "/" + waitNode;
            System.out.println(Thread.currentThread().getName() +"   等待锁 {} 释放    " + waitLock);
            Stat stat = zooKeeper.exists(waitLock, watchedEvent -> {
                if (countDownLatch != null) {
                    locked = true;
                    countDownLatch.countDown();
                }
            });
            // 前一个节点此刻存在,等待,节点消失则成功获取锁
            if (stat != null) {
                countDownLatch = new CountDownLatch(1);
                countDownLatch.await(time, unit);
                countDownLatch = null;
            } else {
                // 前一个节点此刻不存在,获得锁
                locked = true;
            }
        }
    
        /**
         * 确保根节点存在
         */
        private void ensureNameSpaceExist(String lockNamespace) throws KeeperException, InterruptedException {
            Stat statS = zooKeeper.exists(lockNamespace, false);
            if (statS == null) {
                //如果根节点不存在,创建
                zooKeeper.create(lockNamespace, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
    
        /**
         * 确保zookeeper连接成功,防止出现连接还未完成就执行zookeeper的get/create/exsit操作出现错误KeeperErrorCode = ConnectionLoss
         */
        private void ensureZookeeperConnect() throws InterruptedException {
            CountDownLatch connectedLatch = new CountDownLatch(1);
            zooKeeper.register(watchedEvent -> {
                if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    connectedLatch.countDown();
                }
            });
            // zookeeper连接中则等待
            if (ZooKeeper.States.CONNECTING == zooKeeper.getState()) {
                connectedLatch.await();
            }
        }
    }

    3.测试类:

    package com..zookeeper.zkLock;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import wfc.service.database.RecordSet;
    import wfc.service.database.SQL;
    
    import java.io.IOException;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    
    public class Main {
    
        private static final int  Thread_num   = 5;
        private static final CyclicBarrier cb = new CyclicBarrier(Thread_num);
    
        public static void main(String[] args) throws Exception {
    
            Thread [] threads = new Thread[Thread_num];
            for(int i=0;i<Thread_num;i++){
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            cb.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
    
                        Lock lock = new ZookeeperLock("localhost:2181",5000,"/zkLock","lockKey");
                        try {
                            if(lock.tryLock(1000, TimeUnit.MILLISECONDS)){
                                System.out.println(Thread.currentThread().getName() + "   获得锁执行业务");
                                //减数据库
                                int count = 1;
                                String st_id = "keys";
                                String updateSql = "update dang_fj set count = count-? where st_id = ? and count>=1";
                                Object[] updateObject = new Object[] {count,st_id};
                                RecordSet updateRs  = SQL.execute(updateSql,updateObject);
                                int number = updateRs.TOTAL_RECORD_COUNT;
                                //影响行数
                                System.out.println("数据库影响行数:"+number);
    
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            lock.unlock();
    
                        }
    
                    }
                });
                threads [i] = thread;
                thread.start();
            }
            for(Thread thread : threads){
                thread.join();
            }
            System.out.println("执行结束======");
    
        }                
  • 相关阅读:
    Nightwatch的介绍
    reduce的用法及实例
    什么是声明式渲染?
    H5自带的表单验证
    Flex弹性布局
    JS中的forEach,for in,for of和for的遍历优缺点及区别
    将博客搬至CSDN
    9 外观模式(Facade)
    8 代理模式(Proxy)
    7 装饰模式(Decorator)
  • 原文地址:https://www.cnblogs.com/lifan12589/p/13573009.html
Copyright © 2011-2022 走看看