zoukankan      html  css  js  c++  java
  • 基元线程同步构造

    1,Volate.Write和Volate.Read:

     bool complete = false;
                var t = new Thread(() => { bool toggle = false;
                    while (!complete)
                    {
                        toggle = !toggle;
                    } });
                t.Start();
                Thread.Sleep(1000);
                complete = true;
                t.Join();
    
                Console.Read();

    注意:该代码会无限循环,(在Release优化模式下).使用Volatile就是解决这个问题的.Volatile大致时做了这么几件事情.VolateRead(Ref Value),首先,会进行一次内存读写(而非缓存),然后,在其后面的读写操作必须在其完成后再执行.

    Write操作:再执行时强制写入内存.并且那些再其之前的操作需要再Write之前发生.

    也就是说,Read优先在最前面.Write往往放在最后面.(这两个函数最要用于实时读取或者写入内存中的实际值.相当于PLC中的:P功能).

    2,InterLock(互锁构造).

      //Increment():原子性对一个32位整数进行+1操作.
    //Decrement:原子性的对一个32位整数进行-1操作.
    //Add(),原子性的对一个32位整数进行加减操作
    //Exchange(a,b)原子性的对一个32位整数进行赋值,并返回原来值
    //CompareExchange(a,value,b)如果 a==b,则 a=value,并且返回a;          
    

    利用异步模式进行多服务器请求的方式:

     internal sealed class MultiWebRequests
        {
            private AsyncCoordinator m_ac = new AsyncCoordinator();
    
            private Dictionary<string, object> m_servers = new Dictionary<string, object>
            {
    
                {"https://www.baidu.com",null },
                {"https://192.168.0.11",null }
    
            };
    
            public MultiWebRequests(int timeout=Timeout.Infinite)
            {
                var httpClient = new HttpClient();
                foreach(var server in m_servers.Keys)
                {
                    m_ac.AboutToBegin(1);
                    httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
                }
                m_ac.AllBegun(AllDone, timeout);
            }
    
            private void AllDone(CoordinationStatus status)
            {
                switch (status)
                {
                    case CoordinationStatus.Cancel:
                        Console.WriteLine("operation canceled");
                        break;
                    case CoordinationStatus.Timeout:
                        Console.WriteLine("Operation timed-out");
                        break;
                    case CoordinationStatus.AllDone:
                        Console.WriteLine("operation completed;results below:");
                        foreach(var server in m_servers)
                        {
                            Console.WriteLine("{0} ", server.Key);
                            object result = server.Value;
                            if(result is Exception)
                            {
                                Console.WriteLine("failed due to {0}", result.GetType().Name);
                            }
                            else
                            {
                                Console.WriteLine("returned {0:N0} bytes.", result);
                            }
    
                        }
                        break;
                }
    
            }
    
            private void ComputeResult(string server, Task<byte[]> task)
            {
                object result;
                if (task.Exception != null)
                {
                    result = task.Exception.InnerException;
                }
                else
                {
                    result = task.Result.Length;
                }
                //
                m_servers[server] = result;
                m_ac.JustEnded();
            }
            public void Cancel() { m_ac.Cancel(); }
        }
           enum CoordinationStatus { Cancel,Timeout,AllDone}
          internal sealed class AsyncCoordinator
        {
            private int m_opCount = 1;
            private int m_statusReported = 0;
            private Action<CoordinationStatus> m_callback;
            private Timer m_timer;
            //协定异步计数器的大小
            public void AboutToBegin(Int32 opsToAdd = 1)
            {
                Interlocked.Add(ref m_opCount, opsToAdd);
            }
            //判断是否异步计数器为0,如果为0,表明所有操作完成
            public void JustEnded()
            {
                if(Interlocked.Decrement(ref m_opCount)==0)
                    ReportStatus(CoordinationStatus.AllDone);
            }
            //当,所有异步操作执行时,执行该函数,用来启动计数器的判断模式.
            //当启动限时操作时,则传递委托执行当超时时,报告超时异常.
            public void AllBegun(Action<CoordinationStatus> callback,int timeout=Timeout.Infinite)
            {
                m_callback = callback;
                if (timeout != Timeout.Infinite)
                    m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
                JustEnded();
            }
            //超时时,则执行报告超时.
            private void TimeExpired(object state)
            {
                ReportStatus(CoordinationStatus.Timeout);
            }
            //调用cancel()时,则报告Cancel().
            public void Cancel() { ReportStatus(CoordinationStatus.Cancel); }
            //使用m_statusReorted,进行原子操作进行竞争.从而只让某个委托函数只执行一次.
            private void ReportStatus(CoordinationStatus status)
            {
                if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
                    m_callback(status);
            }
        }
    

    利用Task方法,在每次请求数据结束后进行数据处理(是否异常,如果不是异常则返回结果,如果是异常则返回异常).

    其次,利用协调类的m_OpCount来进行操作判断处理,判断是否所有操作处理完毕

    再次,利用竞争从而只形成一次报告生成.

    3,简单自旋锁:

    internal struct SimpleSpinLock
        {
            private int m_ResourceInUse;
            public void Enter()
            {
                while (Interlocked.Exchange(ref m_ResourceInUse, 1) != 0)
                {
                    //黑科技//简单自旋锁适用计算量不大的场合.
                }
            }
            public void Leave()
            {
                Volatile.Write(ref m_ResourceInUse, 0);
            }
        }
    • 1,利用Interlock进行原子操作保证
    • 2,使用Volatile保证内存写入一致.
    • 3,问题,可能形成
      Thread.Yeild
      该方法是在 .Net 4.0 中推出的新方法,它对应的底层方法是 SwitchToThread。Yield 的中文翻译为 “放弃”,这里意思是主动放弃当前线程的时间片,并让操作系统调度其它就绪态的线程使用一个时间片。但是如果调用 Yield,只是把当前线程放入到就绪队列中,而不是阻塞队列。如果没有找到其它就绪态的线程,则当前线程继续运行。
      优势:比 Thread.Sleep(0) 速度要快,可以让低于当前优先级的线程得以运行。可以通过返回值判断是否成功调度了其它线程。
      劣势:只能调度同一个处理器的线程,不能调度其它处理器的线程。当没有其它就绪的线程,会一直占用 CPU 时间片,造成 CPU 100%占用率
      Thread.Sleep(0)
      Sleep 的意思是告诉操作系统自己要休息 n 毫秒,这段时间就让给另一个就绪的线程吧。当 n=0 的时候,意思是要放弃自己剩下的时间片,但是仍然是就绪状态,其实意思和 Yield 有点类似。但是 Sleep(0) 只允许那些优先级相等或更高的线程使用当前的CPU,其它线程只能等着挨饿了。如果没有合适的线程,那当前线程会重新使用 CPU 时间片。
      优势:相比 Yield,可以调度任何处理器的线程使用时间片。
      劣势:只能调度优先级相等或更高的线程,意味着优先级低的线程很难获得时间片,很可能永远都调用不到。当没有符合条件的线程,会一直占用 CPU 时间片,造成 CPU 100%占用率。
      Thread.Sleep(1)
      该方法使用 1 作为参数,这会强制当前线程放弃剩下的时间片,并休息 1 毫秒(因为不是实时操作系统,时间无法保证精确,一般可能会滞后几毫秒或一个时间片)。但因此的好处是,所有其它就绪状态的线程都有机会竞争时间片,而不用在乎优先级。
      优势:可以调度任何处理器的线程使用时间片。无论有没有符合的线程,都会放弃 CPU 时间,因此 CPU 占用率较低。
      劣势:相比 Thread.Sleep(0),因为至少会休息一定时间,所以速度要更慢。
      Thread.Sleep(0) vs Sleep(1) vs Yeild

    4,InterLocked Anything

     public delegate Int32 Morpher<TResult, TArgument>(int startValue, TArgument argument, out TResult morphResult);
            //将一个委托方法变成原子操作方式:
            public static TResult Morph<TResult,TArgument>(ref int target,TArgument argument,Morpher<TResult,TArgument> morpher)
            {
                //方程需要的结果
                TResult morphResult;
                //进行操作,如果数据有变化就从新操作.
                int currentVal = target, startVal, desiredVal;
                do
                {
                    startVal = currentVal;
                    desiredVal = morpher(startVal, argument, out morphResult);
                    currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);
                } while (startVal != currentVal);
                return morphResult;
            }
    

    这是一个原子方法的扩展版本.也就是用于包装一个原子方法,保证其被更新.

    这是一个实战的列子

    public static void go()
            {
                int count = 0;
                int m_count = 0;
                List<Task> tasks = new List<Task>();
                for(var i = 0; i < 3; i++)
                {
                    var t = Task.Run(() =>
                     {
                         for (var ii = 0; ii < 50000; ii++)
                         {
                             //对于count就是进行对其的原子操作.(使用Count__对其进行一次原子操作,在这里是+1)
                             //对于m_count,其是一次伴随操作.
                             m_count=Morph<int, int>(ref count, 1, Count__);
                         }
                     });
                    tasks.Add(t);
                }
                Task.WaitAll(tasks.ToArray());
                Console.WriteLine(count+" "+m_count);
            }
    
            private static int Count__(int startValue, int argument, out int morphResult)
            {
                var result = startValue + 1;
                morphResult = result;
                return result;
            }

    5,

    内核模式构造

    • WaitHandle
      • EventWaitHandle
        • AutoResetEvent//每次运行完WaitOne自动设定成Reset.
        • ManualResetEvent//需要手动设定成Reset.
      • Semaphore
      • Mutex

    WaitHandle用于包装一个内核对象句柄.其常用方法:

       WaitOne():当传递true的时候(Set())则不堵塞,否则则堵塞线程

                      :当返回true,表示调用了Set(),否则,当超时时,则返回False();

      Set():使堵塞的WaitOne()使能为可以运行.

      ReSet(): 使运行的WaitOne() 设置成堵塞.

    OpenExsiting(string str)可以获取已经创建的内核句柄.

    new

      一个交替打印1--100的方式:

     public static void go3()
            {
                var waiter1 = new AutoResetEvent(false);
                var waiter2 = new AutoResetEvent(false);
                var m_count = 1;
                Task.Run(() =>
                {
                    while (m_count <= 99)
                    {
                        //获取set方法或者超时,如果超时返回false,否则返回true,表示set()方法打通.
                        var wt = waiter1.WaitOne(100);
                        Console.WriteLine("thread1 is " + (m_count++) + "waiter is" + wt);
                        waiter2.Set();
    
                    }
                });
                Task.Run(() =>
                {
    
                    while (m_count <= 100)
                    {
                        var wt= waiter2.WaitOne(100);
                        Console.WriteLine("thread2 is " + (m_count++)+"waiter is"+wt);
                        waiter1.Set();
    
                    }
                });
            }

    Semaphore构造: 其实是内核维护的Int32变量.当=0的时候,在信号等待的线程阻塞,不为0的时候,解除阻塞.

      构建一个Semaphore对象  New Semaphore(0,1,”name”,ref bool create_new):

       创建一个独一无二的内核对象"name"如果是本线程创建,则create_new=true,否则 false;

       当使用WaitOne()方法的时候,如果 Semaphore对象内部管理的信号量=0,则堵塞线程,否则 不堵塞.

       每使用WaitOne()方法一次,就将信号量减1.

       每使用release(count)就使信号量+count.

       和AutoResetEvent不同的地方在于,多次Set(),也只会使一个线程恢复非堵塞,而Semaphore可能会造成释放多个线程.

       利用AutoReset和Semaphore都可以进行自旋锁的构造.

    public class SimpleLock1:IDisposable
            {
                private Semaphore m_lock = new Semaphore(1, 1);
    
                public void Enter()
                {
                    m_lock.WaitOne();
                }
                public void Leave()
                {
                    m_lock.Release();
                }
                public void Dispose() { m_lock.Close(); }
            }

    Mutex:

    互诉锁线程使用Mutex.WaitOne()方法等待C# Mutex对象被释放,如果它等待的C# Mutex对象被释放了,它就自动拥有这个对象,直到它调用Mutex.ReleaseMutex()方法释放这个对象,而在此期间,其他想要获取这个C# Mutex对象的线程都只有等待

    简单点说,这个锁监视当前线程:使用WaitOne()的方法,在运行后,会使其当前线程监视值+1,使用Release()方法,会使监视值-1,当,监视值=0的时候,才可以由其他线程获取使用权.(一般避免使用这个锁,而改用自定义的递归锁(对象递归).

    public static void go6()
            {
                //参数指示当前运行的线程是否获取Mutex锁,如果选择true的话,则需要释放一次,否则别的线程无法获取.
                var m_lock = new Mutex(false);
                var m_count = 1;
                //注意递归的用法.必须当整个线程运行完毕后,其他的线程才能够获得锁的所有权才会运行下一步.
                Task.Run(() =>
                {
                    //如果没线程在用就获取锁.
    
                    while (m_count <= 100)
                    {
                        if (m_count % 2 == 1)
                        {
                            m_lock.WaitOne();
                            Console.WriteLine("Thread{0} is Print {1}", Thread.CurrentThread.ManagedThreadId, m_count++);
                            m_lock.ReleaseMutex();
                        }
                    }
    
                });
                Task.Run(() =>
                {
                    while (m_count <= 100)
                    {
                        if (m_count % 2 == 0)
                        {
                            m_lock.WaitOne();
                            Console.WriteLine("Thread{0} is Print {1}", Thread.CurrentThread.ManagedThreadId, m_count++);
                            m_lock.ReleaseMutex();
                        }
                    }
                });
    
            }

    模拟递归锁

    public sealed class RecursiveAutoResetEvent:IDisposable
        {
            private AutoResetEvent m_lock = new AutoResetEvent(true);
            private int m_owningThreadId = 0;
            private int m_recursionCount = 0;
            public void Enter()
            {
                Int32 currentThreadId = Thread.CurrentThread.ManagedThreadId;
                //如果当前线程是原进程
                if(currentThreadId==m_owningThreadId)
                {
                    m_recursionCount++;
                    return;
                }
                //当前进程是第一次进入
                m_lock.WaitOne();
                //当前线程第一次拥有锁
                m_owningThreadId = currentThreadId;
                m_recursionCount = 1;
            }
    
            public void Leave()
            {
                Int32 currentThreadId = Thread.CurrentThread.ManagedThreadId;
                if (currentThreadId != m_owningThreadId || m_recursionCount < 1) throw new InvalidOperationException("error thread");
               if(--m_recursionCount==0)
                {
                    m_owningThreadId = 0;
                    m_lock.Set();
    
                }
    
    
            }
            public void Dispose() { m_lock.Dispose(); }
        }
    
  • 相关阅读:
    遇见Javascript类型数组
    编译Android常用命令
    V4L2驱动视频开发要点
    Windows Phone开发(27):隔离存储A
    Ubuntu10.04下Android开发环境搭建
    V4L2开发要点
    使用 php Header 报错的一个原因
    Windows Phone开发(28):隔离存储B
    用HTML5 Audio API开发游戏音乐
    php备份数据库类分享
  • 原文地址:https://www.cnblogs.com/frogkiller/p/12554292.html
Copyright © 2011-2022 走看看