zoukankan      html  css  js  c++  java
  • 服务注册中心之ZooKeeper系列(三) 实现分布式锁

      通过ZooKeeper的有序节点、节点路径不回重复、还有节点删除会触发Wathcer事件的这些特性,我们可以实现分布式锁。

    一、思路

    1. zookeeper中创建一个根节点Locks,用于后续各个客户端的锁操作。
    2. 当要获取锁的时候,在Locks节点下创建“Lock_序号”的零时有序节点(临时节点为了客户端突发断开连接,则此节点消失)。
    3. 如果没有得到锁,就监控排在自己前面的序号节点,等待它的释放。
    4. 当前面的锁被释放后,触发Process方法,然后继续获取当前子节点,判断当前节点是不是第一个,是 返回锁,否 获取锁失败。

    二、实现

      在实现是要了解一个类 AutoResetEvent。AutoResetEvent 常常被用来在两个线程之间进行信号发送。它有两个重要的方法:

      Set() :发送信号到等待线程以继续其工作。

      bool WaitOne():等待另一个线程发信号,只有收到信号,线程才继续往下执行 ,会一直等待下去,返回值表示是否收到信号。

      bool WaitOne(int millisecondsTimeout):等待指定时间,如果没有收到信号继续执行,返回值表示是否收到信号。

    下面为具体实现方法:

    public class ZooKeeperLock
        {
            private MyWatcher myWatcher;
    
            private string lockNode;
    
            private org.apache.zookeeper.ZooKeeper zooKeeper;
    
            public ZooKeeperLock()
            {
                myWatcher = new MyWatcher();
            }
    
            /// <summary>
            /// 获取锁
            /// </summary>
            /// <param name="millisecondsTimeout">等待时间</param>
            /// <returns></returns>
            public async Task<bool> TryLock(int millisecondsTimeout = 0)
            {
                try
                {
                    zooKeeper = new org.apache.zookeeper.ZooKeeper("127.0.0.1", 50000, new MyWatcher());
    
                    //创建锁节点
                    if (await zooKeeper.existsAsync("/Locks") == null)
                        await zooKeeper.createAsync("/Locks", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
                    //新建一个临时锁节点
                    lockNode = await zooKeeper.createAsync("/Locks/Lock_", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                    //获取锁下所有节点
                    var lockNodes = await zooKeeper.getChildrenAsync("/Locks");
    
                    lockNodes.Children.Sort();
    
                    //判断如果创建的节点就是最小节点 返回锁
                    if (lockNode.Split("/").Last() == lockNodes.Children[0])
                        return true;
                    else
                    {
                        //当前节点的位置
                        var location = lockNodes.Children.FindIndex(n => n == lockNode.Split("/").Last());
                        //获取当前节点 前面一个节点的路径
                        var frontNodePath = lockNodes.Children[location - 1];
                        //在前面一个节点上加上Watcher ,当前面那个节点删除时,会触发Process方法
                        await zooKeeper.getDataAsync("/Locks/" + frontNodePath, myWatcher);
    
                        //如果时间为0 一直等待下去
                        if (millisecondsTimeout == 0)
                            myWatcher.AutoResetEvent.WaitOne();
                        else //如果时间不为0 等待指定时间后,返回结果
                        {
                            var result = myWatcher.AutoResetEvent.WaitOne(millisecondsTimeout);
    
                            if (result)//如果返回True,说明在指定时间内,前面的节点释放了锁(但是可能是中间节点主机宕机 导致,所以不一定触发了Process方法就是得到了锁。需要重新判断是不是第一个节点)
                            {
                                //获取锁下所有节点
                                lockNodes = await zooKeeper.getChildrenAsync("/Locks");
                                //判断如果创建的节点就是最小节点 返回锁
                                if (lockNode.Split("/").Last() == lockNodes.Children[0])
                                    return true;
                                else
                                    return false;
                            }
                            else
                                return false;
    
                        }
                    }
                }
                catch (KeeperException e)
                {
                    await UnLock();
                    throw e;
                }
                return false;
            }
    
            /// <summary>
            /// 释放锁
            /// </summary>
            /// <returns></returns>
            public async Task UnLock()
            {
                try
                {
                    myWatcher.AutoResetEvent.Dispose();await zooKeeper.deleteAsync(lockNode);
                }
                catch (KeeperException e)
                {
                    throw e;
                }
            }
    
        }

     Process方法实现:

     public class MyWatcher : Watcher
        {
            public AutoResetEvent AutoResetEvent;
    
            public MyWatcher()
            {
                this.AutoResetEvent = new AutoResetEvent(false);
            }
    
            public override Task process(WatchedEvent @event)
            {
                if (@event.get_Type() == EventType.NodeDeleted)
                {
                    AutoResetEvent.Set();
                }
                return null;
            }
        }

    本文源代码在:分布式实现代码

    如果你认为文章写的不错,就点个【推荐】吧

  • 相关阅读:
    分布式缓存系统Memcached
    HTTP(GET/POST)请求过程中的编码问题
    将指定的Json字符串转为指定的T类型对象(转帖)
    Linux 中有几个文件压缩和解压缩工
    策略添加-通过域策略组自动映射共享文件夹
    Centos 7 加AD域
    Gns3 模拟器创建VLAN
    防火墙常用命令
    Centos 6 任务计划配置
    Cenots 7 开启 SSH_远程连接
  • 原文地址:https://www.cnblogs.com/MicroHeart/p/10453855.html
Copyright © 2011-2022 走看看