zoukankan      html  css  js  c++  java
  • linux c++线程池开发demo

    // condition.h

    #ifndef _CONDITION_H_
    #define _CONDITION_H_
    
    #include<pthread.h>
    typedef struct condition{
        pthread_mutex_t pmutex;
        pthread_cond_t pcond;
        }condition_t;
    int condition_init(condition_t * cond);   //初始化
    int condition_lock(condition_t * cond); //加锁
    int condition_unlock(condition_t * cond); //解锁
    int condition_wait(condition_t* cond);  //等待事件的调用
    int condition_timedwait(condition_t * cond , const struct timespec * abstime); // 超时处理
    int condition_signal(condition_t * cond);        //向等待线程发起通知
    int condition_broadcast(condition_t * cond);    //向等待线程发起广播 
    int condition_destroy(condition_t *cond);       //销毁条件变量
    
    #endif
    

    // condition.cpp

    #include <iostream>
    #include "condition.h"
    
    int condition_init(condition_t * cond){
        int status;
        if((itatus = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;
        if((status = pthread_cond_init(&cond->pcond ,NULL)))
        return status;
    
        return 0;
        }
    
    int condition_lock(condition_t * cond){
        return pthread_mutex_lock(&cond->pmutex);   
        }
    
    int condition_nulock(condition_t * cond){
        return pthread_mutex_nulock(&cond->pmutex);
        }
    
    int condition_wait(condition_t * cond){
        return pthread_cond_wait(&cond->pcond , &cond->pmutex);
        }
    
    int condition_timedwait(condition_t * cond){
        return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
        }               
    
    int  condition_signal(condition_t * cond){
        return pthread_cond_signal(&cond->pcond);
        } 
    int condition_broadcast(condition_t *cond){
        return pthread_cond_broadcast(*cond->pcond);
        }
    int condition_destroy(condition_t * cond){
        int status;
        if((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;
        if((status = pthread_mutex_destroy(&cond->pcond)))
        return status;
    
        return 0;
        }

    //pthreadpool.h

    #ifndef _PTHREADPOOL_H_
    #define _PTHREADPOOL_H_
    
    #include "condition.h"
    
    //任务结构体,将任务放入队列由线程池中的线程来执行
    typedef struct task {
        void *(*run)(void* arg); // 任务回调函数
        void *arg;              // 回调函数参数       
        struct task *next ;
        }task_t;
    
    //线程池结构体
    typedef struct threadpool{
        condition_t ready ; //任务准备或者线程池销毁通知
        task_t *first ;     
        task_t *last;
        int counter ;       //当前线程数
        int idle ;          // 等待任务的线程数 
        int max_threads;    // 最大线程数    
        int quit ;          // 置1 销毁线程池
        }threadpool_t;
    
    //初始化线程池 
    void threadpool_init(threadpool_t * pool , int threads);
    //往线程池中添加任务
    void threadpool_add_task(threadpool_t* pool , void* (*run)(void *arg), void * arg);
    //销毁线程池
    void threadpool_destroy(threadpool_t * pool );
    
    #endif
    

    //pthreadpool.cpp

    #include <iostream>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    #include <pthread.h>
    #include "threadpool.h"
    
    using namespace std;
    
    void* thread_routine(void* arg){
        struct timespec abstime;
        int timeout;
        cout << "线程ID"<<(int)pthread_self()<<"启动"<< endl;
        threadpool_t *pool = (threadpool_t *)arg;
        while(1){
            timeout = 0;
            condition_lock(&pool->ready);
            pool ->idle++;
            //等待队列有任务到来 或者 销毁消息
            while((pool->first == NULL)&& (!(pool->quit))){
                cout << "线程ID"<<(int)pthread_self()<<"等待中"<< endl;
                //condition_wait(&pool->ready);
                clock_gettime(CLOCK_REALTIME , &abstime);
                abstime.tv_sec += 2;
                int status = condition_timedwait(&pool->ready , &abstime);
                if(status == ETIMEDOUT){
                    cout << "线程ID"<<(int)pthread_self()<<"等待超时"<< endl;
                    timeout = 1;
                    break ;
                    }
                }
            //等待到条件 ,处于工作状态 
            pool->idle--;
            if(pool->first != NULL ){
                task_t *t = pool->first;
                pool->first = t->next;
                //执行任务需要一定时间 ,所以需要先解锁 
                condition_nulock(&pool->ready);
                t->run(t->arg);       //线程函数消费开始执行 
                delete t;            //删除任务结构体  
                condition_lock(&pool->ready);
                }
                //等待到销毁
            if((pool->quit) && (pool->first == NULL)){
                pool->counter--;
                if(pool->counter == 0){
                    condition_signal(&pool->ready);
                    }
                condition_nulock(&pool->ready);
                break ;
                }
            if(timeout && (pool->first == NULL)){
                pool->counter--;
                condition_nulock(&pool->ready);
                break ;
                }       
            condition_unlock(&pool->ready);
            }
            cout << "线程ID"<<(int)pthread_self()<< "销毁"<< endl;
            return (void*) 0;   
        }
    
    
    
    // 线程池的初始化
    void pthreadpool_init(threadpool_t * pool , int threads){
        condition_init(& pool ->ready);
        pool->first = NULL;
        pool->last = NULL ;
        pool->counter = 0;
        pool->idle = 0;
        pool->max_threads = threads;
        pool->quit = 0;
        }
    
    //添加任务  
    void pthreadpool_add_task(threadpool_t * pool , void*(*run)(void* arg), void* arg){
        //void *(*run)(void* arg); // 任务回调函数
        //void *arg;                // 回调函数参数       
        //struct task *next ;
    
        task_t *newtask = new struct task ;
        newtask-> run = run ;
        newtask-> arg = arg;
        newtask-> next= NULL ;
    
        if(pool->first == NULL ){
            pool->first = newtask;
            pool->last = newtask;
            }
        else{
            pool->last-next = newtask;
            pool->last = newtask->next;     //添加后尾指针位置发生改变  后移一位
             }
            //如果有等待线程就唤醒 ,执行其中一
        if(pool->idle >0 ){
            condition_signal(&pool->ready);
        }else if(pool->counter < pool->max_threads){
            //没有线程等待 ,并且当前线程数不操过最大线程 ,创建新线程
            pthread_t tid;
            pthread_create(&tid, NULL ,thread_routine , pool);
            pool->counter++;    
            }
        }
    
    //销毁线程池
    void threadpool_destroy(threadpool_t * pool){
        if(pool->quit)
            return ;
        condition_lock(&pool->ready);
        pool->quit = 1;
        if(pool->counter > 0){
            if(pool->idle >0){
                condition_broadcast(&pool->ready);   //向等待线程发起广播
                while(pool->counter >0){
                    condition_wait(&pool->ready);    
                    }
                }
            }
        condition_unlock(&pool->ready);
        condition_destroy(&pool->ready);            //销毁条件变量             
    
        }
    

    //main.cpp

    #include <iostream>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    #include <pthread.h>
    #include "threadpool.h"
    
    using namespace std;
    
    
    void * mytask(void *arg){
        cout << "线程ID="<<(int)pthread_self()<<"正在执行"<< *(int*)arg<<endl;
        delete (int*)arg;
        return  (void* ) 0; 
        }
    
    
    int main(void){
        threadpool_t pool;
        threadpool_init(&pool , 3);
    
        size_t i;
        for(i=0 ; i<10 ; i++){
            int *arg = new int ;
            *arg = i;
            threadpool_add_task( &pool , mytask , arg );   //添加任务
            }
        sleep(6);
        threadpool_destroy(&pool);            //销毁线程池
        return 0;
        }
    
  • 相关阅读:
    Swing 添加Esc快捷键退出程序
    DefaultTableCellRenderer 自定义
    项目清理和删除svn信息(转)
    时间转换工具类
    Java Swing 日期控件(转载)
    Eureka原理
    SpringCloud之Eureka注册中心集群篇
    spring boot及spring cloud介绍
    spring cloud 服务注册/发现/提供/调用 demo
    eclipse构建maven+scala+spark工程
  • 原文地址:https://www.cnblogs.com/Sico2Sico/p/5384284.html
Copyright © 2011-2022 走看看