zoukankan      html  css  js  c++  java
  • 任务时间较长,且单个任务执行时间不一致的情况下多线程分配

    对于开发人员来说遇到多线程问题是十分常见的,一般处理方式是简单粗暴直接开N的线程然后把任务分为N组。这种处理方式有一个明显的弊端就是每个分组执行时间不一定一致,这样会导致线程空闲的状态。

    下文中很多地方提到线程不是指技术上的而是逻辑上的。

    第一步:

    在构造器中初始化信息:

    _reqsBusy:忙碌信息,_workingSignals:工作线程信息,_reqCount:线程数,list:待执行列表,cList:执行完成列表

     public class ExtensionTaskFactory<T>
    {
            private bool[] _reqsBusy = null;
            private bool[] _workingSignals = null;
            private int _reqCount = 4;
            private List<T> list;
            private List<T> cList = new List<T>();
            private Timer _checkTimer;
            public bool IsFinish;
            private DateTime _createTime;
            public ExtensionTaskFactory(List<T> l)
            {
                list = l;
                _reqsBusy = new bool[_reqCount];
                _workingSignals = new bool[_reqCount];
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    _workingSignals[i] = true;
                }
                _createTime = DateTime.Now;
            }
            public ExtensionTaskFactory(List<T> l, int count)
            {
                list = l;
                _reqCount = count;
                _reqsBusy = new bool[_reqCount];
                _workingSignals = new bool[_reqCount];
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    _workingSignals[i] = true;
                }
                _createTime = DateTime.Now;
            }
    }    

     第二步:

    开始任务(Start())-> 队空闲线程派发任务(DispatchWork()) -> 任务派发后修改当前线程状态,并且执行相关任务逻辑 (Process(int i)) -> 任务执行完成后再次请求派发任务

           private async void Process(int i)
            {
                _reqsBusy[i] = true;
                #region Adjust queue
                if (list.Count() > 0)
                {
                    Console.WriteLine("Remaining {0} data need to be executed.", list.Count());
                    T model = list.FirstOrDefault();
                    list.Remove(model);
                    cList.Add(model);
                    await Dotaskfunction(model);
                }
                else
                {
                    _workingSignals[i] = false;
                    _reqsBusy[i] = false;
                    return;
                }
                #endregion
                _reqsBusy[i] = false;
                DispatchWork();
            }
            private void DispatchWork()
            {
                for (int i = 0; i < _reqCount; i++)
                {
                    if (!_reqsBusy[i])
                    {
                        Process(i);
                    }
    
                }
            }
    
            private async Task Dotaskfunction(T model)
            {
                await Task.Run(() =>
                {
                    //Do someThing
                });
            }
    
            public void Start()
            {
                DispatchWork();
            }

    第三步:将任务逻辑以事件形式注入到 Dotaskfunction 中

    1. 定义事件
      public event TaskEventHandler<T> TaskEvent;
    2. 修改 Dotaskfunction
      private async Task Dotaskfunction(T model)
              {
                  await Task.Run(() =>
                  {
                      //Console.WriteLine("task {0}  has been started!", s);
                      //Do someThing
                      if (TaskEvent != null)
                      {
                          TaskEvent(model, _createTime);
                      }
                      //Thread.Sleep(3000);
                      //Console.WriteLine("task {0}  has been done!", s);
                  });
              }

    第四步:

    监测任务完成情况

    1. 监测任务线程是否都已经闲置
      private void CheckFinish(object param)
              {
                  if (IsFinished())
                  {
                      _checkTimer.Dispose(); //Stop Timer
                      _checkTimer = null;
                      IsFinish = true;
                      Console.WriteLine("Async Task End !");
                      Console.WriteLine("-------------------" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "-------------------");
                  }
              }
              private bool IsFinished()
              {
                  bool result = false;
                  for (int i = 0; i < _workingSignals.Count(); i++)
                  {
                      if (_workingSignals[i])
                      {
                          break;
                      }
                      if (i == _workingSignals.Count() - 1)
                      {
                          result = true;
                      }
                  }
                  return result;
              }
    2. 修改 Start() 方法,注册Timer定时检测任务是否完成
      public void Start()
              {
                  _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, 3000);
                  DispatchWork();
              }

    第五步:辅助功能

    1. 添加任务
      public void AddList(List<T> addlist)
              {
                  list.AddRange(addlist);
              }
    2. 重新启动任务
      public void Restart()
              {
                  if (_checkTimer == null)
                  {
                      _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, 3000);
                  }
                  for (int i = 0; i < _workingSignals.Count(); i++)
                  {
                      _workingSignals[i] = true;
                  }
                  DispatchWork();
              }
    3. 获取已完成任务
      public List<T> GetResult()
              {
                  return cList;
              }

    第六步:调用示例

    1. 调用
      List<Test> tasks;       
      ExtensionTaskFactory<Test> task = new ExtensionTaskFactory<Test>(tasks, 4); task.TaskEvent += Task_RunQueryEvent; task.Start();
    2. 注册任务处理逻辑事件
      private void Task_RunQueryEvent(RunQueryLog log, DateTime time)
      {
          //Do something
      }

    最后附上源码

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace NewsletterAlert.Core.Common
    {
        public delegate void TaskEventHandler<T>(T model, DateTime time);
        public class ExtensionTaskFactory<T>
        {
            private bool[] _reqsBusy = null;
            private bool[] _workingSignals = null;
            private int _reqCount = 4;
            private List<T> list;
            private List<T> cList = new List<T>();
            private Timer _checkTimer;
            public bool IsFinish;
            private DateTime _createTime;
    
            public event TaskEventHandler<T> TaskEvent;
    
            public ExtensionTaskFactory(List<T> l)
            {
                list = l;
                _reqsBusy = new bool[_reqCount];
                _workingSignals = new bool[_reqCount];
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    _workingSignals[i] = true;
                }
                _createTime = DateTime.Now;
            }
            public ExtensionTaskFactory(List<T> l, int count)
            {
                list = l;
                _reqCount = count;
                _reqsBusy = new bool[_reqCount];
                _workingSignals = new bool[_reqCount];
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    _workingSignals[i] = true;
                }
                _createTime = DateTime.Now;
            }
    
            private async void Process(int i)
            {
                _reqsBusy[i] = true;
                #region Adjust queue
                if (list.Count() > 0)
                {
                    Console.WriteLine("Remaining {0} data need to be executed.", list.Count());
                    T model = list.FirstOrDefault();
                    list.Remove(model);
                    cList.Add(model);
                    await Dotaskfunction(model);
                }
                else
                {
                    _workingSignals[i] = false;
                    _reqsBusy[i] = false;
                    return;
                }
                #endregion
                _reqsBusy[i] = false;
                DispatchWork();
            }
            private void DispatchWork()
            {
                for (int i = 0; i < _reqCount; i++)
                {
                    if (!_reqsBusy[i])
                    {
                        Process(i);
                    }
    
                }
            }
    
            private async Task Dotaskfunction(T model)
            {
                await Task.Run(() =>
                {
                    //Console.WriteLine("task {0}  has been started!", s);
                    //Do someThing
                    if (TaskEvent != null)
                    {
                        TaskEvent(model, _createTime);
                    }
                    //Thread.Sleep(3000);
                    //Console.WriteLine("task {0}  has been done!", s);
                });
            }
    
            public void Start()
            {
                _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, 3000);
                DispatchWork();
            }
    
            private void CheckFinish(object param)
            {
                if (IsFinished())
                {
                    _checkTimer.Dispose(); //Stop Timer
                    _checkTimer = null;
                    IsFinish = true;
                    Console.WriteLine("Async Task End !");
                    Console.WriteLine("-------------------" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "-------------------");
                }
            }
            private bool IsFinished()
            {
                bool result = false;
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    if (_workingSignals[i])
                    {
                        break;
                    }
                    if (i == _workingSignals.Count() - 1)
                    {
                        result = true;
                    }
                }
                return result;
            }
    
            public void AddList(List<T> addlist)
            {
                list.AddRange(addlist);
            }
            public void Restart()
            {
                if (_checkTimer == null)
                {
                    _checkTimer = new Timer(new TimerCallback(CheckFinish), null, 0, 3000);
                }
                for (int i = 0; i < _workingSignals.Count(); i++)
                {
                    _workingSignals[i] = true;
                }
                DispatchWork();
            }
    
            public List<T> GetResult()
            {
                return cList;
            }
        }
    }
  • 相关阅读:
    2018-08-25多线程Thread类+Runnable接口+线程的6种状态
    2018-08-24Properties类+序列化+反序列化+FileUtils+FilenameUtils
    2018-08-22字节字符转换流InputStreamReader+OutputStreamWriter+缓冲流Buffered+newLine换行方法
    2018-08-21文件字节输出流OutputStream+文件字节输入流InputStream+字符输出流FileReader+字符输出流FileWriter
    2018-08-20内容IO流中的File类+文件过滤器FileFilter+递归
    List接口方法、LinkedList方法、Vector集合、Set接口下HashSet、LinkedHashSet集合、HashCode()+equals()方法对于Set接口判断重复的详细细节
    集合之Collection接口AND Iterator迭代器 AND 增强for AND 泛型
    面向对象测试题
    基本类型包装类之system类
    Date
  • 原文地址:https://www.cnblogs.com/yzxblog/p/11057306.html
Copyright © 2011-2022 走看看