zoukankan      html  css  js  c++  java
  • 一个简单的线程池实现

    前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个。跟大家分享,同时也整理整理思路。

     

    对线程池的要求:

    1.用于处理大量短暂的任务。

    2.动态增加线程,直到达到最大允许的线程数量。

    3.动态销毁线程。

     

    线程池的实现类似于”消费者--生产者”模型:

    用一个队列存放任务(仓库,缓存)

    主线程添加任务(生产者生产任务)

    新建线程函数执行任务(消费者执行任务)

    由于任务队列是全部线程共享的,就涉及到同步问题。这里采用条件变量和互斥锁来实现。

    --------------------------------------------------------condition.h----------------------------------------------------------

     

    /*
    
    在此线程池中互斥锁和条件变量都是配套使用,编写此头文件使用比较方便
    
    在此线程池中保护共享数据的都是用这个头文件中的函数
    
    */
    
     
    
    #ifndef _CONDITION_H_
    
    #define _CONDITION_H_
    
     
    
    #include <pthread.h>
    
     
    
    //有互斥锁和条件变量的结构体condition_t
    
    typedef struct condition
    
    {
    
             pthread_cond_t pcond;
    
             pthread_mutex_t pmutex;
    
    } condition_t;
    
     
    
    int condition_init(condition_t *cond)
    
    {
    
             if(pthread_cond_init(&cond->pcond, NULL))
    
                       return 1;
    
             if(pthread_mutex_init(&cond->pmutex, NULL))
    
                       return 1;
    
             return 0; 
    
    };
    
     
    
    int condition_lock(condition_t *cond)
    
    {
    
             return pthread_mutex_lock(&cond->pmutex);
    
    }
    
     
    
    int condition_unlock(condition_t *cond)
    
    {
    
             return pthread_mutex_unlock(&cond->pmutex);
    
    }
    
     
    
    int condition_wait(condition_t *cond)
    
    {
    
             return pthread_cond_wait(&cond->pcond, &cond->pmutex);
    
    }
    
     
    
    int condition_timedwait(condition_t *cond, const struct timespec *abstime)
    
    {
    
            return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
    
    }
    
     
    
    int condition_signal(condition_t *cond)
    
    {
    
             return pthread_cond_signal(&cond->pcond);
    
    }
    
     
    
    int condition_broadcast(condition_t *cond)
    
    {
    
             return pthread_cond_broadcast(&cond->pcond);
    
    }
    
     
    
    int condition_destroy(condition_t *cond)
    
    {
    
             if(pthread_cond_destroy(&cond->pcond))
    
                       return 1;
    
             if(pthread_mutex_destroy(&cond->pmutex))
    
                       return 1;
    
             return 0;
    
    }
    
     
    
    #endif

     

     

     

    -------------------------------------------------------threadpool.h--------------------------------------------------------

     

    //线程池头文件
    
    #ifndef _THREADPOOL_H_
    
    #define _THREADPOOL_H_
    
     
    
    #include "condition.h"
    
    #include <stdio.h>
    
    #include <stdlib.h>
    
    #include <unistd.h>
    
    #include <sys/time.h>
    
    #include <time.h>
    
     
    
    //线程任务结构体(队列形式存储)
    
    typedef struct Task
    
    {
    
             void* (*run) (void*arg);
    
             void *arg;
    
             struct Task *next;
    
    } task_t;
    
     
    
    //线程池结构体
    
    typedef struct threadpool
    
    {
    
             condition_t ready;                    //互斥锁与条件变量
    
             task_t *first;                      //任务队列头
    
             task_t *last;                                    //任务队列尾
    
             int counter;                                     //任务总数
    
             int maxthread;                      //最大线程数
    
             int idle;                         //正在等待的线程数量
    
             int quit;                         //销毁线程池标志
    
    } threadpool_t;
    
     
    
    //初始化线程池
    
    void threadpool_init(threadpool_t *pool, int max_num)
    
    {
    
             condition_init(&pool->ready);
    
             pool->first = NULL;
    
             pool->last = NULL;
    
             pool->counter = 0;
    
             pool->maxthread = max_num;
    
             pool->idle = 0;
    
             pool->quit = 0;
    
    }
    
     
    
     
    
    //新建线程函数
    
    void *thread_routine(void *arg)
    
    {
    
             struct timespec abstime;
    
             int timeout;                                  //是否超过等待任务时间
    
             threadpool_t *pool = (threadpool_t*)arg;
    
     
    
             printf("0x%0x thread is starting\n", (int)pthread_self());
    
     
         //让线程不会因为结束任务立刻销毁
             for(;;)                                            
    
             {
    
                       timeout = 0;
    
                       condition_lock(&pool->ready);                               //对全局变量任务队列进行操作,要加锁
    
                       pool->idle ++;
    
               //等待任务队列中有任务
    
                       while(pool->first == NULL && pool->quit == 0)
    
                       {
    
                                printf("0x%0x thread is waiting\n", (int)pthread_self());
    
                               
    
                                clock_gettime(CLOCK_REALTIME, &abstime);
    
                                abstime.tv_sec += 2;                                   //设置等待超时时间
    
                                int status = condition_timedwait(&pool->ready, &abstime);
    
                                if(status == 110)                                      //timewait函数超时返回110(TIMEDOUT)
    
                                {
    
                                         printf("0x%0x thread is timeout\n", (int)pthread_self());
    
                                         timeout = 1;
    
                                         break;
    
                                }
    
                               
    
                       }
    
            
    
                       //正在等待的线程数量减一
    
                       pool->idle --;    
    
               //若任务队列中有任务
    
                       if(pool->first != NULL)
    
                       {
    
                                task_t *t = pool->first;
    
                                pool->first = pool->first->next;
    
                    //先解锁再执行任务函数,让其它线程可以对任务队列进行操作,提高效率
    
                                condition_unlock(&pool->ready);            
    
                                t->run(t->arg);
    
                                free(t);
    
                    //任务执行完毕,重新上锁
    
                                condition_lock(&pool->ready);
    
                       }
    
                      
    
                       //有销毁线程池命令 并且 线程队列为空
    
                       if(pool->first == NULL && pool->quit == 1)
    
                       {
    
                                pool->counter--;                                            //减掉一个任务
    
                                if(pool->counter == 0)                      //任务全部完成,向销毁函数发起通知
    
                                          condition_signal(&pool->ready);
    
                                condition_unlock(&pool->ready);                 //不要忘记解锁
    
                                break;
    
                       }
    
     
    
                       if(pool->first == NULL && timeout == 1)                  //等待任务超时
    
                       {
    
                                pool->counter--;                          //减掉一个任务
    
                                condition_unlock(&pool->ready);                 //同样不要忘记解锁
    
                                break;
    
                       }
    
                       condition_unlock(&pool->ready);
    
             }
    
             printf("0x%0x threadpool is exiting\n", (int)pthread_self());
    
             return NULL;
    
    }
    
     
    
    //向任务队列中添加任务
    
    void threadpool_add_task(threadpool_t *pool, void* (*run) (void*arg), void*arg)
    
    {
    
             //动态分配空间给新任务
    
             task_t *newtask = (task_t*)malloc(sizeof(task_t));
    
             newtask->run = run;
    
             newtask->arg = arg;
    
             newtask->next = NULL;
    
            
    
         //要对任务队列进行操作,上锁
    
             condition_lock(&pool->ready);
    
     
    
             //添加新任务到任务队列
    
             if(pool->first == NULL)
    
                       pool->first = newtask;
    
             else
    
                       pool->last->next = newtask;
    
             pool->last = newtask;
    
     
    
             //如果有线程在等待,则不用创建新线程,直接发起通知处理任务
    
             if(0 < pool->idle)
    
                       condition_signal(&pool->ready);
    
     
    
         //当前线程数不能超过线程数,用<不用<=,因为counter初始为0
    
             else if(pool->counter < pool->maxthread)                
    
             {
    
                       pthread_t thread;
    
                       pthread_create(&thread, NULL, thread_routine, (void*)pool);
    
                       pool->counter++;
    
             }
    
     
    
             condition_unlock(&pool->ready);
    
    }
    
     
    
    //销毁线程池
    
    void threadpool_destroy(threadpool_t *pool)
    
    {
    
             if(pool->quit)                                                      //若已经销毁,直接返回,避免销毁两次
    
                       return;
    
             condition_lock(&pool->ready);
    
             pool->quit = 1;
    
             if(pool->counter > 0)
    
             {
    
                       condition_broadcast(&pool->ready);                //广播关闭正在等待的线程
    
                       if(pool->counter > 0)                                    //若有正在进行的线程等待线程结束的通知
    
                                condition_wait(&pool->ready);
    
             }
    
             condition_unlock(&pool->ready);
    
             condition_destroy(&pool->ready);                            //销毁互斥锁和条件变量
    
    }
    
     
    
    #endif

     

     

     

     

     

     

     

     

    //测试代码(main函数):
    
    #include "threadpool.h"
    
    #include <stdio.h>
    
    #include <stdlib.h>
    
    #include <unistd.h>
    
    #include <errno.h>
    
     
    
    void* mytask(void *arg)
    
    {
    
             printf("0x%0x thread run the task %d\n", (int)pthread_self(), *(int*)arg);
    
             free(arg);
    
             sleep(1);
    
             return NULL;
    
    }
    
     
    
    int main(int argc, char **argv)
    
    {
    
             threadpool_t pool;
    
             int i;
    
             threadpool_init(&pool, 4);
    
             for (i = 0; i < 10; ++i)
    
             {
    
                       int *arg = (int *)malloc(sizeof(int));
    
                       *arg = i;
    
                       threadpool_add_task(&pool, mytask, (void *)arg);
    
             }
    
    //      sleep(15);
    
             threadpool_destroy(&pool);
    
             return 0;
    
    }

    代码执行结果如下:

    [kami@localhost 线程池]$ ./main

    0x335fe700 thread is starting

    0x31dfb700 thread is starting

    0x31dfb700 thread run the task 1

    0x335fe700 thread run the task 0

    0x32dfd700 thread is starting

    0x32dfd700 thread run the task 2

    0x325fc700 thread is starting

    0x325fc700 thread run the task 3

    0x31dfb700 thread run the task 4

    0x335fe700 thread run the task 5

    0x32dfd700 thread run the task 6

    0x325fc700 thread run the task 7

    0x335fe700 thread run the task 9

    0x325fc700 threadpool is exiting

    0x31dfb700 thread run the task 8

    0x32dfd700 threadpool is exiting

    0x335fe700 threadpool is exiting

    0x31dfb700 threadpool is exiting

    可以看到一共处理了0~9十个任务;

    最大线程数量为4(0x335fe700,0x31dfb700, 0x32dfd700, 0x325fc700 );

    等待每个线程都退出了才结束进程;

    第一次写博客,希望能够在这里学到更多,也分享一些自己学习的心得体会,共勉互励。

  • 相关阅读:
    UML学习——用例图(二)
    servlet相关生命周期(二)
    UML 学习——UML概述(一)
    servlet概述(一)
    java 核心编程——线程之线程控制(五)
    java 核心编程——线程之线程池(ExecutorService)(四)
    java 核心编程——线程之线程组(四)
    java 核心编程——线程之定时器(TimerTask)(三)
    java 核心编程——线程之线程的开发方法(二)
    java 核心编程——线程之线程的基本概念(一)
  • 原文地址:https://www.cnblogs.com/kamicoder/p/6390046.html
Copyright © 2011-2022 走看看