zoukankan      html  css  js  c++  java
  • 使用Interlocked在多线程下进行原子操作,无锁无阻塞的实现线程运行状态判断

    巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。

    昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的实现都不能做出来,这很大程度上打击了我,而且,春招也快来了,更需要打扎实基础。引起我注意的是jeffrey在第29章说的:使用Interlocked,代码很短,绝不阻塞任何线程,二期使用线程池线程来实现自动伸缩。下载了源码,然后分析了下书中的示例,code如下:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Http;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace vlr_via_cs
    {
        internal static class AsyncCoordinatorDemo
        {
            public static void Go()
            {
                const Int32 timeout = 50000;   // Change to desired timeout
                MultiWebRequests act = new MultiWebRequests(timeout);
                Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.",
                   (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms"));
                Console.ReadLine();
                act.Cancel();
    
                Console.WriteLine();
                Console.WriteLine("Hit enter to terminate.");
                Console.ReadLine();
            }
    
            private sealed class MultiWebRequests
            {
                // This helper class coordinates all the asynchronous operations
                private AsyncCoordinator m_ac = new AsyncCoordinator();
    
                // Set of Web servers we want to query & their responses (Exception or Int32)
                private Dictionary<String, Object> m_servers = new Dictionary<String, Object> {
                    { "http://cjjjs.com/", null },
                    { "http://cnblogs.com/", null },
                    { "http://www.jobbole.com/", null }
                };
    
                public MultiWebRequests(Int32 timeout = Timeout.Infinite)
                {
                    // Asynchronously initiate all the requests all at once
                    var httpClient = new HttpClient();
                    foreach (var server in m_servers.Keys)
                    {
                        m_ac.AboutToBegin(1); //确保先做三次加法, 若是有Sleep,在调用完这个函数后,执行
                        httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
                    }
    
                    // Tell AsyncCoordinator that all operations have been initiated and to call
                    // AllDone when all operations complete, Cancel is called, or the timeout occurs
                    m_ac.AllBegun(AllDone, timeout);
                }
    
                private void ComputeResult(String server, Task<Byte[]> task)
                {
                    Object result;
                    if (task.Exception != null)
                    {
                        result = task.Exception.InnerException;
                    }
                    else
                    {
                        // Process I/O completion here on thread pool thread(s)
                        // Put your own compute-intensive algorithm here...
                        result = task.Result.Length;   // This example just returns the length
                    }
    
                    // Save result (exception/sum) and indicate that 1 operation completed
                    m_servers[server] = result;
                    m_ac.JustEnded();
                }
    
                // Calling this method indicates that the results don't matter anymore
                public void Cancel() { m_ac.Cancel(); }
    
                // This method is called after all Web servers respond, 
                // Cancel is called, or the timeout occurs
                private void AllDone(CoordinationStatus status)
                {
                    switch (status)
                    {
                        case CoordinationStatus.Cancel:
                            Console.WriteLine("Operation canceled.");
                            break;
    
                        case CoordinationStatus.Timeout:
                            Console.WriteLine("Operation timed-out.");
                            break;
    
                        case CoordinationStatus.AllDone:
                            Console.WriteLine("Operation completed; results below:");
                            foreach (var server in m_servers)
                            {
                                Console.Write("{0} ", server.Key);
                                Object result = server.Value;
                                if (result is Exception)
                                {
                                    Console.WriteLine("failed due to {0}.", result.GetType().Name);
                                }
                                else
                                {
                                    Console.WriteLine("returned {0:N0} bytes.", result);
                                }
                            }
                            break;
                    }
                }
            }
    
            private enum CoordinationStatus
            {
                AllDone,
                Timeout,
                Cancel
            };
    
            private sealed class AsyncCoordinator
            {
                private Int32 m_opCount = 1;        // Decremented when AllBegun calls JustEnded
                private Int32 m_statusReported = 0; // 0=false, 1=true
                private Action<CoordinationStatus> m_callback;
                private Timer m_timer;
    
                // This method MUST be called BEFORE initiating an operation
                public void AboutToBegin(Int32 opsToAdd = 1)
                {
                    Interlocked.Add(ref m_opCount, opsToAdd);
                }
    
                // This method MUST be called AFTER an operations result has been processed
                public void JustEnded()
                {
                    if (Interlocked.Decrement(ref m_opCount) == 0)
                        ReportStatus(CoordinationStatus.AllDone);
                }
    
                // This method MUST be called AFTER initiating ALL operations
                public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite)
                {
                    m_callback = callback;
                    if (timeout != Timeout.Infinite)
                    {
                        // 在指定的时间点(dueTime) 调用回调函数,随后在指定的时间间隔(period)调用回调函数
                        m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
                    }
                    JustEnded();
                }
    
                // 处理过时的线程
                private void TimeExpired(Object o) {
                    ReportStatus(CoordinationStatus.Timeout);
                }
    
                public void Cancel()
                {
                    if (m_callback == null)
                        throw new InvalidOperationException("Cancel cannot be called before AllBegun");
                    ReportStatus(CoordinationStatus.Cancel);
                }
    
                private void ReportStatus(CoordinationStatus status)
                {
                    if (m_timer != null)
                    {  // If timer is still in play, kill it
                        Timer timer = Interlocked.Exchange(ref m_timer, null);
                        if (timer != null) timer.Dispose();
                    }
    
                    // If status has never been reported, report it; else ignore it
                    if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
                        m_callback(status);
                }
            }
        }
    
    
        class Program
        {
            static void Main(string[] args)
            {
                AsyncCoordinatorDemo.Go();
    
                Console.Read();
            }
        }
    }

    的确是无锁的操作,Interlocked方法是用户模式下的原子操作,针对的是CPU,不是线程内存,而且它是自旋等待的,耗费的是CPU资源。分析了下AsyncCoordinator类,主要就是利用Interlocked的Add方法,实时计数线程的数量,随后待一个线程运行的最后又调用Interlocked的Decrement方法自减。如果你留心的话,你会发现,目前绝大多数的并发判断中都用到了Interlocked的这些方法,尤其是interlocked的anything模式下的compareexchange方法,在这里提一嘴,除了compareexchange和exchange方法的返回值是返回ref类型原先的值之外,其余的方法都是返回改变之后的值。最后我们可以通过AllBegun方法来判断是不是所有的线程都执行完了,随后将状态变量m_statusReported设置为1,防止在进行状态判断。

    这个类很好,之前写并发的时候,老是烦恼怎么判断并发是否已经完事了,又不想用到阻塞,这个类很好,当然应用到具体项目中可能还需要改,但是基本的模型还是这个,不变的。

    有点感慨:好东西需要我们自己去发掘,之前查生产者消费者模型的时候,java代码一大堆,愣是没有看到几个C#,就算有也是简易,尽管可以把java的改变为C#的,但有点感慨C#的技术栈和资源少

  • 相关阅读:
    Linux unalias命令 取消别名
    linux cp 拷贝文件或目录
    POJ 1850
    POJ 1844
    POJ 1852
    POJ 1837
    POJ 1833
    POJ 1804
    POJ 1789
    POJ 1781
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8352848.html
Copyright © 2011-2022 走看看