zoukankan      html  css  js  c++  java
  • 任务池管理与执行器

    1 前言

          一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。

          无论是上面任一情况的业务场景,我们都需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。线程会等待去执行喂给它的任务,当任务集合大于线程集合的个数时,任务会在队列排队等待;而当线程集合个数大于任务集合时,线程会挂起处于阻塞等待状态,执行器也相应地处于不饱和状态。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧: 

    2 任务与任务池

    2.1任务

     无论是消息或业务数据,可以抽象地表达为:

          struct data_pair
          {
    
             char *data;
    
             int len;
    
          }

    2.2 任务池

         任务的缓存用队列表达:

         std::queue<data_pair*> _queue; 

    2.3 任务提交入口

      int CQueueThread::writeData(void *data, int len)
    
        {
    
            if (data == NULL || len <= 0) {
                return EXIT_FAILURE;
            }
    
            data_pair *item = new data_pair();
            item->data = (char*) malloc(len);
            assert(item->data != NULL);
            memcpy(item->data, data, len);
            item->len = len;       
            _mutex.lock();
            _queue.push(item);
            _mutex.signal();
    
            _mutex.unlock();
    
            return EXIT_SUCCESS;
        }

    3线程池   

    3.1 线程封装

        c++里面类似jdk里面Thread类的封装CThread          

    {
    
    class CThread {
    
     
    
    public:
    
        /**
    
         * 构造函数
    
         */
    
        CThread() {
    
            tid = 0;
    
            pid = 0;
    
        }
    
     
    
        /**
    
         * 起一个线程,开始运行
    
         */
    
        bool start(Runnable *r, void *a) {
    
            runnable = r;
    
            args = a;
    
            return 0 == pthread_create(&tid, NULL, CThread::hook, this);
    
        }
    
     
    
        /**
    
         * 等待线程退出
    
         */
    
        void join() {
    
            if (tid) {
    
                pthread_join(tid, NULL);
    
                tid = 0;
    
                pid = 0;
    
            }
    
        }
    
     
    
        /**
    
         * 得到Runnable对象
    
         *
    
         * @return Runnable
    
         */
    
        Runnable *getRunnable() {
    
            return runnable;
    
        }
    
     
    
        /**
    
         * 得到回调参数
    
         *
    
         * @return args
    
         */
    
        void *getArgs() {
    
            return args;
    
        }
    
       
    
        /***
    
         * 得到线程的进程ID
    
         */
    
        int getpid() {
    
            return pid;
    
        }
    
     
    
        /**
    
         * 线程的回调函数
    
         *
    
         */
    
     
    
        static void *hook(void *arg) {
    
            CThread *thread = (CThread*) arg;
    
            thread->pid = gettid();
    
     
    
            if (thread->getRunnable()) {
    
                thread->getRunnable()->run(thread, thread->getArgs());
    
            }
    
     
    
            return (void*) NULL;
    
        }
    
       
    
    private:   
    
        /**
    
         * 得到tid号
    
         */
    
        #ifdef _syscall0
    
        static _syscall0(pid_t,gettid)
    
        #else
    
        static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}
    
        #endif
    
     
    
    private:
    
        pthread_t tid;      // pthread_self() id
    
        int pid;            // 线程的进程ID
    
        Runnable *runnable;
    
        void *args;
    
    }; 
    
    }
    View Code

    3.2 线程池

        并行处理的能力有线程池的个数决定,定义如下:

     CThread *_thread;

     int _threadCount;    

     4 执行器 

      4.1 执行启动

    int CDefaultRunnable::start() {
        if (_thread != NULL || _threadCount < 1) {
            TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount);
            return 0;
        }
    
        _thread = new CThread[_threadCount];
        if (NULL == _thread)
        {
            TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount);
            return 0;
        }
    
        int i = 0;
        for (; i<_threadCount; i++)
        {
            if (!_thread[i].start(this, (void*)((long)i)))
            {
              return i;
            }
        }
    
      return i;
    }

     4.2 执行

         执行器包含了具体业务的执行:

        void CQueueThread::run(CThread *thread, void *args)
        {
            int threadIndex = (int)((long)(args));
            _mutex.lock();
            while(!_stop) {
                while(_stop == 0 && _queue.empty()) {
                    _mutex.wait();
                }
                if (_stop) {
                    break;
                }
    
                data_pair *item = _queue.front();
                _queue.pop();
                _mutex.unlock();
                if (item != NULL) {
                    if (_handler) {
                        _handler->handleQueue(item->data, item->len, threadIndex, _args);
                    }
    
                    if (item->data) {
                        free(item->data);
                    }
                    free(item);
                }
                _mutex.lock();
            }
    
            _mutex.unlock();   

     5 样例代码 

        CMyHandler handler;
        CQueueThread queueThread(3, &handler, NULL);
        queueThread.start();
        char data[1024];
    for(int i=1; i<=mWriteCount; i++) { int len = sprintf(data, "data_%05d", i); queueThread.writeData(data, len+1); } queueThread.wait();

     

    参考

          http://code.taobao.org/p/tfs/src/

  • 相关阅读:
    如何复制百度文库中的文章转的,不用担心下载要币了[转]
    什么是中间件(转)
    android实用代码 (转)
    [Java]读取文件方法大全(转)
    Solaris下查看磁盘、内存、CPU使用程度
    Gene Ontology (GO) 简介
    如何在网上查某个基因的转录因子及启动子
    kmeans k均值聚类的弱点/缺点
    层次聚类
    什么是非负矩阵分解 NMF(Nonnegative Matrix Factorization )
  • 原文地址:https://www.cnblogs.com/gisorange/p/4891163.html
Copyright © 2011-2022 走看看