zoukankan      html  css  js  c++  java
  • C# 线程、任务和同步


    1,线程概述
    线程是程序汇中独立的指令流。线程有一个优先级,实际上正在处理的程序的位置计数器,一个存储其局部变量的栈。每个线程都有自己的栈。但应用程序的内存和堆由一个进程的所有线程共享。
    进程包含资源,如windows句柄,文件句柄或其他内核对象。每个进程都分配了虚拟内存。一个进程至少包含一个线程。操作系统会调度线程。


    总结:
    同步代码区域(代码块):lock,  Monitor, SpinLock, Mutex,WaitHandle,Semaphore,EventWaitHandle,AutoRestEvent/ManualResetEvent.
    Barrier, ReadWriterLock(Slim)
    多线程变量同步:InterLocked, 



    进程间同步:
    Mutex, Semaphore,



    2,异步委托:
    创建线程的一种简单方式是定义一个委托,并异步调用它。委托时方法类型安全的引用。Delegate类还支持异步调用委托,在后头创建一个执行任务的线程。
      委托使用线程池来完成异步调用。
      public delegate int TakesAWhileDelegate(int data, int ms);
    2.1投票:
      IAsyncResult ar=al.BeginInvoke(1,3000, null, null);
          int result=dl.EndInvoke(ar);
    2.2 等待句柄  (WaitHandle)

     1     class Program
     2     {
     3         public delegate int TakesAWhileDelegate(int data, int ms);
     4         static int TakesAWhile(int data, int ms)
     5         {
     6             Console.WriteLine("TakesAWhile started");
     7             Thread.Sleep(ms);
     8             Console.WriteLine("TakesAWhile completed");
     9             return ++data;
    10         }
    11         private static void Main(string[] args)
    12         {
    13             Console.WriteLine("Main Begin.");
    14             TakesAWhileDelegate dl = TakesAWhile;
    15             IAsyncResult ar = dl.BeginInvoke(2, 3000, null, null);
    16 
    17             //ar.IsCompleted
    18             //ar.AsyncWaitHandle.WaitOne(50)
    19 
    20             dl.EndInvoke(ar);
    21             Console.WriteLine("Main() end.");
    22             Console.ReadLine();
    23         }
    24     }
    投票(ar.IsCompleted) 或者等待句柄(ar.AsyncWaitHandle.WaitOne(50, false)

    2.3 异步回调 (dl.BeginInvoke(1,3000, TakesAWhileCompleted, dl) )
    传入一个回调函数委托,来异步执行。

     1     class Program
     2     {
     3         public delegate int TakesAWhileDelegate(int data, int ms);
     4         static int TakesAWhile(int data, int ms)
     5         {
     6             Console.WriteLine("TakesAWhile started");
     7             Thread.Sleep(ms);
     8             Console.WriteLine("TakesAWhile completed");
     9             return ++data;
    10         }
    11         private static void Main(string[] args)
    12         {
    13             Console.WriteLine("Main Begin.");
    14             TakesAWhileDelegate dl = TakesAWhile;
    15             dl.BeginInvoke(2, 3000, ar =>
    16             {
    17                 if (ar == null)
    18                     throw new ArgumentNullException("ar");
    19                 TakesAWhileDelegate dl1 = ar.AsyncState as TakesAWhileDelegate;
    20                 Trace.Assert(dl1 != null, "Invalid object type");
    21                 int result = dl1.EndInvoke(ar);
    22                 Console.WriteLine("result: {0}", result);
    23             }, null);
    24 
    25             Console.WriteLine("Main() end.");
    26             Console.ReadLine();
    27         }
    28     }
    回调方法


    3,Thread类
    3.1 给线程传递数据
      1,使用带ParameterizedThreadStart委托参数的Thread构造函数。2,创建自定义类,把线程的方法定位实例方法,这样就可以初始化实例的数据,之后启动线程。
    3.2 后台线程:
      只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程依然是激活的,直到所有前台线程完成其任务为止。
      

            private static void Main(string[] args)
            {
                var t1 = new Thread(ThreadMain) {Name = "MyNewThread", IsBackground = false};
                t1.Start();
                Console.WriteLine("Main Thread ending now.");
    
            }
    
            static void ThreadMain()
            {
                Console.WriteLine("Thread {0} started", Thread.CurrentThread.Name);
                Thread.Sleep(3000);
                Console.WriteLine("Thread {0} Completed",Thread.CurrentThread.Name);
            }
    前台线程示例

    3.3 线程的优先级 

    4 线程池

    5,任务
    5.1 启动任务:
      

    启动任务代码
                TaskFactory tf = new TaskFactory();
                Task t1 = tf.StartNew(TaskMethod);
    
                Task t2 = Task.Factory.StartNew(TaskMethod);
    
                Task t3 = new Task(TaskMethod);
                t3.Start();

     5.2 连续的任务
    Task t1=new Task(DoOnFirst);
    Task t2=t1.ContinueWith(DoOnSecond);

    5.3任务层次结构


    6 Parallel 类
    Parallel.For
    Parallel.ForEach()
    Parallel.Invoke(fun1,fun2);

     1         private static void Main(string[] args)
     2         {
     3             Parallel.Invoke(TaskMethod1,TaskMethod2);
     4 
     5             Console.ReadLine();
     6         }
     7 
     8         static void TaskMethod1()
     9         {
    10             Console.WriteLine("1running in a task.");
    11             Console.WriteLine("Task id: {0}", Task.CurrentId);
    12         }
    13         static void TaskMethod2()
    14         {
    15             Console.WriteLine("2running in a task.");
    16             Console.WriteLine("Task id: {0}", Task.CurrentId);
    17         }
    Parallel.Invoke Code

    7. 取消架构

    8, 线程问题:  争用条件和死锁
    8.1 争用条件:

     1         public class StateObject
     2         {
     3             private int state = 5;
     4 
     5             public void ChangeState(int loop)
     6             {
     7                 lock (this)
     8                 {
     9                     if (state == 5)
    10                     {
    11                         state++;
    12                         Trace.Assert(state == 6, "Race condition ocurred after " + loop + " loops  " + Task.CurrentId);
    13                         if (loop % 1000000 == 0)
    14                         {
    15                             Console.WriteLine("after " + loop + " loops  " + Task.CurrentId);
    16                         }
    17                     }
    18                     state = 5;
    19                 }
    20 
    21             }
    22         }
    23 
    24         public class SampleTask
    25         {
    26             public void RaceCondition(object o)
    27             {
    28                 Trace.Assert(o is StateObject, "o must be of type StateObject.");
    29                 StateObject state = o as StateObject;
    30                 int i = 0;
    31                 while (true)
    32                 {
    33                     state.ChangeState(i++);
    34                 }
    35             }
    36         }
    37 
    38         public class SampleThread
    39         {
    40             public SampleThread(StateObject s1, StateObject s2)
    41             {
    42                 this.s1 = s1;
    43                 this.s2 = s2;
    44             }
    45 
    46             private StateObject s1;
    47             private StateObject s2;
    48 
    49             public void Deadlock1()
    50             {
    51                 int i = 0;
    52                 while (true)
    53                 {
    54                     lock (s1)
    55                     {
    56                         lock (s2)
    57                         {
    58                             s1.ChangeState(i);
    59                             s2.ChangeState(i++);
    60                             Console.WriteLine("still running, {0}", i);
    61 
    62                         }
    63                     }
    64                     //Thread.Yield();
    65                 }
    66             }
    67             public void Deadlock2()
    68             {
    69                 int i = 0;
    70                 while (true)
    71                 {
    72                     lock (s2)
    73                     {
    74                         lock (s1)
    75                         {
    76                             s1.ChangeState(i);
    77                             s2.ChangeState(i++);
    78                             Console.WriteLine("still running, {0}", i);
    79 
    80                         }
    81                     }
    82                     //Thread.Yield();
    83                 }
    84             }
    85         }
    86 
    87         private static void Main(string[] args)
    88         {
    89             var state1 = new StateObject();
    90             var state2 = new StateObject();
    91 
    92             SampleThread st = new SampleThread(state1, state2);
    93 
    94             Task.Factory.StartNew(st.Deadlock1);
    95             Task.Factory.StartNew(st.Deadlock2);
    96 
    97             Console.ReadLine();
    98         }
    死锁演示代码


    9, 同步
    9.1 Lock 语句
    栈是线程独立的,但不是私有的。所有线程的栈内所有内容,都可以被其他线程访问。
    为什么不用 lock(this) ?
    因为这通常超出我们的控制,因为其他人也有可能lock这个对象。一个私有的对象是更好的选择。避免lock一个公开类型,或者超出你代码的控制的实例。
    Tips:可以提供线程安全的原子操作。

     1     class Program
     2     {
     3         public class SharedState
     4         {
     5             public int State { get; set; }
     6         }
     7 
     8         public class Job
     9         {
    10             private SharedState sharedState;
    11 
    12             public Job(SharedState sharedState)
    13             {
    14                 this.sharedState = sharedState;
    15             }
    16 
    17             public void DoTheJob()
    18             {
    19                 for (int i = 0; i < 50000; i++)
    20                 {
    21                     sharedState.State +=1;
    22                 }
    23             }
    24         }
    25 
    26         private static void Main(string[] args)
    27         {
    28             int numTasks = 20
    29                 ;
    30             var state = new SharedState();
    31             var tasks = new Task[numTasks];
    32             for (int j = 0; j < 5; j++)
    33             {
    34                 state.State = 0;
    35                 for (int i = 0; i < numTasks; i++)
    36                 {
    37                     tasks[i] = new Task(new Job(state).DoTheJob);
    38                     tasks[i].Start();
    39 
    40                 }
    41 
    42                 for (int i = 0; i < numTasks; i++)
    43                 {
    44                     tasks[i].Wait();
    45 
    46                 }
    47                 Console.WriteLine("summarized {0}", state.State);
    48             }
    49 
    50 
    51         }
    52 
    53     }
    线程不安全-问题代码

    9.2 Interlocked类
    Interlock类用于使变量的简单语句原子化,线程安全方式递增、递减、交换和读取。i++不是线程安全的(包含3个操作:从内存获取、递增1、存储回内存,这些操作都可以被线程调度器打断)。

    9.3 Monitor类
    lock语句由编译器解释为Monitor类: Moniter.Enter(obj) ;   Monitor.Exit(obj);
    Monitor类的一个优点:可以添加一个等待被锁定的超市值。Monitor.TryEnter(lockObj,500,ref lockToken);

     1             public void DoTheJob()
     2             {
     3                 for (int i = 0; i < 50000; i++)
     4                 {
     5                     bool isLocked = false;
     6                     goLabel:
     7                     Monitor.TryEnter(sharedState, 500, ref isLocked);
     8                     if (isLocked)
     9                     {
    10                         sharedState.State += 1;
    11                         Monitor.Exit(sharedState);
    12                     }
    13                     else
    14                     {
    15                         Console.WriteLine("lock failed.");
    16                         goto goLabel;
    17                     }
    18 
    19                 }
    20             }
    Monitor.TryEnter

     9.4 SpinLock结构
    适合于有大量的锁定,而且锁定的时间非常短。用法非常接近于Monitor类。获得锁使用Enter()或者TryEnter(),释放锁使用Exit()方法。小心SpinLock的传送,因为是结构,所以会复制。


    9.5 WaitHandle基类
    Delegate BeginInvoke() 用waithandle.WaitOne(50,false)来bolck当前线程,
    WaitHandle是一个抽象基类,用于等待一个信号量的设置。可以等待不同的信号,因为WaitHandle是一个基类,可以派生一些类。

            private static void Main(string[] args)
            {
                Action ac = () =>
                {
                    Console.WriteLine("Action Begin.");
                    Thread.Sleep(2000);
                    Console.WriteLine("Action End.");
    
                };
    
                AsyncCallback callback = (o) =>
                {
                    var cb = (Action)o.AsyncState;
                    cb.EndInvoke(o);
                    Console.WriteLine("Callback finished.");
    
                };
    
                IAsyncResult ar = ac.BeginInvoke(callback, ac);
                while (true)
                {
                    Console.Write(".");
                    if (ar.AsyncWaitHandle.WaitOne(50, true))
                    {
                        Console.WriteLine("Can get the result now.");
                        break;
                    }
                }
    
    
                Console.ReadLine();
    
            }
    AsyncWaitHandle

    WaitOne() 等待一个,waitAll()等待多个对象,WaitAny等待多个对象的一个。WaitAll和WaitAny是静态方法。

    WaitHandle基类有一个SafeWaitHandle属性,其中可以将本机句柄赋予一个操作系统资源,并等待该句柄。
    Mutex、EventWaitHandle 和 Semaphore类继承自WaitHandle基类。所以可以等到使用它们。

    9.6 Mutex类
    Mutex(mutual exclusion,互斥)是.net Framework中提供多个集成同步访问的一个类。它非常类似于Monitor,只有一个线程能拥有锁定。只有一个线程能获得互斥锁定,访问受互斥访问的同步代码区域。
        在Mutex类的构造函数中,可以指定互斥是否最初由主调线程拥有。定义互斥的名称,获得互斥是否存在的信息。
    系统能识别有名称的Mutex

            private static void Main(string[] args)
            {
                bool isNew;
                using (Mutex mutex = new Mutex(false, "ProMutext", out isNew))
                {
                    if (isNew)
                    {
                        Console.WriteLine("Get mutex lock.");
    
                    }
                    else
                    {
                        Console.WriteLine("can't get mutex lock.");
                    }
                    Thread.Sleep(3000);
                }
                Thread.Sleep(1000000);
                Console.ReadLine();
    
            }
    Mutex

    9.7 Semaphore类
    信号量非常类似于互斥,其区别是多个线程使用。信号量是一种技术的互斥锁定。使用信号量可以定义同时访问旗语锁定保护的资源的线程个数。
    Semaphore类:可以命名,使用系统范围内的资源,允许不同进程间同步。

            static void Main()
            {
                int threadCount = 6;
                int semaphoreCount = 4;
                var semaphore = new Semaphore( semaphoreCount, semaphoreCount,"ProSemaphore");
                var threads = new Thread[threadCount];
    
                for (int i = 0; i < threadCount; i++)
                {
                    threads[i] = new Thread(ThreadMain);
                    threads[i].Start(semaphore);
                }
    
                for (int i = 0; i < threadCount; i++)
                {
                    threads[i].Join();
                }
                Console.WriteLine("All threads finished");
    
            }
    
            static void ThreadMain(object o)
            {
                Semaphore semaphore = o as Semaphore;
                Trace.Assert(semaphore != null, "o must be a Semaphore type");
                bool isCompleted = false;
                while (!isCompleted)
                {
                    if (semaphore.WaitOne(600))
                    {
                        try
                        {
                            Console.WriteLine("Thread {0} locks the semaphore",
                                  Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(4000);
                        }
                        finally
                        {
                            semaphore.Release();
                            Console.WriteLine("Thread {0} releases the semaphore",
                               Thread.CurrentThread.ManagedThreadId);
                            isCompleted = true;
                        }
                    }
                    else
                    {
                        Console.WriteLine("Timeout for thread {0}; wait again",
                           Thread.CurrentThread.ManagedThreadId);
                    }
                }
            }
    Semaphore(多线程&跨进程同步)

    SemaphoreSlim类是对于较短等待时间进行了优化的轻型版本,不能跨进程。不能命名,不使用内核信号量,不能跨进程。

            private static void Main(string[] args)
            {
                int threadCount = 6;
                int semaphoreCount = 4;
                var semaphore = new SemaphoreSlim(semaphoreCount, semaphoreCount);
                Thread[] threads = new Thread[threadCount];
    
                for (int i = 0; i < threadCount; i++)
                {
                    threads[i] = new Thread(ThreadMain);
                    threads[i].Start(semaphore);
                }
    
                for (int i = 0; i < threadCount; i++)
                {
                    threads[i].Join();
                }
                Console.WriteLine("AllThread finished!");
    
                Console.ReadLine();
            }
    
            static void ThreadMain(object o)
            {
                SemaphoreSlim semaphore = o as SemaphoreSlim;
                Trace.Assert(semaphore != null, "o must be a Semphore type.");
                bool isCompleted = false;
                while (!isCompleted)
                {
                    if (semaphore.Wait(100))
                    {
                        try
                        {
                            Console.WriteLine("thread {0} locks the semaphore", Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(3000);
                        }
                        finally
                        {
                            semaphore.Release();
                            Console.WriteLine("Thread {0} release the semaphore", Thread.CurrentThread.ManagedThreadId);
                            isCompleted = true;
                        }
                    }
                    else
                    {
                        Console.WriteLine("Timeout for thread {0}; wait again ", Thread.CurrentThread.ManagedThreadId);
                    }
                }
            }
    SemaphoreSlim



    9.8 Event类
    事件是另一个系统范围内的资源同步方法。为了从托管代码中使用系统事件,.net framework提供了ManualResetEvent、AutoResetEvent、ManualResetEventSlim和CountdownEvent类。

           private static void Main(string[] args)
            {
                const int taskCount = 10;
                var mEvents = new ManualResetEventSlim[taskCount];
                var waitHandles = new WaitHandle[taskCount];
                var calcs = new Calculator[taskCount];
                TaskFactory taskFactory = new TaskFactory();
                for (int i = 0; i < taskCount; i++)
                {
                    mEvents[i] = new ManualResetEventSlim(false);
                    waitHandles[i] = mEvents[i].WaitHandle;
                    calcs[i] = new Calculator(mEvents[i]);
    
                    taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3));
    
                }
    
                for (int i = 0; i < taskCount; i++)
                {
                    int index = WaitHandle.WaitAny(waitHandles);
                    if (index == WaitHandle.WaitTimeout)
                    {
                        Console.WriteLine("Timeout!!");
    
                    }
                    else
                    {
                        mEvents[index].Reset();
                        Console.WriteLine("finished task for {0}, result: {1}", index, calcs[index].Result);
                        Thread.Sleep(100);
                    }
                }
    
                Console.ReadLine();
            }
    
            public class Calculator
            {
                private ManualResetEventSlim mEvent;
                public int Result { get; private set; }
    
                public Calculator(ManualResetEventSlim ev)
                {
                    this.mEvent = ev;
                }
    
                public void Calculation(Object obj)
                {
                    Tuple<int, int> data = (Tuple<int, int>)obj;
                    Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
                    Thread.Sleep((3000));
                    Result = data.Item1 + data.Item2;
                    Console.WriteLine("Task {0} is ready", Task.CurrentId);
                    mEvent.Set();
                }
            }
    ManualResetEvent

    把任务分支到多个任务中,并在以后合并结果,使用新的CountdownEvent类很有用。
    不需要位每个任务创建一个单独的事件对象,而只需要创建一个事件对象。

    var mEvents = new ManualResetEventSlim[taskCount];
    // var cEvent = new CountdownEvent(taskCount);

    var waitHandles = new WaitHandle[taskCount];
    var calcs = new Calculator[taskCount];

    int index = WaitHandle.WaitAny(waitHandles);//wait

    //tasks
    mEvent.Set();//all thread set;
    //continue

            private static void Main(string[] args)
            {
                const int taskCount = 10;
                var cEvent = new CountdownEvent(taskCount);
    
                var calcs = new Calculator[taskCount];
                TaskFactory taskFactory = new TaskFactory();
                for (int i = 0; i < taskCount; i++)
                {
                    calcs[i] = new Calculator(cEvent);
    
                    taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3));
    
                }
    
                cEvent.Wait();
                Console.WriteLine("All finished.");
    
                Console.ReadLine();
            }
    
            public class Calculator
            {
                private CountdownEvent cEvent;
                public int Result { get; private set; }
    
                public Calculator(CountdownEvent ev)
                {
                    this.cEvent = ev;
                }
    
                public void Calculation(Object obj)
                {
                    Tuple<int, int> data = (Tuple<int, int>)obj;
                    Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
                    Thread.Sleep((3000));
                    Result = data.Item1 + data.Item2;
                    Console.WriteLine("Task {0} is ready", Task.CurrentId);
                    cEvent.Signal();
                }
            }
    CountdownEvent

     9.9 Barrier 类
    适合于工作有多个任务分支且以后又需要合并工作的情况。

     var barrier = new Barrier(numberTasks + 1);
     barrier.SignalAndWait();//wait
    //tasks
    barrier.RemoveParticipant();//2 left

    barrier.RemoveParticipant();//1 left

    //continue.

    9.10 ReadWriterLockSlim类
    允许多个读取器。同时只有一个写入器工作,此时读取器不能工作。

            private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
            static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    
            static void ReadMethod(object reader)
            {
                try
                {
                    rwl.EnterReadLock();
                    for (int i = 0; i < items.Count; i++)
                    {
                        Console.WriteLine("read {0}, loop: {1}, item:{2}", reader, i, items[i]);
                        Thread.Sleep(40);
                    }
                }
                finally
                {
                    rwl.ExitReadLock();
                }
            }
    
            static void WriterMethod(object writer)
            {
                try
                {
                    while (!rwl.TryEnterWriteLock(50))
                    {
                        Console.WriteLine("Writer {0} waiting ,current reader count: {1}", writer, rwl.CurrentReadCount);
                    }
                    Console.WriteLine("Writer{0} acquired the lock.", writer);
                    for (int i = 0; i < items.Count; i++)
                    {
                        items[i]++;
                        Thread.Sleep(50);
    
                    }
                    Console.WriteLine("Writer {0} finished.", writer);
    
                }
                finally
                {
                    rwl.ExitWriteLock();
                }
            }
            private static void Main(string[] args)
            {
                var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
                var tasks = new Task[6];
                tasks[0] = taskFactory.StartNew(WriterMethod, 1);
                tasks[1] = taskFactory.StartNew(ReadMethod, 1);
                tasks[2] = taskFactory.StartNew(ReadMethod, 2);
                tasks[3] = taskFactory.StartNew(WriterMethod, 2);
                tasks[4] = taskFactory.StartNew(ReadMethod, 3);
                tasks[5] = taskFactory.StartNew(ReadMethod, 4);
                foreach (Task task in tasks)
                {
                    task.Wait();
    
                }
    
    
                Console.WriteLine("All finished.");
    
                Console.ReadLine();
            }
    ReadWriterLockSlim

    10 Timer类




  • 相关阅读:
    ISP基础(01):ISP模块列表
    Linux 开发(02):打印特殊含义转义符
    note template
    apply、call、bind的区别
    Fiddle 抓包工具
    post和get的使用场景和区别
    闭包
    原型链
    node.js
    CSS垂直居中
  • 原文地址:https://www.cnblogs.com/netact/p/3659737.html
Copyright © 2011-2022 走看看