Thread

//开启线程, 构造函数参数可以是委托,可以是方法名, 可以是ThreadStart(不带参数),ParameterizedThreadStart(可以在start内传入object类型参数) 类型委托 //Thread th = new Thread(()=> { }); Thread thWithoutParam = new Thread(new ThreadStart(() => { })); Thread thWithParam = new Thread(new ParameterizedThreadStart((t) => { Thread.Sleep(10000); })); thWithoutParam.IsBackground = true; //后台线程在程序关闭时,可以不用等待后台线程执行完毕,前台线程需要等待线程执行完毕才能关闭 thWithoutParam.Start(); thWithParam.Start(new object()); //可以让线程进入等待, 参数可以是TimeSpan 或者是 millisecondsTimeout 的整毫秒数 thWithParam.Join(); //释放线程 thWithParam.Abort(); /* * 注意: * 如果使用 Application.Exit() 退出程序时 会等待前台线程执行完才关闭程序 * Environment.Exit(0) 可以直接关闭,而不等待前台线程 */
ThreadPool

/* * 线程池 对于 Thread 的优点: 在频繁创建线程的时候,效率高. 把线程的开启与销毁交给系统,它会尽可能的用最优去管理 */ for (int i = 0; i < 10; i++) { //ThreadPool.QueueUserWorkItem(new WaitCallback(x => { }));//可以简写为 ThreadPool.QueueUserWorkItem(x => { ConsoleUtils.PrintLine(x + "========线程ID:" + Thread.CurrentThread.ManagedThreadId + " 开始执行"); Thread.Sleep(5000); ConsoleUtils.PrintLine(x + "========线程ID:" + Thread.CurrentThread.ManagedThreadId + " 执行完成"); }, i);//可以传入一个object 参数, 委托参数接收 }
Task

/* * Thread: * 当我们提及多线程的时候会想到thread和threadpool,这都是异步操作, * threadpool其实就是thread的集合,具有很多优势,不过在任务多的时候全局队列会存在竞争而消耗资源。 * thread默认为前台线程,主程序必须等线程跑完才会关闭,而threadpool相反。 * * Task: * task简单地看就是任务,那和thread有什么区别呢? * Task的背后的实现也是使用了线程池线程,但它的性能优于ThreadPoll, * 因为它使用的不是线程池的全局队列,而是使用的本地队列, * 使线程之间的资源竞争减少。同时Task提供了丰富的API来管理线程、控制。 * 但是相对前面的两种耗内存,Task依赖于CPU对于多核的CPU性能远超前两者, * 单核的CPU三者的性能没什么差别 */ ConsoleUtils.PrintLine("main thread start.." + Thread.CurrentThread.ManagedThreadId); //开启一个线程 执行 返回类型是 Task<T> var task = Task.Run(() => { ConsoleUtils.PrintLine("task execuete:" + Thread.CurrentThread.ManagedThreadId, ConsoleType.Warn); Thread.Sleep(5000); ConsoleUtils.PrintLine("task execuete after 5 secondes:" + Thread.CurrentThread.ManagedThreadId, ConsoleType.Warn); }); //使用这个task对象, 在上面线程执行完成后, 开启线程执行 continueWith 中的代码 task.ContinueWith(x => ConsoleUtils.PrintLine("execute in continue with:" + Thread.CurrentThread.ManagedThreadId)); ConsoleUtils.PrintLine("main thread end.." + Thread.CurrentThread.ManagedThreadId); //---------------------------------------任务等待---------------------------------------------------------- //开启一个任务 定义一个任务对象, Task testTask = new Task(() => { Console.WriteLine("task start"); System.Threading.Thread.Sleep(2000); }); //任务开始 testTask.Start(); var factoryTeak = Task.Factory.StartNew(() => { Console.WriteLine("factory task start"); }); //等待多个任务完成, 多线程中的异常可以在此处被捕获 Task.WaitAll(testTask, factoryTeak); //等待任意一个任务完成,就执行下面代码 Task.WaitAny(testTask, factoryTeak); Console.WriteLine("end"); var singleTask = new Task(() => { Console.WriteLine("task start"); System.Threading.Thread.Sleep(2000); }); singleTask.Start(); //单个任务进行等待 singleTask.Wait(); Console.WriteLine("end"); //------------------任务取消------------------------- var tokenSource = new CancellationTokenSource();//创建取消task实例 var cancelTask = new Task(() => { for (int i = 0; i < 6; i++) { Console.WriteLine("hello:" + i); System.Threading.Thread.Sleep(1000); } }, tokenSource.Token); //Console.WriteLine(testTask.Status); //如果任务已经开始执行,将不能被取消 //testTask.Start(); Console.WriteLine(cancelTask.Status); //取消时执行的回调函数 tokenSource.Token.Register(() => { Console.WriteLine("task is to cancel"); }); tokenSource.Cancel(); Console.WriteLine(cancelTask.Status); //---------------父子task------------ var parentTask = new Task(() => { var childTask = new Task(() => { System.Threading.Thread.Sleep(2000); Console.WriteLine("childTask to start"); }, TaskCreationOptions.AttachedToParent); childTask.Start(); Console.WriteLine("parentTask to start"); }); parentTask.Start(); //会等待父子task都执行完毕,才执行后续代码 parentTask.Wait(); Console.WriteLine("end");
Parallel

//数组的并行执行 List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; //var evenNumbers = nums.AsEnumerable().Select(item => Calculate(item)); //AsOrdered 将数据以原来顺序排序返回 var evenNumbers = nums.AsParallel().AsOrdered().Select(item => { Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId); return item * 2; }); //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果; //Console.WriteLine(evenNumbers.Count()); foreach (int item in evenNumbers) Console.WriteLine(item);

/* * ****************Parallel.Invoke 主要用于任务的并行******* * 这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式: * Parallel.Invoke( params Action[] actions); * Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options); */ var actions = new Action[] { () => ActionTest("test 1"), () => ActionTest("test 2"), () => ActionTest("test 3"), () => ActionTest("test 4") }; Console.WriteLine("Parallel.Invoke 1 Test"); Parallel.Invoke(actions); Console.WriteLine("结束!")

//Parallel.For方法,主要用于处理针对数组元素的并行操作(数据的并行) (无序的) int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; Parallel.For(0, nums.Length, i => { Thread.Sleep(1000); Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId); });

//Foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行) 同上 List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; Parallel.ForEach(nums, (item) => { Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId); });

using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; namespace ConsoleApp1 { public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } } class Program { static void Main(string[] args) { var dic = LoadData(); Stopwatch watch = new Stopwatch(); watch.Start(); var query2 = (from n in dic.Values.AsParallel() where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds); Console.Read(); } public static ConcurrentDictionary<int, Student> LoadData() { ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>(); ParallelOptions options = new ParallelOptions(); //指定使用的硬件线程数为4 options.MaxDegreeOfParallelism = 4; //预加载1500w条记录 Parallel.For(0, 15000000, options, (i) => { var single = new Student() { ID = i, Name = "hxc" + i, Age = i % 151, CreateTime = DateTime.Now.AddSeconds(i) }; dic.TryAdd(i, single); }); return dic; } } }

//是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。 //Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。 //Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。 using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 20000000, (i, state) => { if (bag.Count == 1000) { //state.Break(); state.Stop(); return; } bag.Add(i); }); Console.WriteLine("当前集合有{0}个元素。", bag.Count); } } }

using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { public static void Main() { var cts = new CancellationTokenSource(); var ct = cts.Token; Task.Factory.StartNew(() => fun(ct)); Console.ReadKey(); //Thread.Sleep(3000); cts.Cancel(); Console.WriteLine("任务取消了!"); } static void fun(CancellationToken token) { Parallel.For(0, 100000, new ParallelOptions { CancellationToken = token }, (i) => { Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId); }); } } }

using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { try { Parallel.Invoke(Run1, Run2); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } } Console.WriteLine("结束了!"); //Console.Read(); } static void Run1() { Thread.Sleep(3000); throw new Exception("我是任务1抛出的异常"); } static void Run2() { Thread.Sleep(5000); throw new Exception("我是任务2抛出的异常"); } } }
注意Parallel里面 不建议抛出异常 因为在极端的情况下比如进去的第一批线程先都抛异常了 此时AggregateExcepation就只能捕获到这一批的错误,然后程序就结束了

using System; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { public class TestClass { public static List<int> NumberList = null; private static readonly object locker = new object(); public void Test(int Number) { throw new Exception("1111"); //lock (locker) //{ // if (NumberList == null) // { // Console.WriteLine("执行添加"); // NumberList = new List<int>(); // NumberList.Add(1); // //Thread.Sleep(1000); // } //} //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number)); //Console.WriteLine(Number); } } class Program { private static readonly object locker = new object(); static void Main(string[] args) { List<string> errList = new List<string>(); try { Parallel.For(0, 10, (i) => { try { TestClass a = new TestClass(); a.Test(i); } catch (Exception ex) { lock (locker) { errList.Add(ex.Message); throw ex; } } }); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } } int Index = 1; foreach (string err in errList) { Console.WriteLine("{0}、的错误:{1}", Index++, err); } } } }
可以像下面这样来处理一下,不在AggregateExcepation中来处理 而是在Parallel里面的try catch来记录错误,或处理错误

using System; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { public class TestClass { public static List<int> NumberList = null; private static readonly object locker = new object(); public void Test(int Number) { throw new Exception("1111"); //lock (locker) //{ // if (NumberList == null) // { // Console.WriteLine("执行添加"); // NumberList = new List<int>(); // NumberList.Add(1); // //Thread.Sleep(1000); // } //} //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number)); //Console.WriteLine(Number); } } class Program { private static readonly object locker = new object(); static void Main(string[] args) { List<string> errList = new List<string>(); Parallel.For(0, 10, (i) => { try { TestClass a = new TestClass(); a.Test(i); } catch (Exception ex) { lock (locker) { errList.Add(ex.Message); } //Console.WriteLine(ex.Message); //注:这里不再将错误抛出..... //throw ex; } }); int Index = 1; foreach (string err in errList) { Console.WriteLine("{0}、的错误:{1}", Index++, err); } } } }
async await

private void button2_Click(object sender, EventArgs e) { /* * async await是一个优雅的异步写法 * 使用await 修饰的异步函数 返回的是 Task<T> 中的 T, 否则返回的是 Task<T> * async await 是成对存在的, 如果被async修饰, 但没有await 也将是一个同步函数 */ ConsoleUtils.PrintLine("111 balabala. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId); //1. 如果异步函数带有返回值, 在主函数中有使用到返回值, 则会在主函数中异步函数执行完成 var ResultTask = AsyncMethod(); ConsoleUtils.PrintLine(ResultTask.Result); ConsoleUtils.PrintLine("222 balabala. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId); } private async Task<string> AsyncMethod() { var ResultFromTimeConsumingMethod = TimeConsumingMethod(); string Result = await ResultFromTimeConsumingMethod + " + AsyncMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId; //2. 在await 之后的代码都是由主线程执行, 如果要返回的话,且主线程(如步骤1)主线程在等待 将会出现 [死锁] //3. 优点: 在wpf,winform等窗体中,使用这个将可以不使用 Invoke + 委托 的行,将页面数据更新交由主线程执行(这就是优雅的地方) **** ConsoleUtils.PrintLine(Result); return Result; } //这个函数就是一个耗时函数,可能是IO操作,也可能是cpu密集型工作。 private Task<string> TimeConsumingMethod() { var task = Task.Run(() => { ConsoleUtils.PrintLine("Helo I am TimeConsumingMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId, ConsoleType.Warn); Thread.Sleep(5000); ConsoleUtils.PrintLine("Helo I am TimeConsumingMethod after Sleep(5000). My Thread ID is :" + Thread.CurrentThread.ManagedThreadId, ConsoleType.Warn); return "Hello I am TimeConsumingMethod"; }); return task; }