zoukankan      html  css  js  c++  java
  • 使用Interlocked在多线程下进行原子操作,无锁无阻塞的实现线程运行状态判断

    巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。

    昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的实现都不能做出来,这很大程度上打击了我,而且,春招也快来了,更需要打扎实基础。引起我注意的是jeffrey在第29章说的:使用Interlocked,代码很短,绝不阻塞任何线程,二期使用线程池线程来实现自动伸缩。下载了源码,然后分析了下书中的示例,code如下:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Http;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace vlr_via_cs
    {
        internal static class AsyncCoordinatorDemo
        {
            public static void Go()
            {
                const Int32 timeout = 50000;   // Change to desired timeout
                MultiWebRequests act = new MultiWebRequests(timeout);
                Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.",
                   (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms"));
                Console.ReadLine();
                act.Cancel();
    
                Console.WriteLine();
                Console.WriteLine("Hit enter to terminate.");
                Console.ReadLine();
            }
    
            private sealed class MultiWebRequests
            {
                // This helper class coordinates all the asynchronous operations
                private AsyncCoordinator m_ac = new AsyncCoordinator();
    
                // Set of Web servers we want to query & their responses (Exception or Int32)
                private Dictionary<String, Object> m_servers = new Dictionary<String, Object> {
                    { "http://cjjjs.com/", null },
                    { "http://cnblogs.com/", null },
                    { "http://www.jobbole.com/", null }
                };
    
                public MultiWebRequests(Int32 timeout = Timeout.Infinite)
                {
                    // Asynchronously initiate all the requests all at once
                    var httpClient = new HttpClient();
                    foreach (var server in m_servers.Keys)
                    {
                        m_ac.AboutToBegin(1); //确保先做三次加法, 若是有Sleep,在调用完这个函数后,执行
                        httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
                    }
    
                    // Tell AsyncCoordinator that all operations have been initiated and to call
                    // AllDone when all operations complete, Cancel is called, or the timeout occurs
                    m_ac.AllBegun(AllDone, timeout);
                }
    
                private void ComputeResult(String server, Task<Byte[]> task)
                {
                    Object result;
                    if (task.Exception != null)
                    {
                        result = task.Exception.InnerException;
                    }
                    else
                    {
                        // Process I/O completion here on thread pool thread(s)
                        // Put your own compute-intensive algorithm here...
                        result = task.Result.Length;   // This example just returns the length
                    }
    
                    // Save result (exception/sum) and indicate that 1 operation completed
                    m_servers[server] = result;
                    m_ac.JustEnded();
                }
    
                // Calling this method indicates that the results don't matter anymore
                public void Cancel() { m_ac.Cancel(); }
    
                // This method is called after all Web servers respond, 
                // Cancel is called, or the timeout occurs
                private void AllDone(CoordinationStatus status)
                {
                    switch (status)
                    {
                        case CoordinationStatus.Cancel:
                            Console.WriteLine("Operation canceled.");
                            break;
    
                        case CoordinationStatus.Timeout:
                            Console.WriteLine("Operation timed-out.");
                            break;
    
                        case CoordinationStatus.AllDone:
                            Console.WriteLine("Operation completed; results below:");
                            foreach (var server in m_servers)
                            {
                                Console.Write("{0} ", server.Key);
                                Object result = server.Value;
                                if (result is Exception)
                                {
                                    Console.WriteLine("failed due to {0}.", result.GetType().Name);
                                }
                                else
                                {
                                    Console.WriteLine("returned {0:N0} bytes.", result);
                                }
                            }
                            break;
                    }
                }
            }
    
            private enum CoordinationStatus
            {
                AllDone,
                Timeout,
                Cancel
            };
    
            private sealed class AsyncCoordinator
            {
                private Int32 m_opCount = 1;        // Decremented when AllBegun calls JustEnded
                private Int32 m_statusReported = 0; // 0=false, 1=true
                private Action<CoordinationStatus> m_callback;
                private Timer m_timer;
    
                // This method MUST be called BEFORE initiating an operation
                public void AboutToBegin(Int32 opsToAdd = 1)
                {
                    Interlocked.Add(ref m_opCount, opsToAdd);
                }
    
                // This method MUST be called AFTER an operations result has been processed
                public void JustEnded()
                {
                    if (Interlocked.Decrement(ref m_opCount) == 0)
                        ReportStatus(CoordinationStatus.AllDone);
                }
    
                // This method MUST be called AFTER initiating ALL operations
                public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite)
                {
                    m_callback = callback;
                    if (timeout != Timeout.Infinite)
                    {
                        // 在指定的时间点(dueTime) 调用回调函数,随后在指定的时间间隔(period)调用回调函数
                        m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
                    }
                    JustEnded();
                }
    
                // 处理过时的线程
                private void TimeExpired(Object o) {
                    ReportStatus(CoordinationStatus.Timeout);
                }
    
                public void Cancel()
                {
                    if (m_callback == null)
                        throw new InvalidOperationException("Cancel cannot be called before AllBegun");
                    ReportStatus(CoordinationStatus.Cancel);
                }
    
                private void ReportStatus(CoordinationStatus status)
                {
                    if (m_timer != null)
                    {  // If timer is still in play, kill it
                        Timer timer = Interlocked.Exchange(ref m_timer, null);
                        if (timer != null) timer.Dispose();
                    }
    
                    // If status has never been reported, report it; else ignore it
                    if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
                        m_callback(status);
                }
            }
        }
    
    
        class Program
        {
            static void Main(string[] args)
            {
                AsyncCoordinatorDemo.Go();
    
                Console.Read();
            }
        }
    }

    的确是无锁的操作,Interlocked方法是用户模式下的原子操作,针对的是CPU,不是线程内存,而且它是自旋等待的,耗费的是CPU资源。分析了下AsyncCoordinator类,主要就是利用Interlocked的Add方法,实时计数线程的数量,随后待一个线程运行的最后又调用Interlocked的Decrement方法自减。如果你留心的话,你会发现,目前绝大多数的并发判断中都用到了Interlocked的这些方法,尤其是interlocked的anything模式下的compareexchange方法,在这里提一嘴,除了compareexchange和exchange方法的返回值是返回ref类型原先的值之外,其余的方法都是返回改变之后的值。最后我们可以通过AllBegun方法来判断是不是所有的线程都执行完了,随后将状态变量m_statusReported设置为1,防止在进行状态判断。

    这个类很好,之前写并发的时候,老是烦恼怎么判断并发是否已经完事了,又不想用到阻塞,这个类很好,当然应用到具体项目中可能还需要改,但是基本的模型还是这个,不变的。

    有点感慨:好东西需要我们自己去发掘,之前查生产者消费者模型的时候,java代码一大堆,愣是没有看到几个C#,就算有也是简易,尽管可以把java的改变为C#的,但有点感慨C#的技术栈和资源少

  • 相关阅读:
    IOS Charles(代理服务器软件,可以用来拦截网络请求)
    Javascript中addEventListener和attachEvent的区别
    MVC中实现Area几种方法
    Entity Framework Code First 中使用 Fluent API 笔记。
    自定义JsonResult解决 序列化类型 System.Data.Entity.DynamicProxies 的对象时检测到循环引用
    序列化类型 System.Data.Entity.DynamicProxies 的对象时检测到循环引用
    An entity object cannot be referenced by multiple instances of IEntityChangeTracker 的解决方案
    Code First :使用Entity. Framework编程(8) ----转发 收藏
    Code First :使用Entity. Framework编程(6) ----转发 收藏
    Code First :使用Entity. Framework编程(5) ----转发 收藏
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8352848.html
Copyright © 2011-2022 走看看