zoukankan      html  css  js  c++  java
  • 初探.net framework 下的异步多线程

    初探.net framework 下的异步多线程

    目录

    1、多线程的出现条件

    2、Thread和ThreadPool的相关Api及用法

    3、Task和Parallel的相关Api及用法

    4、Async&&Await

    多线程的出现条件

    • 用户在执行一个操作的时候,可以同时的执行一些其他操作。(例如在写入一个文件的时候,可以同时推送一条信息;还有一种情况,就是例如在编写Winform代码时候,提交一个比较费时的操作,这时候会造成UI界面假死,此时就可以把这个费事的操作交给一个子线程来完成,亦或者方法的异步调用。)
    • 我们的CPU是高速的,分时间片执行的,操作系统将其封装为一个个的线程,多个线程运行于一个进程之中,在我们的.net framework框架,将操作系统级别的线程 做了再次封装,就是我们所了解到的Thread类。
    • 这里我们再谈下多线程异步的区别。在我们编写C#代码的时候,异步方法是这么调用的。
      在启用BeginInvoke方法时候,action异步调用。此时可以看到,回掉函数由一个Id为3的线程来执行的。在.net framework中,异步就是由主线程开启一个子线程来完成回调任务。
                Console.WriteLine("***************委托的异步调用***************");
    Console.WriteLine($"this is main_{Thread.CurrentThread.ManagedThreadId}");
    Action<string> act = t => { Console.WriteLine(t); }; //声明一个委托,接受一个string类型的参数。这里打印参数"ssss"
    IAsyncResult result = null;
    AsyncCallback asyncCallback = t => //声明一个AsyncCallback 也就是BeginInvoke的第二个参数,代表一个异步回掉函数
    {
    Console.WriteLine($"{string.ReferenceEquals(t, result)}"); //AsyncCallback 是一个委托,IAsyncResult参数就是BeginInvoke的返回值
    Console.WriteLine($"this is asynccallback{Thread.CurrentThread.ManagedThreadId}");
    };
    result = act.BeginInvoke("ssss", asyncCallback, null);
    act.EndInvoke(result);
    //result.AsyncWaitHandle.WaitOne();
    Console.Read();

    Thread和ThreadPool的相关Api及用法

    • Thread 是.net framework 1.X版本的类(没记错的话),Thread类接受一个ThreadStart的委托,这个委托没有参数,没有返回值。我们这里定义一个耗时的测试方法(下文中都用这个方法代表一些费事的逻辑操作。)写一个测试方法,创建五个线程分别执行这个方法。
            private static void DoSomethingLong(string name)
    {
    Console.WriteLine($"*************DoSomethingLong Start {Thread.CurrentThread.ManagedThreadId}*************");
    long Result = 0;
    for(int i = 0; i < 1000000000; i++)
    {
    Result += i;
    }
    Console.WriteLine("*************DoSomethingLong Start {Thread.CurrentThread.ManagedThreadId}*************");
    }
    private void ThreadTest()
    {
    for(int i = 0; i < 5; i++)
    {
    Thread thread = new Thread(() => { DoSomethingLong(""); });
    thread.Start();
    }
    }
    • 使用Thread来编写一个带有回调的方法
            private void ThreadCallBackTest()                      //这执行下
    {
    Thread thread = new Thread(
    () => ThreadCallBack(() => { Console.WriteLine("这是Thread接收的threadStart"); },
    () => Console.WriteLine("这是回调函数")));
    thread.Start();
    }
    private void ThreadCallBack(ThreadStart threadStart,Action act) //这里包装一层,Thread的回调方法放入Action参数中
    {
    ThreadStart start = new ThreadStart(() => //ThreadStart 本身是一个无参数无返回值的委托, 将ThreadStart和Action 都执行下
    {
    threadStart.Invoke(); //这里相当于把原来的ThreadStart(()=>{} )包了一层,里面又一个ThreadStart(()=>{} ),需要体会下
    act.Invoke();
    });
    Thread thread = new Thread(start); //启用一个线程执行。
    thread.Start();
    }
    • 使用Thread写一个带返回值的委托。
           private void ThreadReturnParTest()
    {
    Thread thread = new Thread(() =>
    {
    var e = ThreadReturnPar<string>(() => { return "ssss"; }); //定义ThreadReturnPar的参数,返回一个ssss
    var s = e.Invoke(); //这里等待,thread.Join()
    Console.WriteLine(s);
    });
    thread.Start();
    }
    private Func<T> ThreadReturnPar<T>(Func<T> func) //核心方法,这里接受一个Func<T>
    {
    T t = default(T); //声明一个T
    ThreadStart start = new ThreadStart(() => //启用ThreadStart,给这里的t附上返回值。这里还没有执行,回调的时候才会执行。
    {
    t = func.Invoke();
    });
    Thread thread = new Thread(start); //开启一个新的线程,执行这个ThreadStart
    thread.Start();
    return () =>
    {
    while (thread.ThreadState != System.Threading.ThreadState.Stopped) //判断执行执行这个ThreadStart的委托是否已经执行完,
    { //返回一个委托,哪里需要接受这个返回值,哪里等待thread.Join();
    thread.Join();
    }
    return t;
    };
    }
    • ThreadPool是.net framework 2.X版本的类(没记错的话),线程池线程做一个池化的管理(对应设计模式为享元模式),使用ThreadPool时候不再由.netframework 框架从操作系统层面创建一个新的线程,而是由ThreadPool统一管理,我们向ThreadPool申请一个线程,使用完了以后把这个线程资源归还给ThreadPool
    • ThreadPool的QueueUserWorkItem方法接收一个WaitCallback委托,这个委托的参数就是QueueUserWorkItem的第二个参数。这里还要介绍一个对象,ManualResetEvent对象,可以把这个对象理解为一把锁,这个锁有个初始状态,ManualResetEvent对象的Reset方法阻塞线程,Set方法使阻塞线程继续运行。
            private void ThreadPoolTest()
    {
    Console.WriteLine($"ThreadPoolTest start {DateTime.Now.Millisecond}"); /*第一句打印*/ /*add ManualResetEvent 第一句打印*/
    ManualResetEvent mre = new ManualResetEvent(true); //实例化一个初始状态、理解为一把锁
    mre.Reset();
    ThreadPool.QueueUserWorkItem(
    new WaitCallback(
    t => {
    Console.WriteLine($"this is t {t}"); /*第三句打印*/ /*add ManualResetEvent 第二句打印*/
    Thread.Sleep(1000);
    Console.WriteLine("this is threadpool queue waitcallback"); /*第四句打印*/ /*add ManualResetEvent 第三句打印*/
    mre.Set(); //ManualResetEvent关闭
    }), "lmc");
    mre.WaitOne(); //等待ManualResetEvent打开才执行后面的代码
    Console.WriteLine($"ThreadPoolTest End {DateTime.Now.Millisecond}"); /*第二句打印*/ /*add ManualResetEvent 第四句打印*/
    }

    Task和Parallel的相关Api及用法

    • Task类基于.net framework3.5, Task基于线程池,Task创建方式由两种,Task 可以基于工厂创建;也可以用new Task创建 ;相关Api WaitAll 等待所有Task全部完成了再执行; WaitAny 等待一个Task执行完了之后再执行后面的; (以上两个API会卡主线程); Task.Factory.ContinueWhenAll 以回调形式等待所有任务完成后执行一个委托。Task.Factory.ContinueWhenAny以回调形式等待任意一个任务完成后执行一个委托。四个Task的等待执行的Api需要记好。
            private void TaskTest()
    {
    List<Task> tasklist = new List<Task>();
    Console.WriteLine($"TaskTest start_{Thread.CurrentThread.ManagedThreadId} {DateTime.Now.Millisecond}");
    for (int i = 0; i < 5; i++)
    {
    string name = $"lmc_{i}";
    Task task = Task.Factory.StartNew(() =>
    {
    Thread.Sleep(1000);
    Console.WriteLine($"{name}_{Thread.CurrentThread.ManagedThreadId} " );
    });
    tasklist.Add(task);
    }
    Console.WriteLine("before waitall");
    Task.WaitAll(tasklist.ToArray());//线程等待全部
    Console.WriteLine("after waitall");
    Console.WriteLine("before waitany");
    Task.WaitAny(tasklist.ToArray());//线程等待某一个
    Console.WriteLine("after waitany");
    Task.Factory.ContinueWhenAll(tasklist.ToArray(),
    tlist => { Console.WriteLine($"ContinueWhenAll_{tlist.Count()}_{Thread.CurrentThread.ManagedThreadId}"); //回调形式等待所有任务完成后打印一句ContinueWhenAll_
    Console.WriteLine($"ContinueWhenAll_{Thread.CurrentThread.ManagedThreadId}"); });
    Task.Factory.ContinueWhenAny(tasklist.ToArray(),
    t => { Console.WriteLine($"ContinueWhenAny_{t.Id}_{Thread.CurrentThread.ManagedThreadId}");
    Console.WriteLine($"ContinueWhenAny_{Thread.CurrentThread.ManagedThreadId}"); }); //回调形式等待任意一个任务完成后打印一句ContinueWhenAny
    Console.WriteLine($"TaskTest End_{Thread.CurrentThread.ManagedThreadId} {DateTime.Now.Millisecond}");
    }
    • Parallel并行任务,主线程CPU参与计算,Invoke方法接受一个委托的数组。Parallel的另外两个常用的方法是ForForeach,两个方法类似,这里拿For来举个例子。(仔细观察下就像是Task.WaitAll)
            private void ParallelTest()
    {
    Console.WriteLine($"ParallelTest start_{Thread.CurrentThread.ManagedThreadId} {DateTime.Now.Millisecond}");
    //Parallel
    Parallel.Invoke(
    () => { Console.WriteLine($"ParallelTest_1_{Thread.CurrentThread.ManagedThreadId}"); },
    () => { Console.WriteLine($"ParallelTest_2_{Thread.CurrentThread.ManagedThreadId}"); },
    () => { Console.WriteLine($"ParallelTest_3_{Thread.CurrentThread.ManagedThreadId}"); },
    () => { Console.WriteLine($"ParallelTest_4_{Thread.CurrentThread.ManagedThreadId}"); }
    );
    ParallelOptions parallelOptions = new ParallelOptions(); //创建一个parallelOptions对象
    parallelOptions.MaxDegreeOfParallelism = 3; //设置最大并发任务数量
    Parallel.For(1, 10, parallelOptions,(t,state) => //ParallelLoopState参数 使并行循环迭代与其他迭代交互退出当前线程。
    {
    Thread.Sleep(1000);
    Console.WriteLine($"ParallelTest_{t}_{Thread.CurrentThread.ManagedThreadId}_{DateTime.Now.Millisecond}");
    state.Stop();
    return;
    });
    Console.WriteLine($"ParallelTest end_{Thread.CurrentThread.ManagedThreadId} {DateTime.Now.Millisecond}");
    }

    Async&&Await

    • 在.netframework 4.5 出来以后,我们经常能看到,async和await两个关键字,代表方法的异步执行。使用await关键字必须要在async关键字修饰的方法下。主线程遇到await关键字就立即返回,把剩下的任务交由一个子线程来回调完成。这个过程就像是Task.Factory.ContinueWhenAny,参数就是await 后面的代码组成的一个委托。
            static void Main(string[] args)
    {
    Program prm = new Program();
    Console.WriteLine("1"); //第一句执行
    prm.AsyncTest(); //进入异步方法
    Console.WriteLine("2"); //第三句执行
    }
    private async Task AsyncTest()
    {
    Console.WriteLine($"this is async task Main Start_{Thread.CurrentThread.ManagedThreadId}_{DateTime.Now.Millisecond}"); //第二句执行
    await Task.Factory.StartNew(() => //这里由于遇到了await关键字,所以主线程返回
    {
    Thread.Sleep(3000);
    Console.WriteLine($"this is async task Son Start__{Thread.CurrentThread.ManagedThreadId}_{DateTime.Now.Millisecond}"); //第四句执行
    Console.WriteLine($"this is async task Son End_{Thread.CurrentThread.ManagedThreadId}_{DateTime.Now.Millisecond}"); //第五句执行
    });
    //遇到await关键字主线程返回,剩下的任务由子线程回调完成。
    Console.WriteLine($"this is async task Main End_{Thread.CurrentThread.ManagedThreadId}_{DateTime.Now.Millisecond}"); //第六句执行
    }
    • 最后让我们来看下处理多线程的异常。多线程运行中,子线程执行的任务抛出的异常不会主动影响主线程是其停止,最好的方法就是使用CancellationTokenSource对象。一个线程执行时候,首先判断下CancellationTokenSource对象的标识、假如异常,取消执行。一个线程的终止或异常,由它自身来完成。
            private void TaskInteractive()
    {
    try
    {
    CancellationTokenSource cts = new CancellationTokenSource();
    TaskFactory taskFactory = new TaskFactory();
    List<Task> tasklist = new List<Task>();
    for (int i = 0; i < 40; i++)
    {
    string name = $"this is {i}";
    Action<object> act = t =>
    {
    try
    {
    Thread.Sleep(500);
    if (!cts.IsCancellationRequested) //判断标识是否取消
    {
    if (t.ToString().Equals("this is 11")) //当执行到第11个任务时候,修改标识cts.token,取消后面的任务执行。
    {
    cts.Cancel();
    throw new Exception($"{t}执行失败");
    }
    if (t.ToString().Equals("this is 12"))
    {
    cts.Cancel();
    throw new Exception($"{t}执行失败");
    }
    Console.WriteLine($"{t}执行成功");
    }
    else
    {
    Console.WriteLine($"{t}********执行放弃********");
    cts.Token.ThrowIfCancellationRequested(); //抛出一个AggregateException异常
    }
    }
    catch (Exception ex)
    {
    Console.WriteLine($"this is logging {ex.Message}");
    }
    };
    tasklist.Add(taskFactory.StartNew(act, name,cts.Token));
    }
    Task.WaitAll(tasklist.ToArray());
    }
    catch (AggregateException aex)
    {
    foreach (var ex in aex.InnerExceptions)
    {
    Console.WriteLine(ex.Message);
    }
    }
    }
  • 相关阅读:
    数据系统与分布式(二) 分布式数据系统(复制与分片)
    数据系统和分布式(一)数据系统基础
    可执行文件(ELF)的装载与进程
    HTTPS协议
    后台开发 缓存, 数据库, 高并发等等
    Golang中new和make的区别
    吴恩达:机器学习里面的作业1遇到的
    笔记——操作系统导论:环境配置
    笔记——操作系统导论:第二章
    Games 101 作业1代码解析
  • 原文地址:https://www.cnblogs.com/liumengchen-boke/p/8552611.html
Copyright © 2011-2022 走看看