zoukankan      html  css  js  c++  java
  • ZooKeeper Distributed Lock

    Test (usage example: five concurrent tasks competing for the same lock)

    // Start five background tasks. Each task builds its own ZooKeeperLockHelper,
    // subscribes a handler that simulates work, and then asks for the lock.
    foreach (var attempt in Enumerable.Range(1, 5))
    {
        Task.Run(() =>
        {
            var helper = new ZooKeeperLockHelper("localhost:5181");

            // Runs once this helper is granted the lock; the argument is the
            // node id the helper reports for this contender.
            helper.OnAcquireLock += nodeId =>
            {
                // Simulated work: sleep for 0-9 whole seconds.
                var seconds = new Random().Next(10);
                Log.Debug("NodeId {@id} executing.....Sleep {@ms} ms", nodeId, seconds * 1000);

                Thread.Sleep(seconds * 1000);
                Log.Debug("NodeId {@id} executing success", nodeId);

                return Task.CompletedTask;
            };

            // Uses the helper's default lock path because no path is passed.
            helper.AcquireLock();
        });
    }
    
    using org.apache.zookeeper;
    using Serilog;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Text.RegularExpressions;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RedisDemo
    {
        /// <summary>
        /// Distributed lock helper built on ZooKeeper ephemeral sequential nodes.
        /// Each instance opens its own ZooKeeper session, creates one
        /// <c>node-NNNNNNNNNN</c> child under the lock path and is considered the lock
        /// holder when its sequence number is the smallest among the children.
        /// When the lock is obtained the <see cref="OnAcquireLock"/> handlers run and the
        /// session is closed afterwards, which is what releases the lock.
        /// </summary>
        public class ZooKeeperLockHelper : Watcher, IDisposable
        {

            #region event

            /// <summary>
            /// Raised once this instance holds the lock. The argument is the sequence
            /// number of the node this instance created.
            /// </summary>
            public event Func<long, Task> OnAcquireLock;
            #endregion

            // Sentinel for "no node created yet". It must not be 0, because 0 is a
            // legitimate sequence number (the first child created under a fresh parent).
            private const long UnassignedNodeId = -1L;

            private static readonly string DEFAULT_PATH = "/zk";
            private static readonly string NODE_NAME = "node-";

            // The sequence number is the run of digits at the very end of the node name.
            // Anchoring at the end keeps digits inside the lock path (e.g. "/zk2") from
            // being mistaken for the sequence number.
            private static readonly Regex NodeIdPattern = new Regex(@"(\d+)$", RegexOptions.Compiled);

            // Private gate instead of lock(this), so outside code locking on the
            // instance cannot interfere with acquisition.
            private readonly object _syncRoot = new object();

            private bool _disposed;

            private ZooKeeper _zooKeeper;
            private Event.KeeperState _currentState;
            private AutoResetEvent _notifyEvent = new AutoResetEvent(false);

            private string _connectionString;

            // Written under _syncRoot, read without it from the watcher callback.
            private volatile bool _hasAcquireLock;
            private string _lockPath;
            private long _currentNodeId = UnassignedNodeId;

            /// <summary>
            /// Creates the helper and starts connecting to ZooKeeper with a 60 second
            /// session timeout.
            /// </summary>
            /// <param name="connectionString">ZooKeeper connection string, e.g. <c>host:port</c>.</param>
            public ZooKeeperLockHelper(string connectionString)
            {
                _connectionString = connectionString;
                this.Initialize(_connectionString, TimeSpan.FromSeconds(60));
            }

            /// <summary>
            /// Registers this instance as a contender for the lock at <paramref name="path"/>.
            /// If this instance is already the smallest node the handlers run before this
            /// method returns; otherwise the method returns immediately and acquisition is
            /// completed later from the watcher callback.
            /// </summary>
            /// <param name="path">Parent node of the lock; <c>/zk</c> when null or empty.</param>
            /// <exception cref="TimeoutException">
            /// The session did not reach the connected state within 10 seconds.
            /// </exception>
            /// <exception cref="InvalidOperationException">
            /// The path returned for the created node carries no sequence number.
            /// </exception>
            public void AcquireLock(string path = "")
            {
                if (this._hasAcquireLock)
                {
                    FireAcquireLock(this._currentNodeId).Wait();
                    return;
                }

                if (!WaitConnected(TimeSpan.FromSeconds(10)))
                {
                    throw new TimeoutException($"{_connectionString} Cannot Connect ZooKeeper");
                }

                // Assign once so the watcher never observes an empty lock path.
                _lockPath = string.IsNullOrEmpty(path) ? DEFAULT_PATH : path;

                var nodePath = _lockPath + "/" + NODE_NAME;

                var createdPath = this._zooKeeper.createAsync(
                    nodePath, Encoding.UTF8.GetBytes("data"),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL).Result;

                long createdId;
                if (!TryParseNodeId(createdPath, out createdId))
                {
                    throw new InvalidOperationException($"Cannot read a sequence number from node path '{createdPath}'");
                }
                this._currentNodeId = createdId;

                // 'true' registers the default watcher (this instance) on the children.
                var children = this._zooKeeper.getChildrenAsync(_lockPath, true).GetAwaiter().GetResult();
                Log.Debug("#-> Begin Acquire Lock CurrentId {@id}", _currentNodeId);

                this.TryTakeLock(children, "NodeId {@id} Direct Acquire Lock");
            }

            /// <summary>
            /// Tells whether <paramref name="nodeId"/> is the smallest sequence number
            /// among the given children.
            /// </summary>
            /// <param name="childrenResult">Result of a children query on the lock path.</param>
            /// <param name="nodeId">Sequence number to test; negative means "unassigned".</param>
            /// <returns>
            /// <c>true</c> when at least one child has a sequence number and the smallest
            /// one equals <paramref name="nodeId"/>; otherwise <c>false</c>.
            /// </returns>
            protected bool IsMinNodeId(ChildrenResult childrenResult, long nodeId)
            {
                if (nodeId < 0 || childrenResult == null || childrenResult.Children == null || childrenResult.Children.Count == 0)
                {
                    return false;
                }

                var found = false;
                var smallest = long.MaxValue;

                foreach (var child in childrenResult.Children)
                {
                    long childId;
                    if (!TryParseNodeId(child, out childId))
                    {
                        // A child without a numeric suffix is not a lock contender;
                        // counting it as 0 would make it the permanent minimum.
                        continue;
                    }

                    found = true;
                    if (childId < smallest)
                    {
                        smallest = childId;
                    }
                }

                return found && smallest == nodeId;
            }

            /// <summary>
            /// Extracts the trailing sequence number from a node name or path.
            /// </summary>
            /// <param name="path">Node name or full path.</param>
            /// <returns>The sequence number, or <c>0</c> when the text does not end in digits.</returns>
            protected long ParseNodeId(string path)
            {
                long id;
                return TryParseNodeId(path, out id) ? id : 0L;
            }

            /// <summary>
            /// Opens the ZooKeeper session with this instance as the default watcher.
            /// </summary>
            /// <param name="connectionString">ZooKeeper connection string.</param>
            /// <param name="sessionTimeout">Session timeout passed to the client in milliseconds.</param>
            protected void Initialize(String connectionString, TimeSpan sessionTimeout)
            {
                this._zooKeeper = new ZooKeeper(connectionString, (int)sessionTimeout.TotalMilliseconds, this);
            }

            /// <summary>
            /// Runs every <see cref="OnAcquireLock"/> subscriber, blocking until each one
            /// has finished, and then closes the ZooKeeper session.
            /// </summary>
            /// <param name="id">Sequence number passed to the subscribers.</param>
            /// <returns>An already completed task; the work is done synchronously.</returns>
            public Task FireAcquireLock(long id)
            {
                // Snapshot so a concurrent unsubscribe cannot null the delegate mid-call.
                var handlers = this.OnAcquireLock;
                try
                {
                    if (handlers != null)
                    {
                        // Wait on every subscriber, not only the last one, so the
                        // session is not closed while a handler is still running.
                        foreach (Func<long, Task> handler in handlers.GetInvocationList())
                        {
                            var pending = handler(id);
                            if (pending != null)
                            {
                                pending.Wait();
                            }
                        }
                    }
                }
                finally
                {
                    // Close even when a handler throws; otherwise the ephemeral node
                    // would stay until the session times out.
                    this.CloseConnection();
                    Log.Debug("NodeId {@id} Close ZooKeeper Success", id);
                }

                return Task.CompletedTask;
            }

            /// <summary>
            /// Blocks until the watcher has reported the connected state.
            /// </summary>
            /// <param name="timeout">Maximum time to wait for each connection notification.</param>
            /// <returns><c>true</c> once connected; <c>false</c> if a wait timed out.</returns>
            public bool WaitConnected(TimeSpan timeout)
            {
                while (this._currentState != Event.KeeperState.SyncConnected)
                {
                    if (!_notifyEvent.WaitOne(timeout))
                    {
                        return false;
                    }
                }
                return true;
            }

            /// <summary>
            /// Closes the ZooKeeper session once; later calls do nothing.
            /// </summary>
            protected void CloseConnection()
            {
                if (_disposed)
                {
                    return;
                }
                _disposed = true;

                if (_zooKeeper == null)
                {
                    return;
                }

                try
                {
                    this._zooKeeper.closeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
                }
                catch (Exception ex)
                {
                    // Closing stays best-effort (never throws), but the failure is
                    // recorded instead of being silently discarded.
                    Log.Warning(ex, "NodeId {@id} Close ZooKeeper Failed", this._currentNodeId);
                }
            }

            /// <summary>
            /// Takes the lock and runs the handlers when this instance owns the smallest
            /// node and has not taken the lock before.
            /// </summary>
            /// <param name="children">Current children of the lock path.</param>
            /// <param name="acquiredMessage">Log template written when the lock is taken.</param>
            private void TryTakeLock(ChildrenResult children, string acquiredMessage)
            {
                if (!this.IsMinNodeId(children, this._currentNodeId))
                {
                    return;
                }

                lock (this._syncRoot)
                {
                    // Re-check under the gate: AcquireLock and the watcher can race here.
                    if (this._hasAcquireLock)
                    {
                        return;
                    }

                    Log.Debug(acquiredMessage, this._currentNodeId);
                    this._hasAcquireLock = true;
                    this.FireAcquireLock(this._currentNodeId).Wait();
                }
            }

            /// <summary>
            /// Reads the trailing digits of <paramref name="path"/> as a sequence number.
            /// </summary>
            /// <returns><c>true</c> when the text ends in digits that fit in a <see cref="long"/>.</returns>
            private static bool TryParseNodeId(string path, out long nodeId)
            {
                nodeId = UnassignedNodeId;
                if (string.IsNullOrEmpty(path))
                {
                    return false;
                }

                var match = NodeIdPattern.Match(path);
                return match.Success && long.TryParse(match.Groups[1].Value, out nodeId);
            }

            #region Watcher Impl

            /// <summary>
            /// Watcher callback. An event without a path in the connected state marks the
            /// session as connected; an event on the lock path triggers a new attempt to
            /// take the lock on a background task.
            /// </summary>
            /// <param name="event">Event delivered by the ZooKeeper client.</param>
            /// <returns>A completed task; the acquisition attempt is not awaited.</returns>
            public override Task process(WatchedEvent @event)
            {
                if (@event.getState() != Event.KeeperState.SyncConnected)
                {
                    return Task.CompletedTask;
                }

                var path = @event.getPath();
                if (string.IsNullOrEmpty(path))
                {
                    // Session-level notification: wake up WaitConnected.
                    this._currentState = @event.getState();
                    this._notifyEvent.Set();
                    return Task.CompletedTask;
                }

                if (!path.Equals(this._lockPath))
                {
                    return Task.CompletedTask;
                }

                Log.Debug("NodeId {@id} Start Watcher Callback", this._currentNodeId);

                if (this._hasAcquireLock)
                {
                    Log.Debug("NodeId {@id} Has Acquire Lock return", this._currentNodeId);
                    return Task.CompletedTask;
                }

                // Run off the callback so the blocking handler does not stall it.
                Task.Run(() =>
                {
                    try
                    {
                        // Passing 'this' registers the watcher again for the next change.
                        var childrenResult = _zooKeeper.getChildrenAsync(this._lockPath, this).Result;
                        this.TryTakeLock(childrenResult, "NodeId {@id} Acquire Lock");
                    }
                    catch (Exception ex)
                    {
                        // Nobody observes this task, so log instead of losing the error.
                        Log.Error(ex, "NodeId {@id} Watcher Callback Failed", this._currentNodeId);
                    }
                });

                return Task.CompletedTask;
            }

            #endregion

            /// <summary>
            /// Closes the ZooKeeper session if it is still open.
            /// </summary>
            public void Dispose()
            {
                this.CloseConnection();
            }
        }
    }
    
    
  • 相关阅读:
    JDOJ 2197: 校门外的树
    简单线段树知识点详解
    求GCD(最大公约数)的两种方式
    USACO Buying Feed, II
    USACO Dueling GPS's
    USACO Milking Cows
    NOIP 2014 比例简化
    USACO Clumsy Cows
    JDOJ 1140: 完数
    NOIP 2008 火柴棒等式
  • 原文地址:https://www.cnblogs.com/byxxw/p/11352564.html
Copyright © 2011-2022 走看看