zoukankan      html  css  js  c++  java
  • c#:ThreadPool实现并行分析,并实现线程同步结束

    • 背景:

    一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?

    据我个人经验来说有以下两种方式:

    1、并行、多线程(Parallel、Task、ThreadPool)

    2、多进程MultipleProcess

    恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。

    • ThreadPool线程同步结束示例一:

    一个ManualResetEvent结合Interlocked来实现线程同步结束。

     1  static void Main(string[] args)
     2         {
     3             using (ManualResetEvent finish = new ManualResetEvent(false))
     4             {
     5                 int maxThreadCount = 100;
     6                 for (var i = 0; i < 100; i++) {
     7                     ThreadPool.QueueUserWorkItem((Object state)=> {
     8                         Console.WriteLine("task:{0}",state);
     9 
    10                         // 以原子操作的形式递减指定变量的值并存储结果。
    11                         if (Interlocked.Decrement(ref maxThreadCount) == 0) {
    12                             // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
    13                             finish.Set();
    14                         }                        
    15                     }, i);
    16                 }
    17 
    18                 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。
    19                 finish.WaitOne();
    20             }
    21 
    22             Console.WriteLine("Complete!");
    23             Console.ReadKey();

    上边的代码是可行性,当系统线程数超过系统允许最大数时,线程会被在线程池中排队等待。

    • ThreadPool线程同步结束示例二: 

    ManualResetEvent集合(每一个线程由集合中的唯一一个ManualResetEvent对象来实现线程的同步跟踪)结合WaitHandle.WaitAll(ManualResetEvent集合)来实现线程同步结束。

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace ThreadPoolTest
    {
        class MyTask
        {
            private ManualResetEvent finish = null;
            public MyTask(ManualResetEvent finish)
            {
                this.finish = finish;
            }
    
            public void MyTaskThreadPoolCallback(Object state)
            {
                Console.WriteLine("task:{0}", state);
    
                // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
                this.finish.Set();
            }
        }
    
    
        class Program
        {
            static void Main(string[] args)
            {
                const int maxThreadCount = 64;
                ManualResetEvent[] finishItems = new ManualResetEvent[maxThreadCount];
                MyTask[] myTaskItems = new MyTask[maxThreadCount]
                    ;
                for (var i = 0; i < maxThreadCount; i++)
                {
                    finishItems[i] = new ManualResetEvent(false);
    
                    MyTask myTask = new MyTask(finishItems[i]);
                    myTaskItems[i] = myTask;
    
                    ThreadPool.QueueUserWorkItem(myTask.MyTaskThreadPoolCallback, i);
                }
    
                // 等待指定数组中的所有元素都收到信号。
                WaitHandle.WaitAll(finishItems);
    
                Console.WriteLine("Complete!");
                Console.ReadKey();
            }
    
    
        }
    }

    尽管这种想法不错,但是存在一些问题:比如ManualResetEvent集合数量不允许超过系统允许的最大数量,我的计算机系统允许的最大数量是64,当我把配置超过64时(const int maxThreadCount = 65;),就会抛出异常。

    • 实现多线程时,需要注意事项:

    可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。

    比如:

    1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、Interlocked(内存共享)等。

    2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。

    • 业务场景:

    我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。

    • 业务实现:
      1         private static int maxThreadCount = 0;
      2         private static int fakeMaxThreadCount = int.MaxValue;
      3         private static ManualResetEvent finish = new ManualResetEvent(false);
      4         private static object errorLocker = new object();
      5         private static object resultLocker = new object();
      6         private static object maxThreadCountLcker = new object();
      7 
      8         public void ParserFile(string filePath)
      9         {
     10             using (StreamWriter writerError = new StreamWriter(filePath + "_error"))
     11             {
     12                 using (StreamWriter writerResult = new StreamWriter(filePath + "_result"))
     13                 {
     14                     finish = new ManualResetEvent(false);
     15                     using (StreamReader reader = new StreamReader(filePath))
     16                     {
     17                         string line = reader.ReadLine();
     18                         while (line != null)
     19                         {
     20                             maxThreadCount++;
     21                             ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult
     22 });
     23 
     24                             line = reader.ReadLine();
     25                         }
     26                     }
     27 
     28                     maxThreadCount++;
     29                     lock (maxThreadCountLcker)
     30                     {
     31                         fakeMaxThreadCount = maxThreadCount;
     32                     }
     33 
     34                     ThreadPool.QueueUserWorkItem(DoWork, new object[] { });
     35 
     36                     finish.WaitOne();
     37                     finish.Close();
     38                     finish.Dispose();
     39                 }
     40             }
     41         }
     42 
     43 
     44 
     45         private void DoWork(object state)
     46         {
     47             object[] objectItem = state as object[];
     48             if (objectItem.Length != 3)
     49             {
     50                 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
     51                 {
     52                     finish.Set();
     53                 }
     54                 return;
     55             }
     56             string line = objectItem[0].ToString();
     57             StreamWriter writerError = objectItem[1] as StreamWriter;
     58             StreamWriter writerResult = objectItem[2] as StreamWriter;
     59 
     60             try
     61             {
     62                 string[] fields = line.Split(new char[] { '|' });
     63 
     64                 string imsi = fields[0];
     65                 string city = fields[1];
     66                 string county = fields[2];
     67                 string address = fields[3];
     68 
     69                 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号
     70                 string uri = " http://restapi.amap.com/v3/geocode/geo";
     71                 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address);
     72 
     73                 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]}
     74                 string result = GetRequesetContext(uri, parameter);
     75                 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1)
     76                 {
     77                     lock (errorLocker)
     78                     {
     79                         writerError.WriteLine(result);
     80                     }
     81                 }
     82                 else
     83                 {
     84                     int indexCount = 0;
     85                     List<string> lnglatItems = new List<string>();
     86                     foreach (string resultItem in result.Split(new string[] { "","", ","" }, StringSplitOptions.RemoveEmptyEntries))
     87                     {
     88                         if (resultItem.IndexOf("location") != -1)
     89                         {
     90                             indexCount++;
     91                             lnglatItems.Add(resultItem.Split(new char[] { ':' })[1].Replace(""", string.Empty));
     92                         }
     93                     }
     94                     if (indexCount == 1)
     95                     {
     96                         lock (resultLocker)
     97                         {
     98                             writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi);
     99                         }
    100                     }
    101                     else
    102                     {
    103                         lock (resultLocker)
    104                         {                            
    105                             writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);
    106                         }
    107                     }
    108                 }
    109             }
    110             catch (Exception ex)
    111             {
    112                 logger.Error("{0}
    {1}", ex.Message, ex.StackTrace);
    113                 lock (errorLocker)
    114                 {
    115                     writerError.WriteLine(line);
    116                 }
    117             }
    118             finally
    119             {
    120                 lock (maxThreadCountLcker)
    121                 {
    122                     if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
    123                     {
    124                         finish.Set();
    125                     }
    126                 }
    127             }
    128         }

     备注:

    关于ThreadPool线程池内最大线程控制函数:SetMaxThreads 设置可以同时处于活动状态的线程池的请求数目。 所有大于此数目的请求将保持排队状态,直到线程池线程变为可用。

    [SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
    public static bool SetMaxThreads(
        int workerThreads,
        int completionPortThreads
    )

    workerThreads:线程池中辅助线程的最大数目。

    completionPortThreads:线程池中异步 I/O 线程的最大数目。

    但是,需要注意事项:

    不能将辅助线程的数目或 I/O 完成线程的数目设置为小于计算机的处理器数目。

    如果承载了公共语言运行时,例如由 Internet 信息服务 (IIS) 或 SQL Server 承载,主机可能会限制或禁止更改线程池大小。

    更改线程池中的最大线程数时需谨慎。 虽然这类更改可能对您的代码有益,但对您使用的代码库可能会有不利的影响。

    将线程池大小设置得太大可能导致性能问题。 如果同时执行的线程太多,任务切换开销就成为影响性能的一个主要因素。

  • 相关阅读:
    6-Python爬虫-分布式爬虫/Redis
    ES 查询时 排序报错(fielddata is disabled on text fileds by default ... )解决方法
    Intellij Idea webstorm 激活
    Intellij Idea 配置jdk
    java 获取(格式化)日期格式
    js 跳转 XSS漏洞 预防
    CSS去掉背景颜色
    js对象无法当成参数传递 解决方法
    Elasticsearch java api
    java多条件查询SQL语句拼接的小技巧
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/6503123.html
Copyright © 2011-2022 走看看