异步编程从很久之前就开始学习,但是发现只要是不总结的东西,总是特别容易忘记,而且没有深入的学习和实践,对技术的细节和原理也是理解不深,所以这次一定要把最新学到的,和原来记忆不清的知识从新梳理一遍。
1 基本概念
线程,进程和应用程序域的基本知识在这有总结:博客园
同步异步和阻塞非阻塞,有简单理解也有详细解释,同时对应到linux的五种网络模型,这里简单解释一下:
1.1 同步和异步,阻塞与非阻塞
“阻塞”与"非阻塞"与"同步"与“异步"不能简单的从字面理解,提供一个从分布式系统角度的回答。
1.同步与异步:同步和异步关注的是消息通信机制(synchronous
communication/ asynchronous communication)所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。
换句话说,就是由调用者主动等待这个调用的结果。而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
典型的异步编程模型比如Node.js举个通俗的例子:你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
2.阻塞与非阻塞阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
1.2 主线程、工作者线程、前台线程、后台线程、线程池
- 主线程:每一个Windows进程都恰好包含一个用作程序入口点的主线程。进程的入口点创建的第一个线程被称为主线程。.Net执行程序(控制台、Windows Form、Wpf等)使用Main()方法作为程序入口点。当调用该方法时,主线程被创建。
- 工作者线程:由主线程创建的线程,可以称为工作者线程,用来去执行某项具体的任务。
- 前台线程:默认情况下,使用Thread.Start()方法创建的线程都是前台线程。前台线程能阻止应用程序的终结,只有所有的前台线程执行完毕,CLR才能关闭应用程序(即卸载承载的应用程序域)。前台线程也属于工作者线程。
- 后台线程:后台线程不会影响应用程序的终结,当所有前台线程执行完毕后,后台线程无论是否执行完毕,都会被终结。一般后台线程用来做些无关紧要的任务(比如邮箱每隔一段时间就去检查下邮件,天气应用每隔一段时间去更新天气)。后台线程也属于工作者线程。
- 线程池:线程池是为突然大量爆发的线程设计的,通过有限的几个固定线程为大量的操作服务,减少了创建和销毁线程所需的时间,从而提高效率,这也是线程池的主要好处。ThreadPool适用于并发运行若干个任务且运行时间不长且互不干扰的场景。
还有一点需要注意,通过线程池创建的任务是后台任务。
2 核心编程
2.1 回顾
在我的理解中,.net的异步程序比较旧的有手动创建Thread,异步的委托调用。到了.net framwork 4.5以后,有了async和await,相对于原始的异步编程,变得更加简洁和易读,但是要完全搞懂还是需要深入学习一下。
2.2 ThreadPool 编程实践。
static void Main(string[] args)
{
Console.WriteLine("fun with the clr thread pool");
Console.WriteLine("main thread started,threadid={0}", Thread.CurrentThread.ManagedThreadId);
printer p = new printer();
WaitCallback workItem = new WaitCallback(printNumbers);
for (int i = 0; i < 10; i++)
{
ThreadPool.QueueUserWorkItem(workItem, p);
}
Console.ReadLine();
}
static void printNumbers(object state)
{
printer task = (printer)state;
task.PrintNumbers();
}
WaitCallBack 委托指向有单个Object类型的参数且无返回值的方法。
ThreadPool.QueueUserWorkItem()方法使用线程池中的工作者线程排队执行一个方法。
使用线程池的好处主要是:
- 减少了线程创建、开始和停止的次数,提高了效率。
在某些情况下我们还是有限使用手动线程管理:
- 如果需要前台线程或者设置优先级别。线程池中的线程总是后台线程,且它的优先级是默认的ThreadPriority.Normal
- 如果需要有一个带固定标识的线程便于退出、挂起或者通过名字发现它。
(以上内容大多是抄书,还没有在项目中有深刻的体会,以后再补充)
2.2 Task
Task 的内容和后面的async、await关系比较密切,就多查了一下资料,内容很多,只能慢慢写。
大纲:
- Task和TaskFactory
- start,run,wait,waitall,wenall
- 错误捕捉
- 强制停止
2.2.1 Task初始化
创建Task有三种方式:
Task t1=new Task(myMethod);
Task t2= Task.Factory.StartNew(MyMethod);
Task t3=Task.Run(myMethod);
MSDN说处于性能考虑,推荐使用后两种方式。而且后两种方式不需要手动start()
2.2.2 start、run、wait、waitall、wenall
有几个比较重要的方法,摘抄一下msdn的定义。
- Wait():等待 Task 完成执行过程。
- WaitAll(Task[]):等待提供的所有 Task 对象完成执行过程。
- WhenAll(Task[]):创建一个任务,该任务将在数组中的所有 Task 对象都完成时完成。
*ContinueWith(Action):创建一个在目标 Task 完成时异步执行的延续任务。
2.2.3 异常处理
在异步编程中,主线程是无法捕获工作者线程抛出的exception,目前处理异常有两种思路,一种是用continueWith()在下一个Task处理异常,另一种是包装到主线程,由主线程处理。
下面是使用contiueWith()
static void Main(string[] args)
{
Task t1 = new Task(() =>
{
throw new Exception($"线程{Thread.CurrentThread.ManagedThreadId}发生错误");
});
t1.Start();
Task t2 = t1.ContinueWith((task) =>
{
foreach (Exception ex in task.Exception.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
});
Console.ReadKey();
}
使用await包装到主线程处理:
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new InvalidOperationException("任务并行编码中产生的未知异常");
});
t.Start();
Task ttEnd = t.ContinueWith((task) =>
{
throw task.Exception;
}, TaskContinuationOptions.OnlyOnFaulted);
try
{
tEnd.Wait();
}
catch (AggregateException err)
{
foreach (var item in err.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:
{2}{3}异常内容:{4}", item.InnerException.GetType(),
Environment.NewLine, item.InnerException.Source,
Environment.NewLine, item.InnerException.Message);
}
}
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
使用事件通知包装到主线程:
static event EventHandler<AggregateExceptionArgs> AggregateExceptionCatched;
public class AggregateExceptionArgs: EventArgs
{
public AggregateException AggregateException{ get; set; }
}
static void Main(string[] args)
{
AggregateExceptionCatched += EventHandler<AggregateExceptionArgs>(Program_AggregateExceptionCatched);
Task t = new Task(() =>
{
try
{
throw new InvalidOperationException("任务并行编码中产生的未知异常");
}
catch (Exception err)
{
AggregateExceptionArgs errArgs = new AggregateExceptionArgs()
{ AggregateException = new AggregateException(err) };
AggregateExceptionCatched(null, errArgs);
}
});
t.Start();
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
static void Program_AggregateExceptionCatched(object sender, AggregateExceptionArgs e)
{
foreach (var item in e.AggregateException.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}",
item.GetType(), Environment.NewLine, item.Source,
Environment.NewLine, item.Message);
}
}
3.1 asyncawait
大纲:
- 概念和简介
- 使用示例
- 项目实战
- 原理解析
3.1.1 概念和简介
3.1.2 使用示例
微软MSDN上的执行顺序图。
class Program
{
private static void Main(string[] args)
{
Console.WriteLine("主线程启动,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
var task = GetLengthAsync();
Console.WriteLine("回到主线程,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("线程[" + Thread.CurrentThread.ManagedThreadId + "]睡眠5s:");
Thread.Sleep(5000); //将主线程睡眠5s
var timer = new Stopwatch();
timer.Start(); //开始计算时间
Console.WriteLine("task的返回值是" + task.Result);
timer.Stop(); //结束点,另外stopwatch还有Reset方法,可以重置。
Console.WriteLine("等待了:" + timer.Elapsed.TotalSeconds + "秒"); //显示时间
Console.WriteLine("主线程结束,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
Console.ReadKey();
}
private static async Task<int> GetLengthAsync()
{
Console.WriteLine($"GetLengthAsync()开始执行,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
var str = await GetStringAsync();
Console.WriteLine($"GetLengthAsync()执行完毕,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
return str.Length;
}
private static Task<string> GetStringAsync()
{
Console.WriteLine("GetStringAsync()开始执行,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
return Task.Run(() =>
{
Console.WriteLine("异步任务开始执行,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("线程[" + Thread.CurrentThread.ManagedThreadId + "]睡眠10s:");
Thread.Sleep(10000); //将异步任务线程睡眠10s
Console.WriteLine("GetStringAsync()执行完毕,当前线程为:" + Thread.CurrentThread.ManagedThreadId);
return "GetStringAsync()执行完毕";
});
}
}
测试代码执行结果为:
- 主线程启动,当前线程为:1
- GetLengthAsync()开始执行,当前线程为:1
- GetStringAsync()开始执行,当前线程为:1
- 回到主线程,当前线程为:1
- 异步任务开始执行,当前线程为:3
- 线程[3]睡眠10s:
- 线程[1]睡眠5s:
- GetStringAsync()执行完毕,当前线程为:3
- GetLengthAsync()执行完毕,当前线程为:3
- task的返回值是20
- 等待了:4.9940226秒
- 主线程结束,当前线程为:1
下面是个人的理解,如果用await 标记等待异步方法,那么这里就是异步阻塞的,如果用Task对象去接受异步方法返回的Task,就是异步非阻塞的,而且真正的异步代码,基本上是在Task.Run()内部才开始真正在其他线程上运行。
3.1.3 项目实战
项目中我要实现的是一个多线程爬虫,尤其是在每个站点的列表页爬取完成后,需要爬取20-30个详情页,目前只做了详情页的多线程。
3.1.4 原理
执行的原理其实是Task+状态机。这里就不详细研究了,贴上一下学习时用到的链接。