zoukankan      html  css  js  c++  java
  • 【转载】zookeeper 分布式锁 实现

    基于zookeeper的分布式lock实现           

    博客分类:
     

    背景

     继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。

     这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制

    算法

    可以预先看一下zookeeper的官方文档: 

    lock操作过程:
    • 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
    • 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
    • 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
    • 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
    • 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
    unlock操作过程:
    • 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
     
    其中的几个关键点:
    1. node节点选择为EPHEMERAL_SEQUENTIAL很重要。 * 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。 * 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
    2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
     
    注意:
    • 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
    • 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
    没有两全其美的做法,两者取其一,选择自己一个能接受的即可

    代码

    Java代码 复制代码 收藏代码
    1. public class DistributedLock {  
    2.   
    3.     private static final byte[]  data      = { 0x12, 0x34 };  
    4.     private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();  
    5.     private final String         root;                                     //根节点路径  
    6.     private String               id;  
    7.     private LockNode             idName;  
    8.     private String               ownerId;  
    9.     private String               lastChildId;  
    10.     private Throwable            other     = null;  
    11.     private KeeperException      exception = null;  
    12.     private InterruptedException interrupt = null;  
    13.   
    14.     public DistributedLock(String root) {  
    15.         this.root = root;  
    16.         ensureExists(root);  
    17.     }  
    18.   
    19.     /** 
    20.      * 尝试获取锁操作,阻塞式可被中断 
    21.      */  
    22.     public void lock() throws InterruptedException, KeeperException {  
    23.         // 可能初始化的时候就失败了  
    24.         if (exception != null) {  
    25.             throw exception;  
    26.         }  
    27.   
    28.         if (interrupt != null) {  
    29.             throw interrupt;  
    30.         }  
    31.   
    32.         if (other != null) {  
    33.             throw new NestableRuntimeException(other);  
    34.         }  
    35.   
    36.         if (isOwner()) {//锁重入  
    37.             return;  
    38.         }  
    39.   
    40.         BooleanMutex mutex = new BooleanMutex();  
    41.         acquireLock(mutex);  
    42.         // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试  
    43.         try {  
    44.             mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true  
    45.             // mutex.get();  
    46.         } catch (TimeoutException e) {  
    47.             if (!mutex.state()) {  
    48.                 lock();  
    49.             }  
    50.         }  
    51.   
    52.         if (exception != null) {  
    53.             throw exception;  
    54.         }  
    55.   
    56.         if (interrupt != null) {  
    57.             throw interrupt;  
    58.         }  
    59.   
    60.         if (other != null) {  
    61.             throw new NestableRuntimeException(other);  
    62.         }  
    63.     }  
    64.   
    65.     /** 
    66.      * 尝试获取锁对象, 不会阻塞 
    67.      *  
    68.      * @throws InterruptedException 
    69.      * @throws KeeperException 
    70.      */  
    71.     public boolean tryLock() throws KeeperException {  
    72.         // 可能初始化的时候就失败了  
    73.         if (exception != null) {  
    74.             throw exception;  
    75.         }  
    76.   
    77.         if (isOwner()) {//锁重入  
    78.             return true;  
    79.         }  
    80.   
    81.         acquireLock(null);  
    82.   
    83.         if (exception != null) {  
    84.             throw exception;  
    85.         }  
    86.   
    87.         if (interrupt != null) {  
    88.             Thread.currentThread().interrupt();  
    89.         }  
    90.   
    91.         if (other != null) {  
    92.             throw new NestableRuntimeException(other);  
    93.         }  
    94.   
    95.         return isOwner();  
    96.     }  
    97.   
    98.     /** 
    99.      * 释放锁对象 
    100.      */  
    101.     public void unlock() throws KeeperException {  
    102.         if (id != null) {  
    103.             try {  
    104.                 zookeeper.delete(root + "/" + id, -1);  
    105.             } catch (InterruptedException e) {  
    106.                 Thread.currentThread().interrupt();  
    107.             } catch (KeeperException.NoNodeException e) {  
    108.                 // do nothing  
    109.             } finally {  
    110.                 id = null;  
    111.             }  
    112.         } else {  
    113.             //do nothing  
    114.         }  
    115.     }  
    116.   
    117.     private void ensureExists(final String path) {  
    118.         try {  
    119.             Stat stat = zookeeper.exists(path, false);  
    120.             if (stat != null) {  
    121.                 return;  
    122.             }  
    123.   
    124.             zookeeper.create(path, data, CreateMode.PERSISTENT);  
    125.         } catch (KeeperException e) {  
    126.             exception = e;  
    127.         } catch (InterruptedException e) {  
    128.             Thread.currentThread().interrupt();  
    129.             interrupt = e;  
    130.         }  
    131.     }  
    132.   
    133.     /** 
    134.      * 返回锁对象对应的path 
    135.      */  
    136.     public String getRoot() {  
    137.         return root;  
    138.     }  
    139.   
    140.     /** 
    141.      * 判断当前是不是锁的owner 
    142.      */  
    143.     public boolean isOwner() {  
    144.         return id != null && ownerId != null && id.equals(ownerId);  
    145.     }  
    146.   
    147.     /** 
    148.      * 返回当前的节点id 
    149.      */  
    150.     public String getId() {  
    151.         return this.id;  
    152.     }  
    153.   
    154.     // ===================== helper method =============================  
    155.   
    156.     /** 
    157.      * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作 
    158.      */  
    159.     private Boolean acquireLock(final BooleanMutex mutex) {  
    160.         try {  
    161.             do {  
    162.                 if (id == null) {//构建当前lock的唯一标识  
    163.                     long sessionId = zookeeper.getDelegate().getSessionId();  
    164.                     String prefix = "x-" + sessionId + "-";  
    165.                     //如果第一次,则创建一个节点  
    166.                     String path = zookeeper.create(root + "/" + prefix, data,  
    167.                             CreateMode.EPHEMERAL_SEQUENTIAL);  
    168.                     int index = path.lastIndexOf("/");  
    169.                     id = StringUtils.substring(path, index + 1);  
    170.                     idName = new LockNode(id);  
    171.                 }  
    172.   
    173.                 if (id != null) {  
    174.                     List<String> names = zookeeper.getChildren(root, false);  
    175.                     if (names.isEmpty()) {  
    176.                         id = null;//异常情况,重新创建一个  
    177.                     } else {  
    178.                         //对节点进行排序  
    179.                         SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();  
    180.                         for (String name : names) {  
    181.                             sortedNames.add(new LockNode(name));  
    182.                         }  
    183.   
    184.                         if (sortedNames.contains(idName) == false) {  
    185.                             id = null;//清空为null,重新创建一个  
    186.                             continue;  
    187.                         }  
    188.   
    189.                         //将第一个节点做为ownerId  
    190.                         ownerId = sortedNames.first().getName();  
    191.                         if (mutex != null && isOwner()) {  
    192.                             mutex.set(true);//直接更新状态,返回  
    193.                             return true;  
    194.                         } else if (mutex == null) {  
    195.                             return isOwner();  
    196.                         }  
    197.   
    198.                         SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);  
    199.                         if (!lessThanMe.isEmpty()) {  
    200.                             //关注一下排队在自己之前的最近的一个节点  
    201.                             LockNode lastChildName = lessThanMe.last();  
    202.                             lastChildId = lastChildName.getName();  
    203.                             //异步watcher处理  
    204.                             zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {  
    205.   
    206.                                 public void asyncProcess(WatchedEvent event) {  
    207.                                     acquireLock(mutex);  
    208.                                 }  
    209.   
    210.                             });  
    211.   
    212.                             if (stat == null) {  
    213.                                 acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去  
    214.                             }  
    215.                         } else {  
    216.                             if (isOwner()) {  
    217.                                 mutex.set(true);  
    218.                             } else {  
    219.                                 id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同  
    220.                             }  
    221.                         }  
    222.                     }  
    223.                 }  
    224.             } while (id == null);  
    225.         } catch (KeeperException e) {  
    226.             exception = e;  
    227.             if (mutex != null) {  
    228.                 mutex.set(true);  
    229.             }  
    230.         } catch (InterruptedException e) {  
    231.             interrupt = e;  
    232.             if (mutex != null) {  
    233.                 mutex.set(true);  
    234.             }  
    235.         } catch (Throwable e) {  
    236.             other = e;  
    237.             if (mutex != null) {  
    238.                 mutex.set(true);  
    239.             }  
    240.         }  
    241.   
    242.         if (isOwner() && mutex != null) {  
    243.             mutex.set(true);  
    244.         }  
    245.         return Boolean.FALSE;  
    246.     }  
    247. }  
    public class DistributedLock {
    
        private static final byte[]  data      = { 0x12, 0x34 };
        private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();
        private final String         root;                                     //根节点路径
        private String               id;
        private LockNode             idName;
        private String               ownerId;
        private String               lastChildId;
        private Throwable            other     = null;
        private KeeperException      exception = null;
        private InterruptedException interrupt = null;
    
        public DistributedLock(String root) {
            this.root = root;
            ensureExists(root);
        }
    
        /**
         * 尝试获取锁操作,阻塞式可被中断
         */
        public void lock() throws InterruptedException, KeeperException {
            // 可能初始化的时候就失败了
            if (exception != null) {
                throw exception;
            }
    
            if (interrupt != null) {
                throw interrupt;
            }
    
            if (other != null) {
                throw new NestableRuntimeException(other);
            }
    
            if (isOwner()) {//锁重入
                return;
            }
    
            BooleanMutex mutex = new BooleanMutex();
            acquireLock(mutex);
            // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
            try {
                mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
                // mutex.get();
            } catch (TimeoutException e) {
                if (!mutex.state()) {
                    lock();
                }
            }
    
            if (exception != null) {
                throw exception;
            }
    
            if (interrupt != null) {
                throw interrupt;
            }
    
            if (other != null) {
                throw new NestableRuntimeException(other);
            }
        }
    
        /**
         * 尝试获取锁对象, 不会阻塞
         * 
         * @throws InterruptedException
         * @throws KeeperException
         */
        public boolean tryLock() throws KeeperException {
            // 可能初始化的时候就失败了
            if (exception != null) {
                throw exception;
            }
    
            if (isOwner()) {//锁重入
                return true;
            }
    
            acquireLock(null);
    
            if (exception != null) {
                throw exception;
            }
    
            if (interrupt != null) {
                Thread.currentThread().interrupt();
            }
    
            if (other != null) {
                throw new NestableRuntimeException(other);
            }
    
            return isOwner();
        }
    
        /**
         * 释放锁对象
         */
        public void unlock() throws KeeperException {
            if (id != null) {
                try {
                    zookeeper.delete(root + "/" + id, -1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (KeeperException.NoNodeException e) {
                    // do nothing
                } finally {
                    id = null;
                }
            } else {
                //do nothing
            }
        }
    
        private void ensureExists(final String path) {
            try {
                Stat stat = zookeeper.exists(path, false);
                if (stat != null) {
                    return;
                }
    
                zookeeper.create(path, data, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                exception = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                interrupt = e;
            }
        }
    
        /**
         * 返回锁对象对应的path
         */
        public String getRoot() {
            return root;
        }
    
        /**
         * 判断当前是不是锁的owner
         */
        public boolean isOwner() {
            return id != null && ownerId != null && id.equals(ownerId);
        }
    
        /**
         * 返回当前的节点id
         */
        public String getId() {
            return this.id;
        }
    
        // ===================== helper method =============================
    
        /**
         * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
         */
        private Boolean acquireLock(final BooleanMutex mutex) {
            try {
                do {
                    if (id == null) {//构建当前lock的唯一标识
                        long sessionId = zookeeper.getDelegate().getSessionId();
                        String prefix = "x-" + sessionId + "-";
                        //如果第一次,则创建一个节点
                        String path = zookeeper.create(root + "/" + prefix, data,
                                CreateMode.EPHEMERAL_SEQUENTIAL);
                        int index = path.lastIndexOf("/");
                        id = StringUtils.substring(path, index + 1);
                        idName = new LockNode(id);
                    }
    
                    if (id != null) {
                        List<String> names = zookeeper.getChildren(root, false);
                        if (names.isEmpty()) {
                            id = null;//异常情况,重新创建一个
                        } else {
                            //对节点进行排序
                            SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
                            for (String name : names) {
                                sortedNames.add(new LockNode(name));
                            }
    
                            if (sortedNames.contains(idName) == false) {
                                id = null;//清空为null,重新创建一个
                                continue;
                            }
    
                            //将第一个节点做为ownerId
                            ownerId = sortedNames.first().getName();
                            if (mutex != null && isOwner()) {
                                mutex.set(true);//直接更新状态,返回
                                return true;
                            } else if (mutex == null) {
                                return isOwner();
                            }
    
                            SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                            if (!lessThanMe.isEmpty()) {
                                //关注一下排队在自己之前的最近的一个节点
                                LockNode lastChildName = lessThanMe.last();
                                lastChildId = lastChildName.getName();
                                //异步watcher处理
                                zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
    
                                    public void asyncProcess(WatchedEvent event) {
                                        acquireLock(mutex);
                                    }
    
                                });
    
                                if (stat == null) {
                                    acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
                                }
                            } else {
                                if (isOwner()) {
                                    mutex.set(true);
                                } else {
                                    id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
                                }
                            }
                        }
                    }
                } while (id == null);
            } catch (KeeperException e) {
                exception = e;
                if (mutex != null) {
                    mutex.set(true);
                }
            } catch (InterruptedException e) {
                interrupt = e;
                if (mutex != null) {
                    mutex.set(true);
                }
            } catch (Throwable e) {
                other = e;
                if (mutex != null) {
                    mutex.set(true);
                }
            }
    
            if (isOwner() && mutex != null) {
                mutex.set(true);
            }
            return Boolean.FALSE;
        }
    }

    相关说明:

    测试代码:

    Java代码 复制代码 收藏代码
    1. @Test  
    2.     public void test_lock() {  
    3.         ExecutorService exeucotr = Executors.newCachedThreadPool();  
    4.         final int count = 50;  
    5.         final CountDownLatch latch = new CountDownLatch(count);  
    6.         final DistributedLock[] nodes = new DistributedLock[count];  
    7.         for (int i = 0; i < count; i++) {  
    8.             final DistributedLock node = new DistributedLock(dir);  
    9.             nodes[i] = node;  
    10.             exeucotr.submit(new Runnable() {  
    11.   
    12.                 public void run() {  
    13.                     try {  
    14.                         Thread.sleep(1000);  
    15.                         node.lock(); //获取锁  
    16.                         Thread.sleep(100 + RandomUtils.nextInt(100));  
    17.   
    18.                         System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());  
    19.                     } catch (InterruptedException e) {  
    20.                         want.fail();  
    21.                     } catch (KeeperException e) {  
    22.                         want.fail();  
    23.                     } finally {  
    24.                         latch.countDown();  
    25.                         try {  
    26.                             node.unlock();  
    27.                         } catch (KeeperException e) {  
    28.                             want.fail();  
    29.                         }  
    30.                     }  
    31.   
    32.                 }  
    33.             });  
    34.         }  
    35.   
    36.         try {  
    37.             latch.await();  
    38.         } catch (InterruptedException e) {  
    39.             want.fail();  
    40.         }  
    41.   
    42.         exeucotr.shutdown();  
    43.     }  
    @Test
        public void test_lock() {
            ExecutorService exeucotr = Executors.newCachedThreadPool();
            final int count = 50;
            final CountDownLatch latch = new CountDownLatch(count);
            final DistributedLock[] nodes = new DistributedLock[count];
            for (int i = 0; i < count; i++) {
                final DistributedLock node = new DistributedLock(dir);
                nodes[i] = node;
                exeucotr.submit(new Runnable() {
    
                    public void run() {
                        try {
                            Thread.sleep(1000);
                            node.lock(); //获取锁
                            Thread.sleep(100 + RandomUtils.nextInt(100));
    
                            System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                        } catch (InterruptedException e) {
                            want.fail();
                        } catch (KeeperException e) {
                            want.fail();
                        } finally {
                            latch.countDown();
                            try {
                                node.unlock();
                            } catch (KeeperException e) {
                                want.fail();
                            }
                        }
    
                    }
                });
            }
    
            try {
                latch.await();
            } catch (InterruptedException e) {
                want.fail();
            }
    
            exeucotr.shutdown();
        }

    升级版

     实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。

    大致原理就是ReentrantLock 和 DistributedLock的一个结合。

    •  单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
    • 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
    锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。
     
    代码:
    Java代码 复制代码 收藏代码
    1. public class DistributedReentrantLock extends DistributedLock {  
    2.   
    3.     private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";  
    4.     private ReentrantLock       reentrantLock = new ReentrantLock();  
    5.   
    6.     public DistributedReentrantLock(String root) {  
    7.         super(root);  
    8.     }  
    9.   
    10.     public void lock() throws InterruptedException, KeeperException {  
    11.         reentrantLock.lock();//多线程竞争时,先拿到第一层锁  
    12.         super.lock();  
    13.     }  
    14.   
    15.     public boolean tryLock() throws KeeperException {  
    16.         //多线程竞争时,先拿到第一层锁  
    17.         return reentrantLock.tryLock() && super.tryLock();  
    18.     }  
    19.   
    20.     public void unlock() throws KeeperException {  
    21.         super.unlock();  
    22.         reentrantLock.unlock();//多线程竞争时,释放最外层锁  
    23.     }  
    24.   
    25.     @Override  
    26.     public String getId() {  
    27.         return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());  
    28.     }  
    29.   
    30.     @Override  
    31.     public boolean isOwner() {  
    32.         return reentrantLock.isHeldByCurrentThread() && super.isOwner();  
    33.     }  
    34.   
    35. }  
    public class DistributedReentrantLock extends DistributedLock {
    
        private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";
        private ReentrantLock       reentrantLock = new ReentrantLock();
    
        public DistributedReentrantLock(String root) {
            super(root);
        }
    
        public void lock() throws InterruptedException, KeeperException {
            reentrantLock.lock();//多线程竞争时,先拿到第一层锁
            super.lock();
        }
    
        public boolean tryLock() throws KeeperException {
            //多线程竞争时,先拿到第一层锁
            return reentrantLock.tryLock() && super.tryLock();
        }
    
        public void unlock() throws KeeperException {
            super.unlock();
            reentrantLock.unlock();//多线程竞争时,释放最外层锁
        }
    
        @Override
        public String getId() {
            return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
        }
    
        @Override
        public boolean isOwner() {
            return reentrantLock.isHeldByCurrentThread() && super.isOwner();
        }
    
    }
     
    测试代码:
    Java代码 复制代码 收藏代码
    1. @Test  
    2.     public void test_lock() {  
    3.         ExecutorService exeucotr = Executors.newCachedThreadPool();  
    4.         final int count = 50;  
    5.         final CountDownLatch latch = new CountDownLatch(count);  
    6.   
    7.         final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁  
    8.         for (int i = 0; i < count; i++) {  
    9.             exeucotr.submit(new Runnable() {  
    10.   
    11.                 public void run() {  
    12.                     try {  
    13.                         Thread.sleep(1000);  
    14.                         lock.lock();  
    15.                         Thread.sleep(100 + RandomUtils.nextInt(100));  
    16.   
    17.                         System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());  
    18.                     } catch (InterruptedException e) {  
    19.                         want.fail();  
    20.                     } catch (KeeperException e) {  
    21.                         want.fail();  
    22.                     } finally {  
    23.                         latch.countDown();  
    24.                         try {  
    25.                             lock.unlock();  
    26.                         } catch (KeeperException e) {  
    27.                             want.fail();  
    28.                         }  
    29.                     }  
    30.   
    31.                 }  
    32.             });  
    33.         }  
    34.   
    35.         try {  
    36.             latch.await();  
    37.         } catch (InterruptedException e) {  
    38.             want.fail();  
    39.         }  
    40.   
    41.         exeucotr.shutdown();  
    42.     }  
    @Test
        public void test_lock() {
            ExecutorService exeucotr = Executors.newCachedThreadPool();
            final int count = 50;
            final CountDownLatch latch = new CountDownLatch(count);
    
            final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
            for (int i = 0; i < count; i++) {
                exeucotr.submit(new Runnable() {
    
                    public void run() {
                        try {
                            Thread.sleep(1000);
                            lock.lock();
                            Thread.sleep(100 + RandomUtils.nextInt(100));
    
                            System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                        } catch (InterruptedException e) {
                            want.fail();
                        } catch (KeeperException e) {
                            want.fail();
                        } finally {
                            latch.countDown();
                            try {
                                lock.unlock();
                            } catch (KeeperException e) {
                                want.fail();
                            }
                        }
    
                    }
                });
            }
    
            try {
                latch.await();
            } catch (InterruptedException e) {
                want.fail();
            }
    
            exeucotr.shutdown();
        }

    最后

    其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下

    大致思路:

    1. 竞争资源标示:  read_自增id , write_自增id
    2. 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
    3. watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
    • 大小: 74.4 KB
        
    分享到:                  
    • 2011-09-30 17:00
    •       
    • 浏览 6709
    •       
    • 评论(3)
    •                   
    • 分类:企业架构
    •            
    • 相关推荐
    •     
    评论
        
    3 楼     agapple    2013-11-18            
    accp_huangxin 写道
    大哥,能提供一下LockNode这个对象结构吗
    Java代码 复制代码 收藏代码
    1. public class LockNode implements Comparable<LockNode> {  
    2.   
    3.     private final String name;  
    4.     private String       prefix;  
    5.     private int          sequence = -1;  
    6.   
    7.     public LockNode(String name){  
    8.         Assert.notNull(name, "id cannot be null");  
    9.         this.name = name;  
    10.         this.prefix = name;  
    11.         int idx = name.lastIndexOf('-');  
    12.         if (idx >= 0) {  
    13.             this.prefix = name.substring(0, idx);  
    14.             try {  
    15.                 this.sequence = Integer.parseInt(name.substring(idx + 1));  
    16.             } catch (Exception e) {  
    17.                 // ignore  
    18.             }  
    19.         }  
    20.     }  
    21.   
    22.     public int compareTo(LockNode that) {  
    23.         int s1 = this.sequence;  
    24.         int s2 = that.sequence;  
    25.         if (s1 == -1 && s2 == -1) {  
    26.             return this.name.compareTo(that.name);  
    27.         }  
    28.   
    29.         if (s1 == -1) {  
    30.             return -1;  
    31.         } else if (s2 == -1) {  
    32.             return 1;  
    33.         } else {  
    34.             return s1 - s2;  
    35.         }  
    36.     }  
    37.   
    38.     public String getName() {  
    39.         return name;  
    40.     }  
    41.   
    42.     public int getSequence() {  
    43.         return sequence;  
    44.     }  
    45.   
    46.     public String getPrefix() {  
    47.         return prefix;  
    48.     }  
    49.   
    50.     public String toString() {  
    51.         return name.toString();  
    52.     }  
    53.   
    54.     // ==================== hashcode & equals方法=======================  
    55.   
    56.     @Override  
    57.     public int hashCode() {  
    58.         final int prime = 31;  
    59.         int result = 1;  
    60.         result = prime * result + ((name == null) ? 0 : name.hashCode());  
    61.         return result;  
    62.     }  
    63.   
    64.     @Override  
    65.     public boolean equals(Object obj) {  
    66.         if (this == obj) {  
    67.             return true;  
    68.         }  
    69.         if (obj == null) {  
    70.             return false;  
    71.         }  
    72.         if (getClass() != obj.getClass()) {  
    73.             return false;  
    74.         }  
    75.         LockNode other = (LockNode) obj;  
    76.         if (name == null) {  
    77.             if (other.name != null) {  
    78.                 return false;  
    79.             }  
    80.         } else if (!name.equals(other.name)) {  
    81.             return false;  
    82.         }  
    83.         return true;  
    84.     }  
    85.   
    86. }  
    public class LockNode implements Comparable<LockNode> {
    
        private final String name;
        private String       prefix;
        private int          sequence = -1;
    
        public LockNode(String name){
            Assert.notNull(name, "id cannot be null");
            this.name = name;
            this.prefix = name;
            int idx = name.lastIndexOf('-');
            if (idx >= 0) {
                this.prefix = name.substring(0, idx);
                try {
                    this.sequence = Integer.parseInt(name.substring(idx + 1));
                } catch (Exception e) {
                    // ignore
                }
            }
        }
    
        public int compareTo(LockNode that) {
            int s1 = this.sequence;
            int s2 = that.sequence;
            if (s1 == -1 && s2 == -1) {
                return this.name.compareTo(that.name);
            }
    
            if (s1 == -1) {
                return -1;
            } else if (s2 == -1) {
                return 1;
            } else {
                return s1 - s2;
            }
        }
    
        public String getName() {
            return name;
        }
    
        public int getSequence() {
            return sequence;
        }
    
        public String getPrefix() {
            return prefix;
        }
    
        public String toString() {
            return name.toString();
        }
    
        // ==================== hashcode & equals方法=======================
    
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            LockNode other = (LockNode) obj;
            if (name == null) {
                if (other.name != null) {
                    return false;
                }
            } else if (!name.equals(other.name)) {
                return false;
            }
            return true;
        }
    
    }
    
       

     

            
            
     

     
         
                

  • 相关阅读:
    Linux—服务管理三种方式(chkconfig和service和systemctl)
    Linux bash篇(二 操作环境)
    Linux bash篇,基本信息和变量
    Linux 磁盘管理篇,设备文件
    Linux 磁盘管理篇, 内存交换空间
    Linux 磁盘管理篇,目录管理(一)
    Linux 磁盘管理篇, 目录管理(二)
    Linux 磁盘管理篇,连接文件
    Linux 磁盘管理篇(一 磁盘分区)
    Linux 磁盘管理篇,开机挂载
  • 原文地址:https://www.cnblogs.com/hansongjiang/p/3918152.html
Copyright © 2011-2022 走看看