zoukankan      html  css  js  c++  java
  • Linux C++线程池实例

    想做一个多线程服务器测试程序,因此参考了github的一些实例,然后自己动手写了类似的代码来加深理解。

    目前了解的线程池实现有2种思路:

    第一种:

    主进程创建一定数量的线程,并将其全部挂起,此时线程状态为idle,并将running态计数为0,等到任务可以执行了,就唤醒线程,此时线程状态为running,计数增加,如果计数达到最大线程数,就再创建一组空闲线程,等待新任务,上一组线程执行完退出,如此交替。

    第二种:

    采用生成者-消费者模式,主进程作为生成者,创建FIFO队列,在任务队列尾部添加任务,线程池作为消费者在队列头部取走任务执行,这之间有人会提到无锁环形队列,在单生成者单消费者的模式下是有效的,但是线程池肯定是多消费者同时去队列取任务,环形队列会造成挂死。

    我的实例采用第二种方式实现,在某些应用场景下,允许一定时间内,任务排队的情况,重复利用已有线程会比较合适。

    代码比较占篇幅,因此折叠在下面。

     task.h:

     1 #ifndef TASK_H
     2 #define TASK_H
     3 
     4 #include <list>
     5 #include <pthread.h>
     6 
     7 using std::list;
     8 
     9 struct task {
    10    void (*function) (void *);
    11    void *arguments;
    12    int id;
    13 };
    14 
    15 struct work_queue {
    16    work_queue(){
    17         pthread_mutex_init(&queue_lock, NULL);
    18         pthread_mutex_init(&queue_read_lock, NULL);
    19         pthread_cond_init(&queue_read_cond, NULL);
    20         qlen = 0;
    21    }
    22 
    23    ~work_queue() {
    24        queue.clear();
    25        pthread_mutex_destroy(&queue_read_lock);
    26        pthread_mutex_destroy(&queue_lock);
    27        pthread_cond_destroy(&queue_read_cond);
    28    }
    29 
    30    void push(task *tsk);
    31    task *pull();
    32    void post();
    33    void wait();
    34 
    35 private:
    36    int qlen;
    37    list< task * > queue;
    38    pthread_mutex_t queue_lock;
    39    pthread_mutex_t queue_read_lock;
    40    pthread_cond_t  queue_read_cond;
    41 };
    42 
    43 #endif
    View Code

     task.cpp

    #include "task.h"
    
    void work_queue::push(task *tsk) {
       pthread_mutex_lock(&queue_lock);
       queue.push_back(tsk);
       qlen++;
    
       pthread_cond_signal(&queue_read_cond);
       pthread_mutex_unlock(&queue_lock);
    }
    
    task* work_queue::pull() {
        wait();
    
        pthread_mutex_lock(&queue_lock);
        task* tsk = NULL;
        if (qlen > 0) {
            tsk = *(queue.begin());
            queue.pop_front();
            qlen--;
    
            if (qlen > 0)
                pthread_cond_signal(&queue_read_cond);
        }
        pthread_mutex_unlock(&queue_lock);
        return tsk;
    }
    
    void work_queue::post() {
        pthread_mutex_lock(&queue_read_lock);
        pthread_cond_broadcast(&queue_read_cond);
        pthread_mutex_unlock(&queue_read_lock);
    }
    
    void work_queue::wait() {
        pthread_mutex_lock(&queue_read_lock);
        pthread_cond_wait(&queue_read_cond, &queue_read_lock);
        pthread_mutex_unlock(&queue_read_lock);
    }
    View Code

    threadpool.h

     1 #ifndef THREAD_POOL_H
     2 #define THREAD_POOL_H
     3 
     4 #include "task.h"
     5 #include <vector>
     6 
     7 using std::vector;
     8 
     9 #define safe_delete(p)  if (p) { delete p;  p = NULL; }
    10 
    11 struct threadpool {
    12     threadpool(int size) : pool_size(size)
    13                          , thread_list(size, pthread_t(0))
    14                          , queue(NULL)
    15                          , finish(false)
    16                          , ready(0) {
    17 
    18         pthread_mutex_init(&pool_lock, NULL);
    19     }
    20 
    21     ~threadpool() {
    22         thread_list.clear();
    23         safe_delete(queue);
    24         pthread_mutex_destroy(&pool_lock);
    25     }
    26 
    27     void init();
    28     void destroy();
    29     static void* thread_run(void *tp);
    30 
    31     void incr_ready();
    32     void decr_ready();
    33     bool close() const;
    34     
    35     work_queue *queue;
    36     
    37 private:
    38     int pool_size;
    39     int ready;
    40     bool finish;
    41     pthread_mutex_t pool_lock;
    42     vector <pthread_t> thread_list;
    43 };
    44 
    45 
    46 #endif
    View Code

    threadpool.cpp

     1 /*
     2  * threadpool.cpp
     3  *
     4  *  Created on: 2017年3月27日
     5  *      Author: Administrator
     6  */
     7 
     8 #include "threadpool.h"
     9 
    10 
    11 void* threadpool::thread_run(void *tp) {
    12     threadpool *pool = (threadpool *) tp;
    13     pool->incr_ready();
    14 
    15     while(1) {
    16         task* tsk = pool->queue->pull();
    17         if (tsk) {
    18             (tsk->function)(tsk->arguments);
    19             delete tsk;
    20             tsk = NULL;
    21         }
    22 
    23         if (pool->close())
    24             break;
    25     }
    26 
    27     pool->decr_ready();
    28 
    29     return NULL;
    30 }
    31 
    32 void threadpool::incr_ready() {
    33     pthread_mutex_lock(&pool_lock);
    34     ready++;
    35     pthread_mutex_unlock(&pool_lock);
    36 }
    37 
    38 void threadpool::decr_ready() {
    39     pthread_mutex_lock(&pool_lock);
    40     ready--;
    41     pthread_mutex_unlock(&pool_lock);
    42 }
    43 
    44 bool threadpool::close() const {
    45     return finish;
    46 }
    47 
    48 void threadpool::init() {
    49     queue = new work_queue;
    50     if (!queue) {
    51         return;
    52     }
    53 
    54     for(int i; i<pool_size; i++) {
    55         pthread_create(&thread_list[i], NULL, threadpool::thread_run, (void *)this);
    56     }
    57 
    58     while(ready != pool_size) {}
    59 }
    60 
    61 void threadpool::destroy() {
    62     finish = true;
    63 
    64     while(ready) {
    65         if(queue) {
    66             queue->post();
    67         }
    68     }
    69 }
    View Code

    main.cpp

     1 //============================================================================
     2 // Name        : thread_pool.cpp
     3 // Author      : dancy
     4 // Version     :
     5 // Copyright   : Your copyright notice
     6 // Description : Hello World in C++, Ansi-style
     7 //============================================================================
     8 
     9 #include <iostream>
    10 #include "threadpool.h"
    11 #include <pthread.h>
    12 #include <stdio.h>
    13 #include <stdlib.h>
    14 
    15 using namespace std;
    16 
    17 void job(void *tsk){
    18     printf("job %-2d working on Thread #%u
    ", ((task *)tsk)->id, (int)pthread_self());
    19 }
    20 
    21 task *make_task(void (*func) (void *), int id) {
    22     task *tsk = new task;
    23     if (!tsk)
    24         return NULL;
    25     
    26     tsk->function = func;
    27     tsk->arguments = (void *)tsk;
    28     tsk->id = id;
    29     
    30     return tsk;
    31 }
    32 
    33 int main() {
    34     threadpool tp(4);
    35     tp.init();
    36 
    37     for(int i=0; i<40; i++) 
    38         tp.queue->push(make_task(&job, i+1));
    39     
    40     tp.destroy();
    41     printf("all task has completed
    ");
    42     return 0;
    43 }
    View Code

    以上代码需要在linux下编译,mingw下封装的pthread_t,多了一个void *指针,如果要适配还需要自己再封装一次。

  • 相关阅读:
    java基础教程-流IO(五)
    java基础教程-常用类(四)
    java基础教程-容器(三)
    java基础教程-异常处理(二)
    java基础教程-面向对象(一)
    javascript DOM编程艺术(笔记)
    二十二、动态规划
    二十一、所有结点对最短路径问题(弗洛伊德算法)
    二十、单源最短路径(迪杰斯特拉算法)
    十九、最小生成树(普里姆算法)
  • 原文地址:https://www.cnblogs.com/danxi/p/6636095.html
Copyright © 2011-2022 走看看