zoukankan      html  css  js  c++  java
  • 转载 [ZooKeeper.net] 3 ZooKeeper的分布式锁

    [ZooKeeper.net] 3 ZooKeeper的分布式锁

     
    基于ZooKeeper的分布式锁 

    ZooKeeper 里实现分布式锁的基本逻辑:

       1.zookeeper中创建一个根节点(Locks),用于后续各个客户端的锁操作。

       2.想要获取锁的client都在Locks中创建一个自增序的子节点,每个client得到一个序号,如果自己的序号是最小的则获得锁。

       3.如果没有得到锁,就监控排在自己前面的序号节点,并且设置默认时间,等待它的释放。

       4.业务操作后释放锁,然后监控自己的节点的client就被唤醒得到锁。(例如client A需要释放锁,只需要把对应的节点1删除掉,因为client B已经关注了节点1,那么当节点1被删除后,zookeeper就会通知client B:你是序号最小的了,可以获取锁了)

    释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

    解决方案目录:

          Demo1 Demo2为测试场景  

          ZooKeepr_Lock为锁操作代码

         

    下面贴一下代码看看

    复制代码
      1 public class ZooKeeprDistributedLock : IWatcher
      2     {
      3         /// <summary>
      4         /// zk链接字符串
      5         /// </summary>
      6         private String connectString = "127.0.0.1:2181";
      7         private ZooKeeper zk;
      8         private string root = "/locks"; //根        
      9         private string lockName; //竞争资源的标志        
     10         private string waitNode; //等待前一个锁        
     11         private string myZnode; //当前锁               
     12         private AutoResetEvent autoevent;
     13         private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(50000);
     14         private IList<Exception> exception = new List<Exception>();
     15 
     16         /// <summary>
     17         /// 创建分布式锁
     18         /// </summary>
     19         /// <param name="lockName">竞争资源标志,lockName中不能包含单词lock</param>
     20         public ZooKeeprDistributedLock(string lockName)
     21         {
     22             this.lockName = lockName;
     23             // 创建一个与服务器的连接            
     24             try
     25             {
     26                 zk = new ZooKeeper(connectString, sessionTimeout, this);
     27                 Stopwatch sw = new Stopwatch();
     28                 sw.Start();
     29                 while (true)
     30                 {
     31                     if (zk.State == States.CONNECTING) { break; }
     32                     if (zk.State == States.CONNECTED) { break; }
     33                 }
     34                 sw.Stop();
     35                 TimeSpan ts2 = sw.Elapsed;
     36                 Console.WriteLine("zoo连接总共花费{0}ms.", ts2.TotalMilliseconds);
     37 
     38                 var stat = zk.Exists(root, false);
     39                 if (stat == null)
     40                 {
     41                     // 创建根节点                    
     42                     zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
     43                 }
     44             }
     45             catch (KeeperException e)
     46             {
     47                 throw e;
     48             }
     49         }
     50 
     51         /// <summary>        
     52         /// zookeeper节点的监视器        
     53         /// </summary>        
     54         public virtual void Process(WatchedEvent @event)
     55 
     56         {
     57             if (this.autoevent != null)
     58             {
     59                 //将事件状态设置为终止状态,允许一个或多个等待线程继续;如果该操作成功,则返回true;否则,返回false
     60                 this.autoevent.Set();
     61             }
     62         }
     63 
     64         public virtual bool tryLock()
     65         {
     66             try
     67             {
     68                 string splitStr = "_lock_";
     69                 if (lockName.Contains(splitStr))
     70                 {
     71                     //throw new LockException("lockName can not contains \u000B");                
     72                 }
     73                 //创建临时子节点                
     74                 myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
     75                 Console.WriteLine(myZnode + "    创建完成! ");
     76                 //取出所有子节点                
     77                 IList<string> subNodes = zk.GetChildren(root, false).ToList<string>();
     78                 //取出所有lockName的锁                
     79                 IList<string> lockObjNodes = new List<string>();
     80                 foreach (string node in subNodes)
     81                 {
     82                     if (node.StartsWith(lockName))
     83                     {
     84                         lockObjNodes.Add(node);
     85                     }
     86                 }
     87                 Array alockObjNodes = lockObjNodes.ToArray();
     88                 Array.Sort(alockObjNodes);
     89                 Console.WriteLine(myZnode + "==" + lockObjNodes[0]);
     90                 if (myZnode.Equals(root + "/" + lockObjNodes[0]))
     91                 {
     92                     //如果是最小的节点,则表示取得锁   
     93                     Console.WriteLine(myZnode + "    获取锁成功! ");
     94                     return true;
     95                 }
     96                 //如果不是最小的节点,找到比自己小1的节点               
     97                 string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1);
     98                 waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1];
     99             }
    100             catch (KeeperException e)
    101             {
    102                 throw e;
    103             }
    104             return false;
    105         }
    106 
    107 
    108         public virtual bool tryLock(TimeSpan time)
    109         {
    110             try
    111             {
    112                 if (this.tryLock())
    113                 {
    114                     return true;
    115                 }
    116                 return waitForLock(waitNode, time);
    117             }
    118             catch (KeeperException e)
    119             {
    120                 throw e;
    121             }
    122         }
    123 
    124         /// <summary>
    125         /// 等待锁
    126         /// </summary>
    127         /// <param name="lower">需等待的锁节点</param>
    128         /// <param name="waitTime">等待时间</param>
    129         /// <returns></returns>
    130         private bool waitForLock(string lower, TimeSpan waitTime)
    131         {
    132             var stat = zk.Exists(root + "/" + lower, true);
    133             //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听            
    134             if (stat != null)
    135             {
    136                 Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower);
    137                 autoevent = new AutoResetEvent(false);
    138                 //阻止当前线程,直到当前实例收到信号,使用 TimeSpan 度量时间间隔并指定是否在等待之前退出同步域
    139                 bool r = autoevent.WaitOne(waitTime);
    140                 autoevent.Dispose();
    141                 autoevent = null;
    142                 return r;
    143             }
    144             else return true;
    145         }
    146 
    147         /// <summary>
    148         /// 解除锁
    149         /// </summary>
    150         public virtual void unlock()
    151         {
    152             try
    153             {
    154                 Console.WriteLine("unlock " + myZnode);
    155                 zk.Delete(myZnode, -1);
    156                 myZnode = null;
    157                 zk.Dispose();
    158             }
    159             catch (KeeperException e)
    160             {
    161                 throw e;
    162             }
    163         }
    164     }
    复制代码

    然后先看demo2 :  当前获取到锁以后 释放锁的操作被阻塞 然后运行demo1 进行测试

    复制代码
        int count = 1;//库存 商品编号1079233
        if (count == 1)
        {
            ZooKeeprDistributedLock zklock = new ZooKeeprDistributedLock("Getorder_Pid1079233");
            //创建锁
            if (zklock.tryLock(TimeSpan.FromMilliseconds(50000)))
            {
                Console.WriteLine("Demo2创建订单成功!");
            }
            else { Console.WriteLine("Demo2创建订单失败了!"); }
            Thread.Sleep(30000);//对操作释放锁进行阻塞
            Console.WriteLine(DateTime.Now.ToString("yyyyMMdd HH:mm:ss")); //要进行释放锁的操作时间 主要测试当前锁释放后 Demo1的节点监控是否唤起
            zklock.unlock();//释放锁
            Console.ReadKey();
        }
    复制代码

    demo1:demo1会对排在前面的节点进行监控 当demo2释放锁后 demo1获取锁  demo1创建订单与释放锁之间打印了操作时间 

    可以跟demo2进行释放锁的时间进行对比下

    复制代码
        int count = 1;//库存 商品编号1079233
        if (count == 1)
        {
            ZooKeeprDistributedLock zklock = new ZooKeeprDistributedLock("Getorder_Pid1079233");
            if (zklock.tryLock(TimeSpan.FromMilliseconds(50000)))
            { 
                Console.WriteLine("Demo1创建订单成功!");
            }
            else
            {
                Console.WriteLine("Demo1创建订单失败了!");
            }
            Console.WriteLine(DateTime.Now.ToString("yyyyMMdd HH:mm:ss"));
            zklock.unlock();
            Console.ReadKey();
        }
    复制代码

    这里是我运行后的结果,只精确到秒,可以看到demo2释放锁后,demo1的AutoResetEvent立即被阻断了然后demo1也就获得了锁!

    场景二:将demo1的监听注释后,demo2未释放锁,demo1创建订单失败

      //if (zklock.tryLock(TimeSpan.FromMilliseconds(50000)))
                    if (zklock.tryLock())

    有关此篇一些图片及内容借鉴了几位园友的博文,在此感谢!

    http://www.cnblogs.com/Evil-Rebe/p/6057067.html

    http://www.cnblogs.com/chejiangyi/p/4938400.html

  • 相关阅读:
    HBase-MapReduce
    HBase API 操 作
    HBase-Shell-数据结构-原理
    HBase-简介-安装配置
    Kafka 与 Flume
    kafka-Streams
    Kafka-producer拦截器(interceptor)
    Kafka-API
    Kafka-工作流程分析
    day06 Java面向对象
  • 原文地址:https://www.cnblogs.com/Jeely/p/10716240.html
Copyright © 2011-2022 走看看