zoukankan      html  css  js  c++  java
  • 基于zookeeper分布式全局序列分布式锁详解

    InterProcessMutex 类详解步骤:获取锁的过程步骤:

    1.acquire方法,根据当前线程获取锁对象,判断当前的线程是否已经获取锁,此处则代表可重入;
    2.获取锁方法,String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    3.当获取到锁时,则把锁数据放入内存对象
    private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

    获取锁的过程步骤:

    1.每个线程进入获取锁方法之后,直接调用zookeeper创建对应的节点数据;你下为/rrrrww节点下对应的线程所创建的节点

    [zk: localhost:2181(CONNECTED) 4] ls /rrrrw
    [_c_73f6f5a1-3172-435a-8cb7-cb685edd5850-lock-0000014332, _c_6f6e059a-44c2-4452-a6fe-a8fd46e86b2f-lock-0000014339, _c_1ba53a24-7700-4f8c-a91f-8db5c036c50c-lock-0000014338, _c_2123b109-e7d2-4d04-9a6c-2df0b627be15-lock-0000014335, _c_33672d76-265e-4998-9bb3-16218d9eca21-lock-0000014328, _c_f7ffbf6b-e0dd-4ef9-925f-233912cdff74-lock-0000014337, _c_d60409a6-190e-4aa9-a40b-b354bfacc29a-lock-0000014336, _c_89994cb9-9c19-4fa3-b07f-2af48d823d08-lock-0000014334]

    2.获取节点下所有已经创建的子节点,来处理第一个进入的节点,第一个进入的节点,则会获取到锁;

    3.如果没有获取到锁,则watcher当前的节点变化,让当前线程等待,当节点变化时,则notifyall所有的线程,则继续获取锁;

    3.增加序列之后,则release锁;

    以下代码根据zookeeper实现了分布式锁获取全局序列

    package com.freeplatform.common.core.seq;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * 
     * <p>Description: </p>
     * @date 2016年5月12日
     * @author 李世佳 
     */
    public class DistributedLockSeq {
    
        public static final String LOCK_ZNODE = "/rrrrw";
    
        public static CuratorFramework client;
    
        public static CuratorFrameworkFactory.Builder builder;
    
        static {
            client = CuratorFrameworkFactory.newClient("172.16.35.9:2181", new ExponentialBackoffRetry(1000, 3));
    
            builder = CuratorFrameworkFactory.builder().connectString("172.16.35.9:2181")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3));
            // etc. etc.
        }
    
        public static void main(String[] args) {
    
            final ExecutorService service = Executors.newFixedThreadPool(20);
    
            for (int i = 0; i < 1; i++) {
                service.execute(new SeqTask("[Concurrent-" + i + "]"));
            }
    
            if (!service.isShutdown()) {
                try {
                    service.shutdown();
                    if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                        service.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    service.shutdownNow();
                    System.out.println(e.getMessage());
                }
            }
        }
    
        // 借助curatorFramework利用Zookeeper实现分布式seq生成
        public static class SeqTask implements Runnable {
    
            private final String seqTaskName;
    
            public SeqTask(String seqTaskName) {
                this.seqTaskName = seqTaskName;
            }
    
            @Override
            public void run() {
                CuratorFramework client = builder.build();
                client.start();
                // 锁对象 client 锁节点
                InterProcessMutex lock = new InterProcessMutex(client, LOCK_ZNODE);
                try {
                    boolean retry = true;
                    int i = 0;
                    do {
                        i++;
                        System.out.println(seqTaskName + " recome:" + i);
                        // 索取锁,设置时长1s,如果获取不到,则继续获取
                        if (lock.acquire(1000, TimeUnit.MILLISECONDS)) {
                            Stat stat = client.checkExists().forPath(LOCK_ZNODE);
                            if (stat != null) {
                                // 获取锁操作则增加序列
                                byte[] oldData = client.getData().storingStatIn(stat).forPath(LOCK_ZNODE);
                                String s = new String(oldData);
                                int d = Integer.parseInt(s);
                                d = d + 1;
                                s = String.valueOf(d);
                                byte[] newData = s.getBytes();
                                client.setData().forPath(LOCK_ZNODE, newData);
                                System.out.println(seqTaskName + " obtain seq :" + new String(newData));
                            }
                            retry = false;
                        }
                    } while (retry);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if (lock.isAcquiredInThisProcess()) {
                            lock.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        CloseableUtils.closeQuietly(client);
                    }
                }
            }
        }
    }
    /**
         * Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread
         * can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call
         * to {@link #release()}
         *
         * @param time time to wait
         * @param unit time unit
         * @return true if the mutex was acquired, false if not
         * @throws Exception ZK errors, connection interruptions
         */
        @Override
        public boolean acquire(long time, TimeUnit unit) throws Exception
        {
            return internalLock(time, unit);
        }
    
      private boolean internalLock(long time, TimeUnit unit) throws Exception
        {
            /*
               Note on concurrency: a given lockData instance
               can be only acted on by a single thread so locking isn't necessary
            */
    
            Thread          currentThread = Thread.currentThread();
    
            LockData        lockData = threadData.get(currentThread);
            if ( lockData != null )
            {
                // re-entering
                lockData.lockCount.incrementAndGet();
                return true;
            }
    
            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
            if ( lockPath != null )
            {
                LockData        newLockData = new LockData(currentThread, lockPath);
                threadData.put(currentThread, newLockData);
                return true;
            }
    
            return false;
        }
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
        {
            final long      startMillis = System.currentTimeMillis();
            final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
            final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            int             retryCount = 0;
    
            String          ourPath = null;
            boolean         hasTheLock = false;
            boolean         isDone = false;
            while ( !isDone )
            {
                isDone = true;
    
                try
                {
                    if ( localLockNodeBytes != null )
                    {
                        ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                    }
                    else
                    {
                        ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                    }
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                    // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                    {
                        isDone = false;
                    }
                    else
                    {
                        throw e;
                    }
                }
            }
    
            if ( hasTheLock )
            {
                return ourPath;
            }
    
            return null;
        }
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
        {
            boolean     haveTheLock = false;
            boolean     doDelete = false;
            try
            {
                if ( revocable.get() != null )
                {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
    
                while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    //获取父节点下所有线程的子节点
                    List<String>        children = getSortedChildren();
                    
                    //获取当前线程的节点名称
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                     //计算当前节点是否获取到锁
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        haveTheLock = true;
                    }
                    else
                    {
                         //没有索取到锁,则让线程等待,并且watcher当前节点,当节点有变化的之后,则notifyAll当前等待的线程,让它再次进入来争抢锁
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                        synchronized(this)
                        {
                            try 
                            {
                                // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if ( millisToWait != null )
                                {
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    if ( millisToWait <= 0 )
                                    {
                                        doDelete = true;    // timed out - delete our node
                                        break;
                                    }
    
                                    wait(millisToWait);
                                }
                                else
                                {
                                    wait();
                                }
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
            }
            catch ( Exception e )
            {
                doDelete = true;
                throw e;
            }
            finally
            {
                if ( doDelete )
                {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }    
    @Override
        public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
        {
            //maxLeases=1,即表示第一个进入的线程所创建的节点获取锁,其他则无,线程等待,watcher节点,节点变化,继续抢占锁
            int             ourIndex = children.indexOf(sequenceNodeName);
            validateOurIndex(sequenceNodeName, ourIndex);
    
            boolean         getsTheLock = ourIndex < maxLeases;
            String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    
            return new PredicateResults(pathToWatch, getsTheLock);
        }
    private final Watcher watcher = new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                notifyFromWatcher();
            }
        };
    
    private synchronized void notifyFromWatcher()
        {
            notifyAll();
        }




  • 相关阅读:
    hdfs java.io.IOException: Mkdirs failed to create
    Linux文件权限授予
    插入排序
    Oracle中怎样设置表中的主键id递增
    单链表中是否有环之java实现
    excel 单元格的锁定 以及 JXL的实现方式
    用POI的HSSF来控制EXCEL的研究
    Oracle Job 语法和时间间隔的设定(转)
    您的JAVA代码安全吗?(转)
    Jdom 解析 XML【转】
  • 原文地址:https://www.cnblogs.com/lishijia/p/5486494.html
Copyright © 2011-2022 走看看