zoukankan      html  css  js  c++  java
  • linux下简单线程池实现

    /* 头文件:threadpool.h*/

    #ifndef _TP_H_INCLUDED_

    #define _TP_H_INCLUDED_

    #include <stdio.h>

    #include <stdlib.h>

    #include <unistd.h>

    #include <sys/types.h>

    #include <pthread.h>

    #include <assert.h>

    typedef struct worker {    

      void * (*process) (void *arg);    

      void *arg;    

      struct worker *next;  

    } threadpool_item_t;

    typedef struct {   

         pthread_mutex_t queue_lock;    

       pthread_cond_t queue_ready;    

         threadpool_item_t *queue_head;      

           int shutdown;     /*是否销毁线程池*/   

           pthread_t *threadid;

             int max_thread_num;/*线程池中允许的活动线程数目*/  

         int cur_queue_size;   /*当前等待队列的任务数目*/   

    } threadpool_t;

    void pool_init (int max_thread_num);

    int  pool_destroy ();

    int  pool_add_worker (void * (*process) (void *arg), void *arg);

    static void * thread_routine (void *arg);

    #endif

    /*具体实现:threadpool.c*/

    #include "threadpool.h"

    static threadpool_t *pool = NULL;

    /*初始化线程池*/

    void pool_init (int max_thread_num) {

          pool = (threadpool_t *) malloc (sizeof (threadpool_t));     

        pthread_mutex_init (&(pool->queue_lock), NULL);    

      pthread_cond_init (&(pool->queue_ready), NULL);      

      pool->queue_head = NULL;      

      pool->max_thread_num = max_thread_num;    

      pool->cur_queue_size = 0;      

      pool->shutdown = 0;      

      pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));

       int i;    

      for (i = 0; i < max_thread_num; i++)    {       

          pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);

        }

    }

    /*向线程池中加入任务*/

    int pool_add_worker (void *(*process) (void *arg), void *arg) {

          /*构造一个新任务*/   

        threadpool_item_t *newworker = (threadpool_item_t *) malloc (sizeof (threadpool_item_t));    

        newworker->process = process;   

        newworker->arg = arg;    

       newworker->next = NULL;

         pthread_mutex_lock (&(pool->queue_lock));

          /*将任务加入到等待队列中*/    

      threadpool_item_t *member = pool->queue_head;   

       if (member != NULL)   {     

          while (member->next != NULL)    member = member->next;

              member->next = newworker;    

     } else{        

        pool->queue_head = newworker;  

       }     

       assert (pool->queue_head != NULL);     

       pool->cur_queue_size++;    

      pthread_mutex_unlock (&(pool->queue_lock));  

        /*唤醒一个等待线程*/  

       pthread_cond_signal (&(pool->queue_ready));

        return 0;

    }

    /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出*/

    int pool_destroy () {   

       if (pool->shutdown) return -1;/*防止两次调用*/

        pool->shutdown = 1;      

       /*唤醒所有等待线程,线程池要销毁了*/    

       pthread_cond_broadcast (&(pool->queue_ready));     

      /*阻塞等待线程退出,否则就成僵尸了*/    

      int i;   

      for (i = 0; i < pool->max_thread_num; i++)    { 

         pthread_join (pool->threadid[i], NULL);   

      } 

    free (pool->threadid);      

       /*销毁等待队列*/  

       threadpool_item_t *head = NULL;   

      while (pool->queue_head != NULL)   

      {       

        head = pool->queue_head;      

         pool->queue_head = pool->queue_head->next;    

          free (head);   

      }   

      /*销毁条件变量和互斥量*/    

       pthread_mutex_destroy(&(pool->queue_lock));    

       pthread_cond_destroy(&(pool->queue_ready));      

       free (pool);   

       pool=NULL;  

       return 0;

    }

    /*线程主函数*/

    static void * thread_routine (void *arg) {

        printf ("starting thread %lu\n", pthread_self ());    

        while (1)     {  

             pthread_mutex_lock (&(pool->queue_lock));    

           /*如果等待队列为0并且不销毁线程池,则处于阻塞状态;*/       

         while (pool->cur_queue_size == 0 && !pool->shutdown)    

           {          

           printf ("thread %lu is waiting\n", pthread_self ());         

            pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));      

           }     

        /*线程池要销毁了*/       

        if (pool->shutdown)  {           

          /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/           

          pthread_mutex_unlock (&(pool->queue_lock));            

        printf ("thread %lu will exit\n", pthread_self ());            

        pthread_exit (NULL);      

       }       

         printf ("thread %lu is starting to work\n", pthread_self ());      

         assert (pool->cur_queue_size != 0);     

         assert (pool->queue_head != NULL);           

          /*队列长度减去1,并取出链表中的头元素*/        

      pool->cur_queue_size--;       

       threadpool_item_t *worker = pool->queue_head;      

         pool->queue_head = worker->next;        

         pthread_mutex_unlock (&(pool->queue_lock)); //解锁       

        /*调用回调函数,执行任务*/      

         (*(worker->process)) (worker->arg);       

         free (worker);     

        worker = NULL;

        }

    }

    /* 下面是测试代码 */

    void * myprocess (void *arg)

    {

       printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  

      sleep (1);/*休息一秒,延长任务的执行时间*/  

      return NULL;

    }

    int main (int argc, char **argv)

    {   

      pool_init (3);/*线程池中最多三个活动线程*/      

      /*连续向池中投入10个任务*/    

     int *workingnum = (int *) malloc (sizeof (int) * 10);

     int i;   

      for (i = 0; i < 10; i++)   

      {       

          workingnum[i] = i;

           pool_add_worker (myprocess, &workingnum[i]);   

      }    

       /*等待所有任务完成*/     sleep (5);   

      /*销毁线程池*/     pool_destroy ();     

      free(workingnum);  

    workingnum = NULL;   

      return 0;

    }

     编译,运行:

    $ gcc -std=c99 -o threadpool threadpool.h threadpool.c -lpthread

    $ ./threadpool
    starting thread 1115277632
    thread 1115277632 is starting to work
    threadid is 1115277632, working on task 0
    starting thread 1125767488
    thread 1125767488 is starting to work
    threadid is 1125767488, working on task 1
    starting thread 1136257344
    thread 1136257344 is starting to work
    threadid is 1136257344, working on task 2
    thread 1115277632 is starting to work
    threadid is 1115277632, working on task 3
    thread 1125767488 is starting to work
    threadid is 1125767488, working on task 4
    thread 1136257344 is starting to work
    threadid is 1136257344, working on task 5
    thread 1115277632 is starting to work
    threadid is 1115277632, working on task 6
    thread 1125767488 is starting to work
    threadid is 1125767488, working on task 7
    thread 1136257344 is starting to work
    threadid is 1136257344, working on task 8
    thread 1115277632 is starting to work
    threadid is 1115277632, working on task 9
    thread 1125767488 is waiting
    thread 1136257344 is waiting
    thread 1115277632 is waiting
    thread 1125767488 will exit
    thread 1136257344 will exit
    thread 1115277632 will exit

  • 相关阅读:
    Rsa加密类
    Des加密类
    AES对称加密解密类
    用字符串生成二维码
    ViedoUtil获取视频的缩略图
    UmUtils得到友盟的渠道号
    UiUtils
    StringUtils
    ShockUtil振动工具类
    SHA加密
  • 原文地址:https://www.cnblogs.com/ylh1223/p/2738937.html
Copyright © 2011-2022 走看看