zoukankan      html  css  js  c++  java
  • Curator使用:(五)分布式锁

    分布式锁介绍##

    分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程。其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性。来实现同一时间,只会选择一个客户端执行任务

    代码###

        //分布式锁
        InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
        CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                try {
                    down.await();
                    lock.acquire();
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
                System.out.println(sdf.format(new Date()));
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        down.countDown();
    

    InterProcessMutex 是一个可重入的排他锁,获取锁的过程是通过往ZK下面成功建立节点来实现的,下面是获取锁的过程

        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程的锁数据
            LockData lockData = threadData.get(currentThread);
        //如果不为null,则将锁的数量+1,实现可重入
            if ( lockData != null )
            {
                // re-entering
                lockData.lockCount.incrementAndGet();
                return true;
            }
        //第一次会到这里,尝试在我们之前设置的路径下建立节点
            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        //建立成功后就初始化lockdata 并按当前线程进行保存,所以可以通过创建多个thread来模拟锁竞争,而不需要建多个client。
            if ( lockPath != null )
            {
                LockData newLockData = new LockData(currentThread, lockPath);
                threadData.put(currentThread, newLockData);
                return true;
            }
    
            return false;
    

    下面是attemptLock的重要代码

        while ( !isDone )
            {
                isDone = true;
    
                try
                {
                    //建立节点
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    //下面就是获取锁和加锁的循环了
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    //session过期时走这里,按策略处理,允许重试就重试,否则就抛出异常
                    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                    {
                        isDone = false;
                    }
                    else
                    {
                        throw e;
                    }
                }
            }
     
    

    下面是internalLockLoop的重要代码

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    //排序是因为要实现公平锁,加上maxleases参数限制取首位
                    List<String>        children = getSortedChildren();
                    //得到子节点名称,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                    
                    //获取是否获得锁的状态,重要方法,maxLeases为1,后面要通过这个参数进行对比,通过判断小于这个来实现 公平锁
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        haveTheLock = true;
                    }
                    else
                    {
                        //如果没有获得锁,就给当前节点加个watcher,继续等待,一旦被删掉就调用这个watcher notifyall。
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                        synchronized(this)
                        {
                            try 
                            {
                                // 这个watcher只有一个作用就是唤醒其他线程进行竞争 notifyAll();
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                ...
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
    
  • 相关阅读:
    51nod 1174 区间最大值(RMQ and 线段树)
    Round #447(Div 2)
    51nod 2006 飞行员匹配
    75.Java异常处理机制throws
    74.Java异常处理机制
    emmm
    数据库关系代数
    汇编实验二 2进制转16进制
    汇编实验一 显示字符串
    JustOj 1386: 众数的数量
  • 原文地址:https://www.cnblogs.com/june777/p/11867381.html
Copyright © 2011-2022 走看看