zoukankan      html  css  js  c++  java
  • JUC锁框架源码阅读-AQS+Zookeeper自定义分布式锁

    介绍

    1.创建一个永久节点

    2.竞争锁的时候同样的的key 所有线程都往永久节点插入指定key name的临时节点(节点不允许重复只会有一个插入成功)

    3.插入失败的开启对永久节点的监听

    4.当时获得锁的线程down机或者删除会触发监听。然后尝试获取CLH第一个线程节点 尝试重新获取锁

    代码已上传github:https://github.com/aa310958153/zookeeper-lock

    注:仅仅用于熟悉AQS如果要用到Zookeeper分布式锁直接使用Curator基于对Zookeeper锁的实现

    InterProcessMutex:分布式可重入排它锁
    InterProcessSemaphoreMutex:分布式排它锁
    InterProcessReadWriteLock:分布式读写锁
    InterProcessMultiLock:将多个锁作为单个实体管理的容器

    加锁

     @Override
            protected boolean tryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();
                //获取锁状态
                int c = getState();
                //等于0表示 当前空闲状态可以尝试获取 <1>zkLock加锁
                if (c == 0) {
                    if (zkLock()&&compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //可重入判断
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    <1>zkLock

     /**
             * zookeeper保存临时节点实现加锁
             * @return
             */
            public boolean zkLock() {
                String path= getLockPath();
                boolean haveLock=false;
                try {
                    curatorFramework
                            .create()
                            .creatingParentContainersIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(path, syncValue.get().getValue().getBytes(StandardCharsets.UTF_8));
    
                    haveLock= true;
                } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {//重复的标识未获取到锁
                    haveLock=false;
                } catch (Exception e) {
                    e.printStackTrace();
                    haveLock= false;
                }
                /**
                 * 未获取到锁监听永久节点
                 */
                if(!haveLock){
                    TreeCache treeCache = new TreeCache(CuratorClient.getCurator(), SYN_SWITCH_ZK_NODE);
                    try {
                        treeCache.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if(!syncValue.get().isAddListener) {
                        treeCache.getListenable().addListener(new TreeCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                                ChildData eventData = event.getData();
                                switch (event.getType()) {
                                    case NODE_ADDED:
                                        //System.out.println(path + "节点添加:" + eventData.getPath() + "	添加数据为:" + new String(eventData.getData()));
                                        break;
                                    case NODE_UPDATED:
                                      //  System.out.println(eventData.getPath() + "节点数据更新	更新数据为:" + new String(eventData.getData()) + "	版本为:" + eventData.getStat().getVersion());
                                        break;
                                    case NODE_REMOVED:
                                        //监听到节点删除。表示锁正常释放 或者持有锁的服务断开连接
                                        //获得第一个阻塞线程 唤醒 尝试获取锁
                                        Thread firstThread=getFirstQueuedThread();
                                        if( firstThread!=null) {
                                            LockSupport.unpark(firstThread);
                                        }
                                        break;
                                    default:
                                        break;
                                }
                            }
                        });
                        syncValue.get().setAddListener(true);
                    }
                }
                return haveLock;
            }

    释放锁

      @Override
            protected boolean tryRelease(int releases) {
                //状态-1 大于0的数字表示可重入加了多少次锁
                int c = getState() - releases;
                //如果加锁线程非当前线程抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                //根据节点存储的值校验是否是当前线程加锁。如果不是抛出异常
                String value;
                try {
                    value= new String(curatorFramework.getData().forPath(getLockPath()));
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new IllegalMonitorStateException();
                }
                if(value==null||!value.equals(syncValue.get().getValue())){
                    throw new IllegalMonitorStateException();
                }
                boolean free = false;
                //当c等于0表示最后一次调用unlock 则进行锁的释放
                if (c == 0) {
                    free = true;
                    //获得锁的线程设置为null
                    setExclusiveOwnerThread(null);
                    String path= getLockPath();
                    try {
                        //删除节点 会触发节点监听
                        curatorFramework.delete().forPath(path);
                    } catch (Exception e) {
                        throw new IllegalMonitorStateException();
                    }
                    syncValue.remove();
                }
                //设置state
                setState(c);
    
                return free;
            }
  • 相关阅读:
    BZOJ-4008: [HNOI2015]亚瑟王 (概率期望DP)
    BZOJ-4832: [Lydsy2017年4月月赛]抵制克苏恩 (概率期望DP)
    BZOJ-1415: [Noi2005]聪聪和可可 (期望DP)
    BZOJ2425 [HAOI2010]计数
    BZOJ2424 [HAOI2010]订货
    BZOJ2423 [HAOI2010]最长公共子序列
    BZOJ2299 [HAOI2011]向量
    BZOJ2298 [HAOI2011]problem a
    BZOJ1040 [ZJOI2008]骑士
    BZOJ一天提交(AC) 51纪念
  • 原文地址:https://www.cnblogs.com/LQBlog/p/15250305.html
Copyright © 2011-2022 走看看