Thread
没有参数的线程启动
Thread newThread = new Thread(new ThreadStart(DoWork)); newThread.Start();
有参数的线程启动
注意DoWork()的参数必须为object
Thread newThread2 = new Thread(new ParameterizedThreadStart(this.DoWork)); newThread2.Start("111");
AutoResetEvent
通知等待的其他线程,本线程已经工作做完.
比如要计算1-N的各个数的平方之和,每个数的平方由不同的线程去计算
/// <summary> /// 求数组平方和 /// </summary> public class GetSquareSum { //建立事件数组 ,N个线程,就N个AutoResetEvent public AutoResetEvent[] autoEvents = null; //数组 public int[] array = null; public int Sum = 0; public GetSquareSum(int []arrays) { this.array = arrays; autoEvents = new AutoResetEvent[arrays.Length]; for (int i = 0; i < arrays.Length; i++) { autoEvents[i] = new AutoResetEvent(false);//初始化 } this.GetResult(); WaitHandle.WaitAll(autoEvents); //autoEvents.ToList().ForEach(s => s.WaitOne()); foreach (int r in array) { Sum += r; } Console.WriteLine("Sum="+Sum); } public void GetResult() { for (int i = 0; i < array.Length; i++) { Calculate(i); } } /// <summary> /// 计算第index个数 /// </summary> /// <param name="index"></param> public void Calculate(int index) { Thread newThread2 = new Thread( new ParameterizedThreadStart( (obj) => { int j = (int)obj; array[j] = array[j] * array[j]; Console.WriteLine(array[j]); autoEvents[j].Set(); } ) ); newThread2.Start(index); } }
ManualResetEvent
MSDN上的解释是:通知一个或多个正在等待的线程已发生事件。
例子解释:
- 打印方法已经准备好,但是打印的东西没准备好,所以打印之前mreInit.WaitOne(),等待资源
-
通过控制台输入资源,mreInit.Set()通知资源准备好,打印程序开始打印
public class ManualResetEventTestExample2 { private static ManualResetEvent mreInit; private string _test = ""; public void Test() { mreInit = new ManualResetEvent(false);// Thread newThread = new Thread(new ThreadStart(() => Print())); newThread.Start(); _test = (Console.ReadLine()); mreInit.Set(); } /// <summary> /// 打印程序准备就绪 /// </summary> private void Print() { mreInit.WaitOne(); Console.WriteLine(_test); } }
ManualResetEvent的reset看下面代码
public class ManualResetEventTestExample2 { private static ManualResetEvent mreInit; private string _test = ""; public void Test() { mreInit = new ManualResetEvent(false);// new Thread(new ThreadStart(() => Print())).Start();//Print 1 _test = (Console.ReadLine());//Input 1 mreInit.Set(); mreInit.Reset(); new Thread(new ThreadStart(() => Print())).Start();//Print 2 new Thread(new ThreadStart(() => Print())).Start();//Print 3 new Thread(new ThreadStart(() => Print())).Start();//Print 4 new Thread(new ThreadStart(() => Print())).Start();//Print 5 _test = (Console.ReadLine());//Input 2 mreInit.Set(); } /// <summary> /// 打印程序准备就绪 /// </summary> private void Print() { mreInit.WaitOne(); Console.WriteLine(_test); }
print 1肯定会在输入后执行,Print 2,3,4,5的执行,则依赖reset.如果没有mreInit.Reset()这句,则Print1,2,3,4,5会直接依次执行.如果加了mreInit.Reset()这句,这Print2,3,4,5只有在input 2执行了之后才会执行.
Reset就是让ManualResetEvent回复到初始状态.
线程池 ThreadPool
public static bool QueueUserWorkItem (WaitCallback callBack); public static bool QueueUserWorkItem(WaitCallback callback, Object state);
这两个方法向线程池的队列添加一个工作项(work item)以及一个可选的状态数据。然后,这两个方法就会立即返回。
工作项其实就是由callback参数标识的一个方法,该方法将由线程池线程执行。 同时写的回调方法必须匹配System.Threading.WaitCallback委托类型,定义为:
public delegate void WaitCallback(Object state);
就是说如果要带参数,callback参数必须为object.
ThreadPool.QueueUserWorkItem(new WaitCallback(obj => { System.Threading.Thread.Sleep(1000); Console.WriteLine(obj.ToString()); }),"canshu");
或者这么写
ThreadPool.QueueUserWorkItem(new WaitCallback(Func), "canshu"); private void Func(object obj) { System.Threading.Thread.Sleep(1000); Console.WriteLine(obj.ToString()); }
或者
ThreadPool.QueueUserWorkItem(a=>{Func(a);}, "canshu");
以下代码改造求平方的问题.
class ThreadPoolTest2 { public void Test() { // List<int> result = new List<int>(); int sum = 0; object lockobj = new object(); Action<object> action = obj => { Parm p = (Parm)obj; lock (lockobj) { sum += p.Num * p.Num; } p.Are.Set(); }; int[] array = new int[] { 1, 2, 3, 4, 5,6,7,8,9,10,11,12,13,14,15 }; AutoResetEvent[] autos = new AutoResetEvent[array.Length]; for (int i = 0; i < array.Length; i++) { autos[i] = new AutoResetEvent(false); ThreadPool.QueueUserWorkItem(a=>action(a), new Parm() { Are = autos[i], Num = array[i] }); } Console.WriteLine(sum); //WaitHandle.WaitAll(autos); autos.ToList().ForEach(s=>s.WaitOne()); Console.WriteLine(sum); } class Parm { public int Num { set; get; } public AutoResetEvent Are { set; get; } } }
注意:在sum += p.Num * p.Num这句必须lock,不然会得不到正确结果
CancellationTokenSource
例子讲解:有5个数组,需要在数组中找是否有0元素,找到则返回.
实现:每个数据开启一个现场,如果某一个线程先找到0,则返回,同时,通知其他线程不用计算了.
class CancellationTokenSourceTest2 { public void Test() { CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken token = cts.Token; int[][] array = new int[][] { new []{1,2,3,0,5,6,7,8,9,10}, new []{1,2,3,4,5,6,7,8,9,0}, new []{1,2,3,4,5,6,7,8,0,10}, new []{1,2,3,4,5,6,0,8,9,10}, new []{1,2,3,4,5,0,7,8,9,10} }; for (int i = 0; i < array.Length; i++) { Thread th = new Thread(new ParameterizedThreadStart ((obj) => { Parm p = (Parm)obj; int[] a = p.Value; for (int n=0;n<a.Length;n++) { int j = a[n]; Console.WriteLine("in array " + p.Index +" "+ (n + 1) + " times"); if (!p.Cts.IsCancellationRequested) { if (j == 0) { p.Cts.Cancel(); Console.WriteLine("Cancelling at task " + p.Index + "th array"); break; } Thread.Sleep(1000); } else { break; } } } )); th.Start(new Parm() { Cts = cts, Value = array[i] , Index=i}); } Console.ReadLine();//敲下回车后,, } public class Parm { public CancellationTokenSource Cts; public int []Value; public int Index; } }
运行结果,首先在第0个数组中找到,总共比较次数为20次左右.
也可以尝试用AutoResetEvent实现,用 WaitHandle.WaitAny()等待.
BeginInvoke 和 EndInvoke
例子1:说明异步执行,将一个数乘以2返回,异步后等待结果后求和.
class BeginInvokeTest { public void Test() { Func<int, int> GetDouble = (a) => { Console.WriteLine(a); return a * 2; }; IAsyncResult ar1 = GetDouble.BeginInvoke(100, null, null); IAsyncResult ar2 = GetDouble.BeginInvoke(200, null, null); int result = GetDouble.EndInvoke(ar1)+GetDouble.EndInvoke(ar2); Console.WriteLine("result = " + result); } }
打印结果为
100
200
result = 600
或者
200
100
result = 600
100和200的打印是异步执行,不知道誰先执行完,但是求和得等到EndInvoke两个异步完成,才能计算
例子2:两个异步去发送消息,发送完成后,去做其他事情,并且在发送消息的回调中保存消息记录↓.
class BeginInvokeTest2//回调方法中处理 { public void Test() { Func<string, string> SendMessage = (a) => { System.Threading.Thread.Sleep(1000); Console.WriteLine("发送消息:"+a+" ,"); return "OK"; }; AsyncCallback callback = (ar) => { Parm p = (Parm)ar.AsyncState; if (p.F.EndInvoke(ar) == "OK") { Console.WriteLine("消息发送成功,保存聊天记录 "+p.Message); } }; Parm p1=new Parm(){ F=SendMessage, Message= "message1"}; Parm p2 = new Parm() { F = SendMessage, Message = "message2" }; IAsyncResult iar1 = SendMessage.BeginInvoke(p1.Message, callback, p1); IAsyncResult iar2 = SendMessage.BeginInvoke(p2.Message, callback, p2); iar1.AsyncWaitHandle.WaitOne();//等待执行完毕,并不是等待callback执行完 iar2.AsyncWaitHandle.WaitOne();//等待执行完毕,并不是等待callback执行完 Console.WriteLine("消息发完,做其他事情"); Console.Read(); } class Parm { public Func<string, string> F; public string Message; } }
线程同步Interlocked.Increment
以原子操作的形式递增指定变量的值并存储结果
例子说明:用3种方法对初始值为1的进行一百次++操作.第一种直接加,第二,三种方法多线程.第三种用Interlocked.Increment.运行结果为第二种的结果值可能不为101.
class InterlockedTest { public void Test() { int N = 1; for (int i = 1; i <= 100; i++) { N++; } Console.WriteLine(N); int M = 1; Action<object> action = obj => { Parm p = (Parm)obj; M++; p.au.Set(); }; AutoResetEvent[] autos = new AutoResetEvent[100]; for (int i = 1; i <= 100; i++) { autos[i-1] = new AutoResetEvent(false); ThreadPool.QueueUserWorkItem(a=>action(a), new Parm() { au = autos[i-1], index = i }); } autos.ToList().ForEach(s => s.WaitOne()); Console.WriteLine(M); int Q = 1; Action<object> action2 = obj => { Parm p = (Parm)obj; Interlocked.Increment(ref Q); p.au.Set(); }; autos = new AutoResetEvent[100]; for (int i = 1; i <= 100; i++) { autos[i - 1] = new AutoResetEvent(false); ThreadPool.QueueUserWorkItem(a => action2(a), new Parm() { au = autos[i - 1], index = i }); } autos.ToList().ForEach(s => s.WaitOne()); Console.WriteLine(Q); } public class Parm { public AutoResetEvent au; public int index; } }
线程同步 Monitor.Enter和Lock
Lock关键字是一个语法糖,它将Monitor对象进行封装.
语法糖(Syntactic sugar),是由Peter J. Landin(和图灵一样的天才人物,是他最先发现了Lambda演算,由此而创立了函数式编程)创造的一个词语,它意指那些没有给计算机语言添加新功能,而只是对人类来说更“甜蜜”的语法。语法糖往往给程序员提供了更实用的编码方式,有益于更好的编码风格,更易读。不过其并没有给语言添加什么新东西。
例子同Interlocked.Increment,不再详细说明
class MonitorTest { public void Test() { int N = 1; for (int i = 1; i <= 100; i++) { N++; } Console.WriteLine(N); int M = 1; Action<object> action = obj => { Parm p = (Parm)obj; M++; p.au.Set(); }; AutoResetEvent[] autos = new AutoResetEvent[100]; for (int i = 1; i <= 100; i++) { autos[i - 1] = new AutoResetEvent(false); ThreadPool.QueueUserWorkItem(a => action(a), new Parm() { au = autos[i - 1], index = i }); } autos.ToList().ForEach(s => s.WaitOne()); Console.WriteLine(M); int Q = 1; object objMonitor=new object(); Action<object> action2 = obj => { Parm p = (Parm)obj; Monitor.Enter(objMonitor); try { Q++; } finally { Monitor.Exit(objMonitor); } p.au.Set(); }; autos = new AutoResetEvent[100]; for (int i = 1; i <= 100; i++) { autos[i - 1] = new AutoResetEvent(false); ThreadPool.QueueUserWorkItem(a => action2(a), new Parm() { au = autos[i - 1], index = i }); } autos.ToList().ForEach(s => s.WaitOne()); Console.WriteLine(Q); } public class Parm { public AutoResetEvent au; public int index; } }
ReaderWriterLock
例子说明:ReaderWriterLockEntity有2个成员变量X,Y,读写方法各有2个,一个使用读写锁,一个直接读写.我们设定X和Y必须相等.
class ReaderWriterLockEntity { ReaderWriterLock locker = new ReaderWriterLock(); public int X { set; get; } public int Y { set; get; } public void ReadLock(ref int x, ref int y) { locker.AcquireReaderLock(Timeout.Infinite); try { x = this.X; y = this.Y; } finally { locker.ReleaseReaderLock(); } } public void WriteLock(int x, int y) { locker.AcquireWriterLock(Timeout.Infinite); try { this.X = x; Thread.Sleep(10); this.Y = y; } finally { locker.ReleaseWriterLock(); } } public void Read(ref int x, ref int y) { x = this.X; y = this.Y; } public void Write(int x, int y) { this.X = x; Thread.Sleep(10); this.Y = y; } }
读写类,其中Test方法中,开启两组现场写线程,对ReaderWriterLockEntity中的X,Y进行自增.两组读线程,不停的打印X,Y.
打印的结果中,有X和Y不相等的结果出现.
Test2方法中,加了读写锁,打印的结果,不会有X和Y不等的情况.
ReaderWriterLock和Lock的区别就是,当加了ReaderLock,应该不会影响多个ReaderLock的增加.一个资源可以被Reader Lock多次.而lock则达不到这个要求.所以,ReadWriterLock在某些场景,比Lock效率更高.
class ReaderWriterTest2 { ReaderWriterLockEntity entity = new ReaderWriterLockEntity(); public void Test() { Action writer = () => { int a = 10; int b = 10; //Console.WriteLine("************** Write *************"); for (int i = 0; i < 5; i++) { this.entity.Write(a++, b++); Thread.Sleep(10); } }; Action reader = () => { // Console.WriteLine("************** Reader *************"); int x=0, y=0; for (int i = 0; i < 50; i++) { this.entity.Read(ref x, ref y); Console.WriteLine("Read:X={0},y={1}", x, y); Thread.Sleep(1); } }; //Writer Threads Thread wt1 = new Thread(new ThreadStart(writer)); wt1.Start(); Thread wt2 = new Thread(new ThreadStart(writer)); wt2.Start(); //Reader Threads Thread rt1 = new Thread(new ThreadStart(reader)); rt1.Start(); Thread rt2 = new Thread(new ThreadStart(reader)); rt2.Start(); } public void Test2() { Action writer = () => { int a = 10; int b = 10; for (int i = 0; i < 5; i++) { this.entity.WriteLock(a++, b++); Thread.Sleep(10); } }; Action reader = () => { int x = 0, y = 0; for (int i = 0; i < 50; i++) { this.entity.ReadLock(ref x, ref y); Console.WriteLine("Read:X={0},y={1}", x, y); Thread.Sleep(1); } }; //Writer Threads Thread wt1 = new Thread(new ThreadStart(writer)); wt1.Start(); Thread wt2 = new Thread(new ThreadStart(writer)); wt2.Start(); //Reader Threads Thread rt1 = new Thread(new ThreadStart(reader)); rt1.Start(); Thread rt2 = new Thread(new ThreadStart(reader)); rt2.Start(); } }
Semaphore信号量
例子说明:厕所有5个位置,每个人上厕所五秒.程序输入1后,表示有一个人要上厕所.如果厕所已满,则会自动等待.
class SemaphoreTest { Semaphore sem = new Semaphore(5,5);//厕所空的 public void In() { sem.WaitOne(); Console.WriteLine("有空位,上厕所"); Thread.Sleep(5000);//上厕所需要五秒 sem.Release();//上完了 Console.WriteLine("出厕所"); } public void Out() { Thread.Sleep(5000);//上厕所需要五秒 sem.Release(); Console.WriteLine("出厕所"); } public void Test() { while(true) { string input = Console.ReadLine(); if (input == "1")//入厕 { new Thread(new ThreadStart(()=>{ In();})).Start(); ; } } } }
不同程序,不同的exe,只要信号量的Name相同,信号量是共享的。
例子说明:修改上厕所的程序.一个程序只负责入厕,什么时候出厕所,则由管理人员来控制(另一个程序). SemaphoreTest2运行后,输入5次1,表示有5个人入厕了.然后在输入几个1,表示还有人在等待.
class SemaphoreTest2 { Semaphore sem = new Semaphore(0, 5, "AAA");//厕所空的 public void In() { sem.WaitOne(); Console.WriteLine("有空位,上厕所"); } public void Test() { while (true) { string input = Console.ReadLine(); if (input == "1")//入厕 { new Thread(new ThreadStart(() => { In(); })).Start(); ; } } } }
在启动另外的项目,代码如下:启动后,输入2(喊一个人出厕所),发现入厕程序会有人进入厕所
class SemaphoreTest3 { Semaphore sem = new Semaphore(0,5,"AAA");//厕所空的 public void Out() { Thread.Sleep(1000);//上厕所需要五秒 sem.Release(); Console.WriteLine("出厕所"); } public void Test() { while(true) { string input = Console.ReadLine(); if (input == "2")//出厕 { new Thread(new ThreadStart(()=>{ Out();})).Start(); ; } } } }
注意:这是2个不同的程序,他们的Semaphore的Name是相同的.
有一点不明白的地方是在程序1中运行 Semaphore sem = new Semaphore(0, 5, "AAA");//表示厕所满了. 在运行第二个程序 Semaphore sem = new Semaphore(5, 5, "AAA");这时候,程序1中仍然是满的.在new的时候,没有互相干扰.
注意下面这个构造函数,可以检测信号量Name是否重复.
public Semaphore( int initialCount, int maximumCount, string name, out bool createdNew )
意外发现: Semaphore sem = new Semaphore(0, 5,"厕所"); 如果Name为中文,new Semaphore(0, 5,"厕所")和new Semaphore(5, 5,"厕所")是一样的.一开始就会有5个可用信号量.
Mutex
只理解了这句,Mutex本身是可以系统级别的,所以是可以跨越进程的。比如我们要实现一个软件不能同时打开两次,那么Mutex是可以实现的,而lock和monitor是无法实现的。其他和lock的区别,其实没太懂.