zoukankan      html  css  js  c++  java
  • linux线程池分析

    . 线程池学习文件

    pool_test/  -> 线程池函数接口实现源码,简单实例。

    系统编程项目接口设计说明书.doc  -> 详细说明了线程池各个函数的头文件/原型/参数/返回值..。

    线程池模型.jpg  -> 帮助大家理解线程池原理。

    . 学习线程池实现过程?

    1. 什么是线程池?

    线程池就是多个线程组合起来的一个集合,当有任务时,线程就会处理任务,当没有任务时,线程休息。

    2. 分析线程池源码

    thread_pool.c  -> 线程池函数接口源码

    thread_pool.h  -> 函数接口声明/结构体声明/头文件..

    ===========================================================

    thread_pool.h

     

    #define MAX_WAITING_TASKS  1000

       -> 最大的任务等待个数

    #define MAX_ACTIVE_THREADS 20     -> 最大线程个数

     

    0)任务节点结构体

    struct task

    {

          

           void *(*do_task)(void *arg);  -> 任务函数

           void *arg;

     

                        -> 任务函数的参数

           struct task *next;         -> 指向下一个任务节点的指针

    };

     

    1)线程池模型

    typedef struct thread_pool

    {

          

           pthread_mutex_t lock;

      -> 互斥锁

           pthread_cond_t  cond;

      -> 条件变量   

           bool shutdown;

                  -> 线程池关闭标识符号  true->关闭   false->未关闭

           struct task *task_list;

    -> 任务队列的头文件

           pthread_t *tids;

           -> 存放线程TID号空间地址     

           unsigned max_waiting_tasks;

     -> 最大的等待任务的个数     

           unsigned waiting_tasks;  -> 当前等待任务的个数

          

           unsigned active_threads;

      -> 当前线程池中线程的个数

    }thread_pool;

    2)初始化线程池函数模型

    bool init_pool(thread_pool *pool, unsigned int threads_number);

    3)线程处理函数

    void *routine(void *arg)

    4)添加任务函数

    bool add_task(thread_pool *pool,void *(*do_task)(void *arg), void *arg)

    ===========================================================

    thread_pool.c

    1)初始化线程池函数源码

     

    2)线程处理函数源码

     

    3)添加任务函数源码

    源码:

    头文件:

    #ifndef _THREAD_POOL_H_
    #define _THREAD_POOL_H_
    
    #include <stdio.h>
    #include <stdbool.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
    
    #include <errno.h>
    #include <pthread.h>
    
    #define MAX_WAITING_TASKS    1000
    #define MAX_ACTIVE_THREADS    20
    
    struct task
    {
        void *(*do_task)(void *arg);
        void *arg;
    
        struct task *next;
    };
    
    typedef struct thread_pool
    {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        bool shutdown;
        struct task *task_list;
        pthread_t *tids;
        unsigned max_waiting_tasks;
        unsigned waiting_tasks;
        unsigned active_threads;
    }thread_pool;
    
    
    bool init_pool(thread_pool *pool, unsigned int threads_number);
    bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
    int  add_thread(thread_pool *pool, unsigned int additional_threads_number);
    int  remove_thread(thread_pool *pool, unsigned int removing_threads_number);
    bool destroy_pool(thread_pool *pool);
    
    void *routine(void *arg);
    
    
    #endif

    功能函数:

    #include "thread_pool.h"
    
    void handler(void *arg)
    {
        printf("[%u] is ended.
    ",
            (unsigned)pthread_self());
    
        //解锁!
        pthread_mutex_unlock((pthread_mutex_t *)arg);
    }
    
    void *routine(void *arg)
    {
        //接住线程池的地址
        thread_pool *pool = (thread_pool *)arg;
        struct task *p;
    
        while(1)
        {
            //取消例程函数,将来线程上锁了,如果收到取消请求,那么先解锁,再退出
            pthread_cleanup_push(handler, (void *)&pool->lock);
            
            //任务队列是属于临界资源。
            //访问任务队列之前都必须上锁。
            pthread_mutex_lock(&pool->lock);
        
            //如果当前线程池未被关闭并且线程池中没有需要处理的任务时:
            while(pool->waiting_tasks == 0 && !pool->shutdown)
            {
                //那么就进入条件变量中等待!
                pthread_cond_wait(&pool->cond, &pool->lock);
            }
            
            //如果线程等待任务为0,并且线程池已经关闭了。
            if(pool->waiting_tasks == 0 && pool->shutdown == true)
            {    
                //解锁
                pthread_mutex_unlock(&pool->lock);    
                
                //走人
                pthread_exit(NULL); 
            }
    
            //有任务做,代表肯定不是空链表,拿任务p
            p = pool->task_list->next;
            pool->task_list->next = p->next;
            
            //当前等待的任务的个数-1
            pool->waiting_tasks--;
    
            //解锁
            pthread_mutex_unlock(&pool->lock);
            
            //删除线程取消例程函数
            pthread_cleanup_pop(0);
            
            //设置线程不可以响应取消。
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
            
            //执行任务节点中函数过程中,不希望被别人取消掉。
            (p->do_task)(p->arg);
            
            //设置为可以响应取消
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    
            //释放任务节点p的内存空间
            free(p);
        }
    
        pthread_exit(NULL);
    }
    
    bool init_pool(thread_pool *pool, unsigned int threads_number)
    {
        //1. 初始化互斥锁
        pthread_mutex_init(&pool->lock, NULL);
        
        //2. 初始化条件变量
        pthread_cond_init(&pool->cond, NULL);
    
        //3. 线程池关闭标志为未关闭
        pool->shutdown = false;
        
        //4. 为任务队列头节点申请空间
        pool->task_list = malloc(sizeof(struct task));
        
        //5. 为线程TID号申请空间
        pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
    
        //错误判断
        if(pool->task_list == NULL || pool->tids == NULL)
        {
            perror("allocate memory error");
            return false;
        }
    
        //为任务队列头节点的指针域赋值NULL
        pool->task_list->next = NULL;
        
        //设置最大等待任务个数为1000
        pool->max_waiting_tasks = MAX_WAITING_TASKS;
        
        //设置当前等待任务的个数为0
        pool->waiting_tasks = 0;
        
        //设置当前线程池线程的个数
        pool->active_threads = threads_number;
    
        int i;
        //创建线程池中的子线程
        for(i=0; i<pool->active_threads; i++)
        {
            if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)
            {
                perror("create threads error");
                return false;
            }
        }
    
        //初始化成功
        return true;
    }
    
    bool add_task(thread_pool *pool,void *(*do_task)(void *arg), void *arg)
    {
        //为新节点申请内存空间
        struct task *new_task = malloc(sizeof(struct task));
        if(new_task == NULL)
        {
            perror("allocate memory error");
            return false;
        }
        
        //为新节点的数据域赋值
        new_task->do_task = do_task;  //函数
        new_task->arg = arg; //函数的参数
        
        //为新节点的指针域赋值
        new_task->next = NULL;  
    
        //访问任务队列前,先上锁!
        pthread_mutex_lock(&pool->lock);
        
        //如果当前等待任务个数>=1000,则添加任务失败!
        if(pool->waiting_tasks >= MAX_WAITING_TASKS)
        {
            //解锁
            pthread_mutex_unlock(&pool->lock);
    
            //输出错误信息
            fprintf(stderr, "too many tasks.
    ");
            
            //释放刚刚初始化过的新节点
            free(new_task);
    
            return false;
        }
        
        //寻找任务队列的最后一个节点
        struct task *tmp = pool->task_list;
        while(tmp->next != NULL)
            tmp = tmp->next;
        
        //tmp->next = NULL;
    
        //把新节点尾插进去任务队列中
        tmp->next = new_task;
        
        //当前最大的等待任务的个数+1
        pool->waiting_tasks++;
    
        //解锁
        pthread_mutex_unlock(&pool->lock);
    
        //随机唤醒条件变量中其中一个线程起来就可以了。
        pthread_cond_signal(&pool->cond);
        
        return true;
    }
    
    int add_thread(thread_pool *pool, unsigned additional_threads)
    {
        //如果新增0个线程
        if(additional_threads == 0)
            return 0; //直接返回0
    
        //添加后线程总数
        unsigned total_threads = 
                pool->active_threads + additional_threads;
                            
        int i, actual_increment = 0;
        
        //创建线程
        for(i = pool->active_threads;  
              i < total_threads && i < MAX_ACTIVE_THREADS;  
              i++) 
           {
            if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
            {
                perror("add threads error");
                if(actual_increment == 0) 
                    return -1;
    
                break;
            }
            actual_increment++;  //真正创建的线程个数
        }
    
        //当前活跃的线程数 = 原来活跃的线程数 + 新实际创建的线程数
        pool->active_threads += actual_increment;
        
        return actual_increment;
    }
    
         
    int remove_thread(thread_pool *pool, unsigned int removing_threads)
    {
        //如果需要删除0条线程
        if(removing_threads == 0)
            return pool->active_threads; //当前线程池活跃的线程个数
    
        //剩余的线程数 = 当前活跃的线程数 - 需要删除的线程。
        int remaining_threads = pool->active_threads - removing_threads;
    
        //线程池中至少有1条线程
        remaining_threads = remaining_threads > 0 ? remaining_threads : 1;
    
        int i;  
        for(i=pool->active_threads-1; i>remaining_threads-1; i--)
        {    
            errno = pthread_cancel(pool->tids[i]);
            if(errno != 0)
                break;
        }
    
        //如果取消失败,则函数返回-1
        if(i == pool->active_threads-1)
            return -1;
        else
        {
            //计算当前剩余实际的个数
            pool->active_threads = i+1;
            return i+1; //返回当前线程剩余的个数
        }
    }
    
    
    bool destroy_pool(thread_pool *pool)
    {
        pool->shutdown = true; //当前线程池标志是关闭状态
        pthread_cond_broadcast(&pool->cond);
        int i;
        for(i=0; i<pool->active_threads; i++)
        {
            errno = pthread_join(pool->tids[i], NULL);
    
            if(errno != 0)
            {
                printf("join tids[%d] error: %s
    ",
                        i, strerror(errno));
            }
        
            else
                printf("[%u] is joined
    ", (unsigned)pool->tids[i]);
            
        }
    
    
        free(pool->task_list);
        free(pool->tids);
        free(pool);
    
        return true;
    }

    主函数:

    #include "thread_pool.h"
    
    void *mytask(void *arg) //线程的任务
    {
        int n = (int)arg;
    
        //工作任务:余数是多少,就睡多少秒,睡完,任务就算完成
        printf("[%u][%s] ==> job will be done in %d sec...
    ",
            (unsigned)pthread_self(), __FUNCTION__, n);
    
        sleep(n);
    
        printf("[%u][%s] ==> job done!
    ",
            (unsigned)pthread_self(), __FUNCTION__);
    
        return NULL;
    }
    
    void *count_time(void *arg)
    {
        int i = 0;
        while(1)
        {
            sleep(1);
            printf("sec: %d
    ", ++i);
        }
    }
    
    int main(void)
    {
        // 本线程用来显示当前流逝的秒数
        // 跟程序逻辑无关
        pthread_t a;
        pthread_create(&a, NULL, count_time, NULL);
    
        // 1, initialize the pool
        thread_pool *pool = malloc(sizeof(thread_pool));
        init_pool(pool, 2);
        //2个线程都在条件变量中睡眠
    
        // 2, throw tasks
        printf("throwing 3 tasks...
    ");
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
    
        // 3, check active threads number
        printf("current thread number: %d
    ",
                remove_thread(pool, 0));//2
        sleep(9);
    
        // 4, throw tasks
        printf("throwing another 6 tasks...
    ");
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        add_task(pool, mytask, (void *)(rand()%10));
        
        // 5, add threads
        add_thread(pool, 2);
    
        sleep(5);
    
        // 6, remove threads
        printf("remove 3 threads from the pool, "
               "current thread number: %d
    ",
                remove_thread(pool, 3));
    
        // 7, destroy the pool
        destroy_pool(pool);
        return 0;
    }
  • 相关阅读:
    关于求 p_i != i and p_i != i+1 的方案数的思考过程
    poj 3041 Asteroids 二分图最小覆盖点
    poj 1325 Machine Schedule 最小顶点覆盖
    poj 1011 Sticks 减枝搜索
    poj 1469 COURSES 最大匹配
    zoj 1516 Uncle Tom's Inherited Land 最大独立边集合(最大匹配)
    Path Cover (路径覆盖)
    hdu 3530 SubSequence TwoPoint单调队列维护最值
    zoj 1654 Place the Rebots 最大独立集转换成二分图最大独立边(最大匹配)
    poj 1466 Girls and Boys 二分图最大独立子集
  • 原文地址:https://www.cnblogs.com/zjlbk/p/11359578.html
Copyright © 2011-2022 走看看