zoukankan      html  css  js  c++  java
  • 一种处理多任务的线程池设计

    先上类图:


    1. Worker为处理任务的线程,不断获取新任务运行

    2.Task为任务实例,可以设计为一次性的或者循环运行

    3. ExecutorPool维护着线程池和任务池。

      3.1 内部负责worker的生成和销毁,task的分发;对外提供task的增加。

      3.2 mutex:为互斥变量,实现对tasks和workers的互斥操作。

      3.3 execute:是对外的唯一接口,负责增加任务。

      3.4 getTask:负责向Worker提供任务,从tasks队列中取。如果没有则等待一定时间,依然没有则返回NULL。

      3.5 addWorker:当前线程数不能满足对任务的处理,造成任务阻塞过多,则新起worker。

    再上代码:

    Worker

    void Worker::run() {
      Task task = firstTask;
      firstTask = NULL;
      while (task || (task = pool->getTask())) {
        task->run()
        task = NULL;
      }
      pool->workerDone(this);
    }

    注:不断的从pool中获取任务,获取不到则销毁该线程。


    Task

    void Task::run(){
      ...
    }
    void Task::run(){
      while(true) {
         sleep(n);
         ...
      }
    }


    ExecutorPool

    private Worker ExecutorPool::addWorker(const Task& firstTask) {
      Worker w = new Worker(this, firstTask);
      workers.insert(w);
      return w;
    }

    注:Worker中的pool是对其所在线程池的引用

    Task ExecutorPool::getTask() {
      Task t;
      while (true) {
        {
          IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
          if (!tasks.empty()) {
            t = tasks.front();
            task.pop_front();
          } else {
            if (waitforNewTask()) {
              continue;
            }
          }
        }
        if (t) {
          return t;
        }
        return NULL;
      }
    }

    注:队列中有任务,则返回最早进来的任务。没有,则等待一段时间,等待成功返回新任务;否则返回NULL

    void ExecutorPool::execute(const Task& command) {
      if (!command) {
        return;
      }
      if (tasks.size() < workers.size()) {
        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mutex);
        tasks.push_back(command);
        if (_waitPoolSize > 0) {
          mutex.notify();
        }
      } else {
        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mutex);
        Worker w = addWorker(command);
        w->start();
      }
    }
    bool ExecutorPool::waitforNewTask() {
      ++_waitPoolSize;
      bool gotit = mutex.timedWait(n);
      --_waitPoolSize;
      return gotit;
    }

    注:等待新任务,如果有任务加入,调用notify通知,则返回true;如果是等待超时,则返回false。Worker获得false,则会销毁该线程。

  • 相关阅读:
    HDU 3697贪心
    HDU 3226 背包
    numpy_2nd 新建矩阵的五种方法 array zeros empty arange().reshape()
    numpy_1st 属性 ndim,shape,size
    CV学习笔记第二课(上)
    33. 搜索旋转排序数组 二分法
    35. 搜索插入位置 今天就是二分法专场
    34.在排序数组中查找元素的第一个和最后一个位置 二分法
    CV第三课
    CV第二课(下)
  • 原文地址:https://www.cnblogs.com/whuqin/p/4982031.html
Copyright © 2011-2022 走看看