zoukankan      html  css  js  c++  java
  • nginx线程池源码解析

      周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本。实现上某些地方还是有差异的,不过基本结构全部摘抄。

      在这里分享一下。如果你看懂了我的版本,也就证明你看懂了nginx的线程池。

      本文只列出了关键数据结构和API,重在理解nginx线程池设计思路。完整代码在最后的链接里。

      1.任务节点

    typedef void (*CB_FUN)(void *);
    
    //任务结构体
    typedef struct task
    {
        void        *argv; //任务函数的参数(任务执行结束前,要保证参数地址有效)
        CB_FUN        handler; //任务函数(返回值必须为0   非0值用作增加线程,和销毁线程池)
        struct task *next; //任务链指针
    }zoey_task_t;

      handler为函数指针,是实际的任务函数,argv为该函数的参数,next指向下一个任务。

      2.任务队列

    typedef struct task_queue
    {
        zoey_task_t *head;  //队列头
        zoey_task_t **tail;    //队列尾
        unsigned int maxtasknum; //最大任务限制
        unsigned int curtasknum; //当前任务数
    }zoey_task_queue_t;

      head为任务队列头指针,tail为任务队列尾指针,maxtasknum为队列最大任务数限制,curtasknum为队列当前任务数。

      3.线程池

    typedef struct threadpool
    {
        pthread_mutex_t    mutex;  //互斥锁
        pthread_cond_t     cond;    //条件锁
        zoey_task_queue_t       tasks;//任务队列
    
        unsigned int       threadnum; //线程数
        unsigned int       thread_stack_size; //线程堆栈大小
    
    }zoey_threadpool_t;

      mutex为互斥锁 cond为条件锁。mutex和cond共同保证线程池任务的互斥领取或者添加。

      tasks指向任务队列。

      threadnum为线程池的线程数

      thread_stack_size为线程堆栈大小 

      4.启动配置

    //配置参数
    typedef struct threadpool_conf
    {
        unsigned int threadnum;    //线程数
        unsigned int thread_stack_size;//线程堆栈大小
        unsigned int maxtasknum;//最大任务限制
    }zoey_threadpool_conf_t;

      启动配置结构体是初始化线程池时的一些参数。

      

      5.初始化线程池

      首先检查参数是否合法,然后初始化mutex,cond,key(pthread_key_t)。key用来读写线程全局变量,此全局变量控制线程是否退出。

      最后创建线程。

    zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf)
    {
        zoey_threadpool_t *pool = NULL;
        int error_flag_mutex = 0;
        int error_flag_cond = 0;
        pthread_attr_t attr;
        do{
            if (z_conf_check(conf) == -1){ //检查参数是否合法
                break;
            }
    
            pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申请线程池句柄
            if (pool == NULL){
                break;
            }
    
            //初始化线程池基本参数
            pool->threadnum = conf->threadnum;
            pool->thread_stack_size = conf->thread_stack_size;
            pool->tasks.maxtasknum = conf->maxtasknum;
            pool->tasks.curtasknum = 0;
    
            z_task_queue_init(&pool->tasks);
        
            if (z_thread_key_create() != 0){//创建一个pthread_key_t,用以访问线程全局变量。
                free(pool);
                break;
            }
            if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥锁
                z_thread_key_destroy();
                free(pool);
                break;
            }
    
            if (z_thread_cond_create(&pool->cond) != 0){  //初始化条件锁
                z_thread_key_destroy();
                z_thread_mutex_destroy(&pool->mutex);
                free(pool);
                break;
            }
    
            if (z_threadpool_create(pool) != 0){       //创建线程池
                z_thread_key_destroy();
                z_thread_mutex_destroy(&pool->mutex);
                z_thread_cond_destroy(&pool->cond);
                free(pool);
                break;
            }
            return pool;
        }while(0);
    
        return NULL;
    }

     

     6.添加任务

      首先申请一个任务节点,实例化后将节点加入任务队列,并将当前任务队列数++并通知其他进程有新任务。整个过程加锁。

    int zoey_threadpool_add_task(zoey_threadpool_t *pool, CB_FUN handler, void* argv)
    {
        zoey_task_t *task = NULL;
        //申请一个任务节点并赋值
        task = (zoey_task_t *)malloc(sizeof(zoey_task_t));
        if (task == NULL){
            return -1;
        }
        task->handler = handler;
        task->argv = argv;
        task->next = NULL;
        if (pthread_mutex_lock(&pool->mutex) != 0){ //加锁
            free(task);
            return -1;
        }
        do{
    
            if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判断工作队列中的任务数是否达到限制
                break;
            }
    
            //将任务节点尾插到任务队列
            *(pool->tasks.tail) = task;
            pool->tasks.tail = &task->next;
            pool->tasks.curtasknum++;
    
            //通知阻塞的线程
            if (pthread_cond_signal(&pool->cond) != 0){
                break;
            }
            //解锁
            pthread_mutex_unlock(&pool->mutex);
            return 0;
    
        }while(0);
        pthread_mutex_unlock(&pool->mutex);
        free(task);
        return -1;
    
    }

     

     7.销毁线程池

      销毁线程池其实也是向任务队列添加任务,只不过添加的任务是让线程退出。z_threadpool_exit_cb函数会将lock置0后退出线程,lock为0表示此线程

      已经退出,接着退出下一个线程。退出完线程释放所有资源。

    void zoey_threadpool_destroy(zoey_threadpool_t *pool)
    {
        unsigned int n = 0;
        volatile unsigned int  lock;
    
        //z_threadpool_exit_cb函数会使对应线程退出
        for (; n < pool->threadnum; n++){
            lock = 1;
            if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){
                return;
            }
            while (lock){
                usleep(1);
            }
        }
        z_thread_mutex_destroy(&pool->mutex);
        z_thread_cond_destroy(&pool->cond);
        z_thread_key_destroy();
        free(pool);
    }

     

     8.增加一个线程

      很简单,再生成一个线程以及线程数++即可。加锁。

    int zoey_thread_add(zoey_threadpool_t *pool)
    {
        int ret = 0;
        if (pthread_mutex_lock(&pool->mutex) != 0){
            return -1;
        }
        ret = z_thread_add(pool);
        pthread_mutex_unlock(&pool->mutex);
        return ret;
    }

     

     9.改变任务队列最大任务限制

      当num=0时设置线程数为无限大。

    void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num)
    {
        if (pthread_mutex_lock(&pool->mutex) != 0){
            return -1;
        }
        z_change_maxtask_num(pool, num);  //改变最大任务限制
        pthread_mutex_unlock(&pool->mutex);
    }

      10.使用示例

    int main()
    {
        int array[10000] = {0};
        int i = 0;
        zoey_threadpool_conf_t conf = {5,0,5}; //实例化启动参数
        zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化线程池
        if (pool == NULL){
            return 0;
        }
        for (; i < 10000; i++){
            array[i] = i;
            if (i == 80){
                zoey_thread_add(pool); //增加线程
                zoey_thread_add(pool);
            }
            
            if (i == 100){
                zoey_set_max_tasknum(pool, 0); //改变最大任务数   0为不做上限
            }
            while(1){
                if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){
                    break;
                }
                printf("error in i = %d
    ",i);
            
            }
        }
        zoey_threadpool_destroy(pool);
    
        while(1){
            sleep(5);
        }
        return 0;
    }

      11.源码

    https://github.com/unlikewashface/zoey_threadpool.git
  • 相关阅读:
    实现一个程序两套快捷键
    SystemC中文教程一
    logback的使用和logback.xml详解
    mysql语句练习50题
    Intellij IDEA中使用Debug调试详解
    用node-webkit把web应用打包成桌面应用
    Idea导入项目详解
    iReport 5.6.0 Error: net.sf.jasperreports.engine.JRException: Error executing SQL statement for : data 最优解决方案
    CentOS 7.X 关闭防火墙
    将 MySQL root 的远程访问密码由空密码改为 password
  • 原文地址:https://www.cnblogs.com/acool/p/4737539.html
Copyright © 2011-2022 走看看