zoukankan      html  css  js  c++  java
  • 多线程之线程池任务管理通用模板

         已经一年多的时间没有使用多线程了.最近需要做一个异步发邮件的功能,为了给同事写一个例子,想起以前写的爬虫的时候,

     一个带我的人给了我一个通用的模板类, 我觉得那个例子很好,于是我根据自己的理解,写出了如下的代码.

    模型描述

    首先是一个通用的 计数器.包括 当前在运行的线程数Current,执行成功的数量Success,执行失败的数量Error,以及总完成数量Finish.

    为了方便大家理解,我还是做一下简要的说明.大致过程是这样的: 首先有一批任务进来,需要把这些任务用多线程来处理.(我这里使用的是线程池.)

    关于池的概念大家应该埠陌生.比如 数据库的连接池.

       任务一个一个的分配,每个任务都是开启一个子线程单独执行的,一边分配任务,一边执行任务.在不停的分配任务的时候,会出现这样一种情况,

    就是任务已经分配出去很多了,但是执行的不够快,这个时候需要主线程等待一下,过一会再分配新的任务.

       我将上面的描述用一个日常生活中的例子来做说明: 一个小饭店只有10个座位,10点开始营业,到了12点吃饭的时间,外面来了100个客人,

    挨个挨个的进入饭店就餐.

       第一个人进,占位置1个,第二个人进入, 占位置1个, 第三个人进入,占位置1个,

       第四个人进入占位置1个,第五个人进入占位置1个,  然后第一个人打包的走了,有个位置空出来了,

       现在已经使用了 4个位置,至于进来以后坐什么地方,这个不用管.

       第六个人进入占了位置1个,...第九个人进来了,占位置1个,第2个人吃完了,第十个人进来了,第是一个人进来了,第十二个人进来了,

       第三个人吃完了,第十三个人进来了,

       第十四个人想进来,抱歉,已经坐满了,要等待,过1分钟再看有没有位置,如果没有人离开,外面的人一直不能进来,

       过了五分钟, 再看发现一个桌子四个人都离开了,于是第十四个人进来了,第十五个人进来了....后面的类似前面的...

       废话啰嗦完了,上代码

    先上一个计数器读写的类

      public sealed class ThreadCounter
        {
            private int current = 0;
            private int error = 0;
            private int success = 0;
            private int finish = 0;
    
            private static object writeObj = new object();
    
            public int getCurrent()
            {
                lock (writeObj) return current;
            }
    
            public int getError()
            {
                lock (writeObj) return error;
            }
    
            public int getSuccess()
            {
                lock (writeObj) return success;
            }
    
            public int getFinish()
            {
                lock (writeObj) return finish;
            }
    
            public void setAddCurrent()
            {
                lock (writeObj) current++;
            }
    
            public void setSuccess()
            {
                lock (writeObj)
                {
                    success++;
                    current--;
                }
            }
    
            public void setError()
            {
                lock (writeObj)
                {
                    error++;
                    current--;
                }
            }
    
            public void setFinish()
            {
                lock (writeObj) finish++;
            }
        }

    主体运行业务处理类

    新建类 RunManager,定义一个私有字段,一个随机数生成类,一个线程计数类

    1   public class RunManager
    2     {
    3         private static int MaxThreadCount = 10;
    4 
    5         ThreadCounter tc = new ThreadCounter();
    6 
    7         Random random = new Random();
    8     }

    新增模拟任务的初始化方法

    /// <summary>
    /// 最大开启的线程数
    /// </summary>
     /// <param name="threadCount"></param>
    public void RunStart(int threadMaxCount)
    {
         //最大开启的线程数
         Console.WriteLine("设置最大线程数:" + threadMaxCount);
    
          MaxThreadCount = threadMaxCount;
    
          RunTask(100);
    }

    线程池分配任务

     1 public void RunTask(int taskCount)
     2 {
     3     string mss = "";
     4     string[] arr = new string[] {"|","/","-","\" };
     5     int j = 0;
     6     while (true)
     7     {
     8         if (++j==4)
     9             j = 0;
    10         Console.Clear();
    11         Console.SetCursorPosition(1, 0);
    12         Console.WriteLine("当前运行情况");
    13         Console.Write("["+arr[j]+"]	");
    14             mss = string.Format("Query:{0} , Current:{1} , Success:{2} , Error:{3} , Finish:{4}", 
    15                 taskCount, tc.getCurrent(),tc.getSuccess(), tc.getError(), tc.getFinish());
    16         Console.WriteLine(mss);
    17         Thread.Sleep(500);
    18 
    19         if (taskCount>0&&tc.getCurrent() < MaxThreadCount)
    20         {
    21             taskCount--;
    22             tc.setAddCurrent();
    23             ThreadPool.UnsafeQueueUserWorkItem(RuanCallBack, 0);
    24         }
    25         else if (taskCount==0)
    26             taskCount = random.Next(60, 125);
    27         else if (tc.getCurrent() >= MaxThreadCount)
    28             Thread.Sleep(6000);
    29     }
    30}

    模拟线程运行所消耗的时间

     1 public void RuanCallBack(object parms)
     2 {
     3     // 将传过来的参数, 整理为要查询的数据, 以及处理数据,写入文件,打包zip,
     4     // 发送邮件通知用户下载
     5     try
     6     {
     7         int t = random.Next(3, 7);
     8         Thread.Sleep(t * 1000);
     9         tc.setSuccess();
    10     }
    11     catch (Exception)
    12     {
    13         tc.setError();
    14     }
    15     finally
    16     {
    17         tc.setFinish();
    18     }
    19     //Console.WriteLine("当前线程序号:" + parms.ToString() + "	线程停顿时间:" + t);
    20 }

    调用

    static void Main(string[] args)
     {
         RunManager r = new RunManager();
         r.RunStart(20);
         Console.Read();
         Console.WriteLine("运行结束");
     }

    看看运行效果

     Nice!

    原代码下载: 下载

  • 相关阅读:
    Caused by: java.lang.ClassNotFoundException: org.apache.commons.fileupload.RequestContext
    Caused by: java.lang.ClassNotFoundException: org.apache.commons.lang3.StringUtils
    Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.FileUtils
    Caused by: java.lang.ClassNotFoundException: javassist.ClassPool
    Caused by: java.lang.ClassNotFoundException: ognl.PropertyAccessor
    利用DBLINK同步表数据库--老刘
    100万并发连接服务器笔记之1M并发连接目标达成
    模拟row cache lock
    redis读写性能测试
    Adobe RIA 开发工程师认证考试大纲
  • 原文地址:https://www.cnblogs.com/mjxxsc/p/4563048.html
Copyright © 2011-2022 走看看