zoukankan      html  css  js  c++  java
  • c#多线程编程实战(原书第二版)文摘

    Thread t = new Thread(PrintNumbersWithDelay);
    t.Start();
    t.Join();

    但我们在主程序中调用了t.Join方法,该方法允许我们等待直到线程t完成。当线程t完成时,主程序会继续运行。借助该技术可以实现在两个线程间同步。

    当主程序和单独的数字打印线程运行时,我们等待6秒后对线程调用了t.Abort方法。这给线程注入了ThreadAbortException方法,导致线程被终结。这非常危险,因为该异常可以在任何时刻发生并可能彻底摧毁应用程序。另外,使用该技术也不一定总能终止线程。目标线程可以通过处理该异常并调用Thread.ResetAbort方法来拒绝被终止。因此并不推荐使用Abort方法来关闭线程。可优先使用一些其他方法,比如提供一个CancellationToken方法来取消线程的执行。

    进程会等待所有的前台线程完成后再结束工作,但是如果只剩下后台线程,则会直接结束工作。

    线程之间传递参数:

        var threadThree = new Thread(() => CountNumbers(12));
                threadThree.Name = "ThreadThree";
                threadThree.Start();
                threadThree.Join();
    
    static void CountNumbers(int iterations)
            {
                for (int i = 1; i <= iterations; i++)
                {
                    Sleep(TimeSpan.FromSeconds(2));
                    WriteLine($"{CurrentThread.Name} prints {i}");
                }
            }

    当主程序启动时,定义了两个将会抛出异常的线程。其中一个对异常进行了处理,另一个则没有。可以看到第二个异常没有被包裹启动线程的try/catch代码块捕获到。所以如果直接使用线程,一般来说不要在线程中抛出异常,而是在线程代码中使用try/catch代码块。

    第一章 线程基础

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第二章 线程同步

    为了利用好这两种方式,可以使用混合模式(hybrid)。混合模式先尝试使用用户模式等待,如果线程等待了足够长的时间,则会切换到阻塞状态以节省CPU资源。

    ·执行基本的原子操作

    而借助于Interlocked类,我们无需锁定任何对象即可获取到正确的结果。Interlocked提供了Increment、Decrement和Add等基本数学操作的原子方法,从而帮助我们在编写Counter类时无需使用锁

    class CounterNoLock : CounterBase
            {
                private int _count;
    
                public int Count => _count;
    
                public override void Increment()
                {
                    Interlocked.Increment(ref _count);
                }
    
                public override void Decrement()
                {
                    Interlocked.Decrement(ref _count);
                }
            }

    ·使用Mutex

        static void Main(string[] args)
            {
                const string MutexName = "CSharpThreadingCookbook";
    
                using (var m = new Mutex(false, MutexName))
                {
                    if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
                    {
                        WriteLine("Second instance is running!");
                    }
                    else
                    {
                        WriteLine("Running!");
                        ReadLine();
                        m.ReleaseMutex();
                    }
                }
            }

    当主程序启动时,定义了一个指定名称的互斥量,设置initialOwner标志为false。这意味着如果互斥量已经被创建,则允许程序获取该互斥量。如果没有获取到互斥量,程序则简单地显示Running,等待直到按下了任何键,然后释放该互斥量并退出。
    如果再运行同样一个程序,则会在5秒钟内尝试获取互斥量。如果此时在第一个程序中按下了任何键,第二个程序则会开始执行。然而,如果保持等待5秒钟第二个程序将无法获取到该互斥量

    注意具名的互斥量是全局的操作系统对象!请务必正确关闭互斥量。最好是使用using代码块来包裹互斥量对象。
    该方式可用于在不同的程序中同步线程,可被推广到大量的使用场景中。

    ·使用SemaphoreSlim

        static SemaphoreSlim _semaphore = new SemaphoreSlim(4);
    
            static void AccessDatabase(string name, int seconds)
            {
                WriteLine($"{name} waits to access a database");
                _semaphore.Wait();
                WriteLine($"{name} was granted an access to a database");
                Sleep(TimeSpan.FromSeconds(seconds));
                WriteLine($"{name} is completed");
                _semaphore.Release();
            }

    每个线程都尝试获取数据库的访问,但是我们借助于信号系统限制了访问数据库的并发数为4个线程。当有4个线程获取了数据库的访问后,其他两个线程需要等待,直到之前线程中的某一个完成工作并调用_semaphore.Release方法来发出信号。

    ·使用AutoResetEvent

        private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
            private static AutoResetEvent _mainEvent = new AutoResetEvent(false);

    AutoResetEvent类采用的是内核时间模式,所以等待时间不能太长。使用ManualResetEventslim类更好,因为它使用的是混合模式。

    ·使用ManualResetEventSlim

    在前面某小节中,我们使用了一种无法在操作系统层面工作的混合模式。如果我们需要全局事件,则可以使用EventWaitHandle类,其是AutoResetEvent和ManualResetEvent类的基类
    ·使用CountDownEvent

    本节将描述如何使用CountdownEvent信号类来等待直到一定数量的操作完成。

        static void Main(string[] args)
            {
                WriteLine("Starting two operations");
                var t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4));
                var t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8));
                t1.Start();
                t2.Start();
                _countdown.Wait();
                WriteLine("Both operations have been completed.");
                _countdown.Dispose();
    
                System.Threading.Thread.Sleep(TimeSpan.FromSeconds(12));
            }
    
            static CountdownEvent _countdown = new CountdownEvent(2);
    
            static void PerformOperation(string message, int seconds)
            {
                Sleep(TimeSpan.FromSeconds(seconds));
                WriteLine(message);
                _countdown.Signal();
            }

    当主程序启动时,创建了一个CountdownEvent实例,在其构造函数中指定了当两个操作完成时会发出信号。然后我们启动了两个线程,当它们执行完成后会发出信号。一旦第二个线程完成,主线程会从等待CountdownEvent的状态中返回并继续执行。针对需要等待多个异步操作完成的情形,使用该方式是非常便利的。
    然而这有一个重大的缺点。如果调用_countdown.Signal()没达到指定的次数,那么_countdown.Wait()将一直等待。请确保使用CountdownEvent时,所有线程完成后都要调用Signal方法。

    ·使用Barrier

    Barrier类用于组织多个线程及时在某个时刻碰面。其提供了一个回调函数,每次线程调用了SignalAndWait方法后该回调函数会被执行。

    每个线程将向Barrier发送两次信号,所以会有两个阶段。每次这两个线程调用Signal-AndWait方法时,Barrier将执行回调函数。这在多线程迭代运算中非常有用,可以在每个迭代结束前执行一些计算。当最后一个线程调用SignalAndWait方法时可以在迭代结束时进行交互。

        static void Main(string[] args)
            {
                var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", 5));
                var t2 = new Thread(() => PlayMusic("the singer", "sing his song", 2));
    
                t1.Start();
                t2.Start();
            }
    
            static Barrier _barrier = new Barrier(2, b => WriteLine($"End of phase {b.CurrentPhaseNumber + 1}"));
    
            static void PlayMusic(string name, string message, int seconds)
            {
                for (int i = 1; i < 3; i++)
                {
                    WriteLine("----------------------------------------------");
                    Sleep(TimeSpan.FromSeconds(seconds));
                    WriteLine($"{name} starts to {message}");
                    Sleep(TimeSpan.FromSeconds(seconds));
                    WriteLine($"{name} finishes to {message}");
                    _barrier.SignalAndWait();
                }
            }

    ·使用ReaderWriterLockSlim

    使用ReaderWriterLockSlim来创建一个线程安全的机制,在多线程中对一个集合进行读写操作。ReaderWriterLockSlim代表了一个管理资源访问的锁,允许多个线程同时读取,以及独占写。

    当主程序启动时,同时运行了三个线程来从字典中读取数据,还有另外两个线程向该字典中写入数据。我们使用ReaderWriterLockSlim类来实现线程安全,该类专为这样的场景而设计。
    这里使用两种锁:读锁允许多线程读取数据,写锁在被释放前会阻塞了其他线程的所有操作。获取读锁时还有一个有意思的场景,即从集合中读取数据时,根据当前数据而决定是否获取一个写锁并修改该集合。一旦得到写锁,会阻止阅读者读

    取数据,从而浪费大量的时间,因此获取写锁后集合会处于阻塞状态。为了最小化阻塞浪费的时间,可以使用EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先获取读锁后读取数据。如果发现必须修改底层集合,只需使用EnterWriteLock方法升级锁,然后快速执行一次写操作,最后使用ExitWriteLock释放写锁。
    在本例中,我们先生成一个随机数。然后获取读锁并检查该数是否存在于字典的键集合中。如果不存在,将读锁更新为写锁然后将该新键加入到字典中。始终使用try/finally代码块来确保在捕获锁后一定会释放锁,这是一项好的实践。

        static void Main(string[] args)
            {
                new Thread(Read){ IsBackground = true }.Start();
                new Thread(Read){ IsBackground = true }.Start();
                new Thread(Read){ IsBackground = true }.Start();
    
                new Thread(() => Write("Thread 1")){ IsBackground = true }.Start();
                new Thread(() => Write("Thread 2")){ IsBackground = true }.Start();
    
                Sleep(TimeSpan.FromSeconds(30));
            }
    
            static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
            static Dictionary<int, int> _items = new Dictionary<int, int>();
    
            static void Read()
            {
                WriteLine("Reading contents of a dictionary");
                while (true)
                {
                    try
                    {
                        _rw.EnterReadLock();
                        foreach (var key in _items.Keys)
                        {
                            Sleep(TimeSpan.FromSeconds(0.1));
                        }
                    }
                    finally
                    {
                        _rw.ExitReadLock();
                    }
                }
            }
    
            static void Write(string threadName)
            {
                while (true)
                {
                    try
                    {
                        int newKey = new Random().Next(250);
                        _rw.EnterUpgradeableReadLock();
                        if (!_items.ContainsKey(newKey))
                        {
                            try
                            {
                                _rw.EnterWriteLock();
                                _items[newKey] = 1;
                                WriteLine($"New key {newKey} is added to a dictionary by a {threadName}");
                            }
                            finally
                            {
                                _rw.ExitWriteLock();
                            }
                        }
                        Sleep(TimeSpan.FromSeconds(0.1));
                    }
                    finally
                    {
                        _rw.ExitUpgradeableReadLock();
                    }
                }
            }

    ·使用SpinWait类

    本节将描述如何不使用内核模型的方式来使线程等待。另外,我们介绍了SpinWait,它是一个混合同步构造,被设计为使用用户模式等待一段时间,然后切换到内核模式以节省CPU时间。

    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第三章 使用线程池

    在线程池中调用委托

    向线程池中放入异步操作

        ThreadPool.QueueUserWorkItem( _ =>
                {
                    WriteLine($"Operation state: {x + y}, {lambdaState}");
                    WriteLine($"Worker thread id: {CurrentThread.ManagedThreadId}");
                    Sleep(TimeSpan.FromSeconds(2));
                }, "lambda state");

    线程池与并行度

    实现一个取消按钮

    在线程池中使用等待事件处理器及超时

    使用计时器

    使用backgroundwork组件

    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第四章  使用任务并行库

    创建任务

    Task.Run(() => TaskMethod("Task 3"));

    使用任务执行基本的操作

    组合任务

        firstTask = new Task<int>(() =>
                {
                    var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5),
                        TaskCreationOptions.AttachedToParent);
    
                    innerTask.ContinueWith(t => TaskMethod("Third Task", 2),
                        TaskContinuationOptions.AttachedToParent);
    
                    return TaskMethod("First Task", 2);
                });

    将APM模式转换成任务

        Task<string> task = Task<string>.Factory.FromAsync(
                    d.BeginInvoke("AsyncTaskThread", Callback, 
                    "a delegate asynchronous call"), d.EndInvoke);

    将EAP模式转化成任务

        var tcs = new TaskCompletionSource<int>();
    
                var worker = new BackgroundWorker();
                worker.DoWork += (sender, eventArgs) =>
                {
                    eventArgs.Result = TaskMethod("Background worker", 5);
                };
    
                worker.RunWorkerCompleted += (sender, eventArgs) =>
                {
                    if (eventArgs.Error != null)
                    {
                        tcs.SetException(eventArgs.Error);
                    }
                    else if (eventArgs.Cancelled)
                    {
                        tcs.SetCanceled();
                    }
                    else
                    {
                        tcs.SetResult((int)eventArgs.Result);
                    }
                };
    
                worker.RunWorkerAsync();
    
                int result = tcs.Task.Result;
    
                WriteLine($"Result is: {result}");

    实现取消选项

       var cts = new CancellationTokenSource();
                var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token);

    处理任务中的异常

      task = Task.Run(() => TaskMethod("Task 2", 2));
                    int result = task.GetAwaiter().GetResult();
                    WriteLine($"Result: {result}");
        var t1 = new Task<int>(() => TaskMethod("Task 3", 3));
                var t2 = new Task<int>(() => TaskMethod("Task 4", 2));
                var complexTask = Task.WhenAll(t1, t2);

    并行运行任务

    var whenAllTask = Task.WhenAll(firstTask, secondTask);
    var completedTask = Task.WhenAny(tasks).Result;

    使用TaskScheduler配置任务的执行

        void ButtonAsyncOK_Click(object sender, RoutedEventArgs e)
            {
                ContentTextBlock.Text = string.Empty;
                Mouse.OverrideCursor = Cursors.Wait;
                Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext());
    
                task.ContinueWith(t => Mouse.OverrideCursor = null,
                    CancellationToken.None,
                    TaskContinuationOptions.None,
                    TaskScheduler.FromCurrentSynchronizationContext());
            }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第五章 使用c#6.0

    使用await操作符获取异步任务结果

    在lambda表达式中使用await操作符

        static async Task AsynchronousProcessing()
            {
                Func<string, Task<string>> asyncLambda = async name => {
                    await Task.Delay(TimeSpan.FromSeconds(2));
                    return
                        $"Task {name} is running on a thread id {CurrentThread.ManagedThreadId}." +
                        $" Is thread pool thread: {CurrentThread.IsThreadPoolThread}";
                };
    
                string result = await asyncLambda("async lambda");
    
                WriteLine(result);
            }

    对连续的异步任务使用await操作符

    对并行执行的异步任务使用await操作符

    static async Task AsynchronousProcessing()
            {
                Task<string> t1 = GetInfoAsync("Task 1", 3);
                Task<string> t2 = GetInfoAsync("Task 2", 5);
    
                string[] results = await Task.WhenAll(t1, t2);
                foreach (string result in results)
                {
                    WriteLine(result);
                }
            }
    
            static async Task<string> GetInfoAsync(string name, int seconds)
            {
                await Task.Delay(TimeSpan.FromSeconds(seconds));//使用同一线程
                //await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds)));//使用不同线程
                return
                    $"Task {name} is running on a thread id {CurrentThread.ManagedThreadId}." +
                

    处理异步操作中的异常

    避免使用捕获的同步上下文

    static async Task<TimeSpan> TestNoContext()
            {
                const int iterationsNumber = 100000;
                var sw = new Stopwatch();
                sw.Start();
                for (int i = 0; i < iterationsNumber; i++)
                {
                    var t = Task.Run(() => { });
                    await t.ConfigureAwait(
                        continueOnCapturedContext: false);
                }
                sw.Stop();
                return sw.Elapsed;
            }

    使用 async void方法

    设计一个自定义的awaitable类型

    对动态类型使用await

                IAwaiter<string> proxy = Impromptu.ActLike(awaiter);
    
                result.GetAwaiter = (Func<dynamic>) ( () => proxy );

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第六章 使用并发集合

    使用concurrentDictionary

    使用concurrentQueue实现一步处理

    改变concurrentStack异步处理顺序

    使用concurrentBag创建一个可扩展的爬虫

    使用concurrentCollection进行异步处理

    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第七章 使用PLINQ

    使用Parallel类

    Parallel.Invoke(
                    () => EmulateProcessing("Task1"),
                    () => EmulateProcessing("Task2"),
                    () => EmulateProcessing("Task3")
                );

    并行化Linq查询

        parallelQuery = from t in GetTypes().AsParallel()
                                select EmulateProcessing(t);
    
        parallelQuery.ForAll(PrintInfo);

    调整plinq查询的参数

    try
                {
                    parallelQuery
                        .WithDegreeOfParallelism(Environment.ProcessorCount)
                        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                        .WithMergeOptions(ParallelMergeOptions.Default)
                        .WithCancellation(cts.Token)
                        .ForAll(WriteLine);
                }

    处理plinq查询中的异常

        try
                {
                    parallelQuery.ForAll(WriteLine);
                }
                catch (DivideByZeroException)
                {
                    WriteLine("Divided by zero - usual exception handler!");
                }
                catch (AggregateException e)
                {
                    e.Flatten().Handle(ex =>
                    {
                        if (ex is DivideByZeroException)
                        {
                            WriteLine("Divided by zero - aggregate exception handler!");
                            return true;
                        }
                        
                        return false;
                    });
                }

    管理plinq查询中的数据分区

        public class StringPartitioner : Partitioner<string>
            {
                private readonly IEnumerable<string> _data;
    
                public StringPartitioner(IEnumerable<string> data)
                {
                    _data = data;
                }
    
                public override bool SupportsDynamicPartitions => false;
    
                public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
                {
                    var result = new List<IEnumerator<string>>(partitionCount);
    
                    for (int i = 1; i <= partitionCount; i++)
                    {
                        result.Add(CreateEnumerator(i, partitionCount));
                    }
    
                    return result;
                }
    
                IEnumerator<string> CreateEnumerator(int partitionNumber, int partitionCount)
                {
                    int evenPartitions = partitionCount / 2;
                    bool isEven = partitionNumber % 2 == 0;
                    int step = isEven ? evenPartitions : partitionCount - evenPartitions;
                    int startIndex = partitionNumber / 2 + partitionNumber % 2;
    
                    var q = _data
                            .Where(v => !(v.Length % 2 == 0 ^ isEven) || partitionCount == 1)
                            .Skip(startIndex - 1);
    
                    return q
                            .Where((x, i) => i % step == 0)
                            .GetEnumerator();
    
                }
            }

    为plinq查询创建一个自定义的聚合器

        var parallelAggregator = parallelQuery.Aggregate(
                    () => new ConcurrentDictionary<char, int>(),
                    (taskTotal, item) => AccumulateLettersInformation(taskTotal, item), 
                    (total, taskTotal) => MergeAccumulators(total, taskTotal),
                    total => total);

    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    第八章 RE

    将普通集合转换为异步的可观察集合

            o = EnumerableEventSequence().ToObservable()
                    .SubscribeOn(TaskPoolScheduler.Default);

    编写自定义的可观察对象

    使用Subject

    创建可观察的对象

    对可观察的集合使用linq查询

    使用RX创建异步操作

    第九章 使用异步I/O

    异步使用文件

       using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous))
                using (var sw = new StreamWriter(stream))
                {
                    WriteLine($"3. Uses I/O Threads: {stream.IsAsync}");
                    await sw.WriteAsync(CreateFileContent());
                }

    编写一个异步的http服务端和客户端

        public async Task Start()
                {
                    _listener.Start();
    
                    while (true)
                    {
                        var ctx = await _listener.GetContextAsync();
                        WriteLine("Client connected...");
                        var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now);
    
                        using (var sw = new StreamWriter(ctx.Response.OutputStream))
                        {
                            await sw.WriteAsync(response);
                            await sw.FlushAsync();
                        }
                    }
                }

    异步操作数据库

    await connection.OpenAsync();
    await detachCommand.ExecuteNonQueryAsync();

    异步调用wcf服务

    第十章 并行编程模式

    实现惰性求值的共享状态

        class BCLThreadSafeFactory : IHasValue
            {
                private ValueToAccess _value;
    
                public ValueToAccess Value => 
                    LazyInitializer.EnsureInitialized(ref _value, Compute);
            }
      class LazyWrapper : IHasValue
            {
                private readonly Lazy<ValueToAccess> _value;
    
                public LazyWrapper(Lazy<ValueToAccess> value )
                {
                    _value = value;
                }
    
                public ValueToAccess Value => _value.Value;
            }

    使用blockingcollection实现并行管道

    使用tpl数据流实现并行管道

    使用plinq实现map/reduce模式

    第十一章 更多信息

  • 相关阅读:
    oracle查询哪些sp修改了某些表
    asp.net mvc
    更新计算机驱动
    instr函数的用法
    UNION ALL UNION
    Python机器学习ch02 代码学习2
    Python机器学习 ch02代码学习1
    转载Python切片(小知识点)
    FMCW部分资料连接
    Python基础25 异常堆栈跟踪,释放资源,自定义异常和主动抛出
  • 原文地址:https://www.cnblogs.com/smileberry/p/9327990.html
Copyright © 2011-2022 走看看