zoukankan      html  css  js  c++  java
  • C# 读写锁

    读写锁

        /// <summary>
        /// 提供异步独占和并发执行支持
        /// </summary>
        public sealed class AsyncReaderWriter
        {
            /// <summary>
            /// 在当前实例中保护所有共享状态的锁
            /// </summary>
            private readonly object _lock = new object();
    
            /// <summary>
            /// 并发读任务等待执行的队列
            /// </summary>
            private readonly Queue<Task> _waitingCocurrent = new Queue<Task>();
    
            /// <summary>
            /// 独占写任务等待执行的队列
            /// </summary>
            private readonly Queue<Task> _waitingExclusive = new Queue<Task>();
    
            /// <summary>
            /// 并发读任务正在执行的数量
            /// </summary>
            private int _currentConcurrent = 0;
    
            /// <summary>
            /// 独占写任务是否正在执行
            /// </summary>
            private bool _currentlyExclusive = false;
    
            /// <summary>
            /// 非泛型的任务创建工厂
            /// </summary>
            private TaskFactory _factory;
    
            /// <summary>
            /// 初始化
            /// </summary>
            public AsyncReaderWriter()
            {
                this._factory = Task.Factory;
            }
    
            /// <summary>
            /// 使用指定的<see cref="TaskFactory"/>初始化<see cref="AsyncReaderWriter"/>, 为我们创建所有任务
            /// </summary>
            /// <param name="factory">用来创建所有任务的<see cref="TaskFactory"/></param>
            public AsyncReaderWriter(TaskFactory factory)
            {
                if (factory == null)
                    throw new ArgumentNullException("factory");
                this._factory = factory;
            }
    
            /// <summary>
            /// 获取当前队列中正在等待的独占写任务数量
            /// </summary>
            public int WaitingExclusive { get { lock (_lock) return this._waitingExclusive.Count; } }
    
            /// <summary>
            /// 获取当前队列中正在等待的并发读任务数量
            /// </summary>
            public int WaitingConcurrent { get { lock (_lock) return this._waitingCocurrent.Count; } }
    
            /// <summary>
            /// 获取并发读任务正在执行的数量
            /// </summary>
            public int CurrentConcurrent { get { lock (_lock) return this._currentConcurrent; } }
    
            /// <summary>
            /// 获取当前独占写任务是否正在执行
            /// </summary>
            public bool CurrentlyExclusive { get { lock (_lock) return this._currentlyExclusive; } }
    
            /// <summary>
            /// 将独占写<see cref="Action"/>入队到<see cref="AsyncReaderWriter"/>
            /// </summary>
            /// <param name="action">将要被以独占写方式执行的<see cref="Action"/></param>
            /// <returns>表示执行提供的<see cref="Action"/>的任务</returns>
            public Task QueueExclusiveWriter(Action action)
            {
                var task = new Task(state =>
                {
                    try
                    {
                        //运行提供的action
                        ((Action)state)();
                    }
                    finally
                    {
                        //确保我们完成后清理
                        FinishExclusiveWriter();
                    }
                }, action, this._factory.CancellationToken, this._factory.CreationOptions);
    
                lock (_lock)
                {
                    //如果当前有任务正在运行,或者其他的独占写任务需要运行, 入队
                    //否则,没有其他正在运行或将要运行的任务,现在就执行当前任务
                    if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
                    {
                        this._waitingExclusive.Enqueue(task);
                    }
                    else
                    {
                        RunExclusive_RequiresLock(task);
                    }
                }
                return task;
            }
    
            /// <summary>
            /// 将独占写<see cref="Func{TResult}"/>入队到<see cref="AsyncReaderWriter"/>
            /// </summary>
            /// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封装的方法的返回值类型。</typeparam>
            /// <param name="fun">将要被以独占写方式指定的<see cref="Func{TResult}"/></param>
            /// <returns>表示执行提供的<see cref="Func{TResult}"/>的任务</returns>
            public Task<TResult> QueueExclusiveWriter<TResult>(Func<TResult> fun)
            {
                var task = new Task<TResult>(state =>
                {
                    try
                    {
                        return ((Func<TResult>)state)();
                    }
                    finally
                    {
                        FinishExclusiveWriter();
                    }
                }, fun, this._factory.CancellationToken, this._factory.CreationOptions);
    
                lock (_lock)
                {
                    //如果当前有任务正在运行,或者其他的独占写任务需要运行, 入队
                    //否则,没有其他正在运行或将要运行的任务,现在就执行当前任务
                    if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
                    {
                        this._waitingExclusive.Enqueue(task);
                    }
                    else
                    {
                        RunExclusive_RequiresLock(task);
                    }
                }
                return task;
            }
    
            /// <summary>
            /// 将并发读<see cref="Action"/>入队到<see cref="AsyncReaderWriter"/>
            /// </summary>
            /// <param name="action">将要被以并发读方式执行的<see cref="Action"/></param>
            /// <returns>表示执行提供的<see cref="Action"/>的任务</returns>
            public Task QueueConcurrentReader(Action action)
            {
                var task = new Task(state =>
                {
                    try
                    {
                        ((Action)state)();
                    }
                    finally
                    {
                        FinishConcurrentReader();
                    }
                }, action, this._factory.CancellationToken, this._factory.CreationOptions);
    
                lock (_lock)
                {
                    //如果现在有独占写任务正在运行或者等待
                    //将当前任务入队
                    if (this._currentlyExclusive || this._waitingExclusive.Count > 0)
                    {
                        this._waitingCocurrent.Enqueue(task);
                    }
                    else
                    {
                        //否则立即调度
                        RunConcurrent_RequiresLock(task);
                    }
                }
                return task;
            }
    
            /// <summary>
            /// 将并发读<see cref="Func{TResult}"/>入队到<see cref="AsyncReaderWriter"/>
            /// </summary>
            /// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封装的方法返回值类型</typeparam>
            /// <param name="fun">将要被以并发读方式执行的<see cref="Func{TResult}"/></param>
            /// <returns>表示执行提供的<see cref="Func{TResult}"/>的任务</returns>
            public Task<TResult> QueueConcurrentReader<TResult>(Func<TResult> fun)
            {
                var task = new Task<TResult>(state =>
                {
                    try
                    {
                        return ((Func<TResult>)state)();
                    }
                    finally
                    {
                        FinishConcurrentReader();
                    }
                }, fun, this._factory.CancellationToken, this._factory.CreationOptions);
    
                lock (_lock)
                {
                    if (_currentlyExclusive || this._waitingExclusive.Count > 0)
                    {
                        this._waitingCocurrent.Enqueue(task);
                    }
                    else
                    {
                        RunConcurrent_RequiresLock(task);
                    }
                }
                return task;
            }
    
            #region 私有方法
    
            /// <summary>
            /// 开始指定的独占任务
            /// </summary>
            /// <param name="exclusive">即将开始的独占任务</param>
            private void RunExclusive_RequiresLock(Task exclusive)
            {
                this._currentlyExclusive = true;
                exclusive.Start(this._factory.Scheduler ?? TaskScheduler.Current);
            }
    
            /// <summary>
            /// 开始指定的并发任务
            /// </summary>
            /// <param name="concurrent">即将开始的并发任务</param>
            private void RunConcurrent_RequiresLock(Task concurrent)
            {
                this._currentConcurrent++;
                concurrent.Start(this._factory.Scheduler ?? TaskScheduler.Current);
            }
    
            /// <summary>
            /// 开始并发队列中的所有任务
            /// </summary>
            private void RunConcurrent_RequiresLock()
            {
                while (this._waitingCocurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock(this._waitingCocurrent.Dequeue());
                }
            }
    
            /// <summary>
            /// 完成并发读任务
            /// </summary>
            private void FinishConcurrentReader()
            {
                lock (_lock)
                {
                    //运行到此处,表示一个并发任务已结束
                    this._currentConcurrent--;
    
                    //如果现在正在运行的并发任务数为0, 并且还有正在等待的独占任务, 执行一个
                    if (this._currentConcurrent == 0 && this._waitingExclusive.Count > 0)
                    {
                        RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
                    }
                    //否则, 如果现在没有等待的独占任务,而有一些因为某些原因等待的并发任务(它们本应该在添加到队列的时候就开始了), 运行所有正在等待的并发任务
                    else if (this._waitingExclusive.Count == 0 && this._waitingCocurrent.Count > 0)
                    {
                        RunConcurrent_RequiresLock();
                    }
                }
            }
    
            /// <summary>
            /// 完成独占写任务
            /// </summary>
            private void FinishExclusiveWriter()
            {
                lock (_lock)
                {
                    //运行到此处,表示一个独占任务已结束
                    this._currentlyExclusive = false;
    
                    //如果当前仍有正在等待的独占任务, 以内联方式运行下一个
                    if (this._waitingExclusive.Count > 0)
                    {
                        RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
                    }
                    //否则, 如果当前仍有正在等待的并发任务, 运行所有
                    else if (this._waitingCocurrent.Count > 0)
                    {
                        RunConcurrent_RequiresLock();
                    }
                }
            }
            #endregion
        }
    

    使用方式:

    var read = new Action(() =>
    {
        Debug.WriteLine($"读取:{File.ReadLines(fileName).EmptyToNull()?.Last()}");
    });
    
    var write = new Action(() =>
    {
        var text = $"{DateTime.Now.Ticks.ToString()}\r\n";
        File.AppendAllText(fileName, text);
        Debug.Write($"\t写入:{text}");
    });
    
    var rw = new AsyncReaderWriter();
    for(var  i = 0; i < 10; i++)
    {
        rw.QueueExclusiveWriter(write);
        rw.QueueConcurrentReader(read);
    }
  • 相关阅读:
    创建并发布npm包
    ubuntu下python+tornado+supervisor+nginx部署
    Ubuntu下pycharm设定任务栏图标后打开出现问号图标
    Ubuntu下安装keras
    Windows和Ubuntu双系统
    Java获取精确到秒的时间戳
    Jmeter Java请求
    git 生成公钥、私钥方法与clone使用方法
    Oracle 安全管理
    五、python沉淀之路--字典
  • 原文地址:https://www.cnblogs.com/aning2015/p/7729670.html
Copyright © 2011-2022 走看看