zoukankan      html  css  js  c++  java
  • linux线程池thrmgr源码解析

    linux线程池thrmgr源码解析

    1         thrmgr线程池的作用

    thrmgr线程池的作用是提高程序的并发处理能力,在多CPU的服务器上运行程序,可以并发执行多个任务。

    2         thrmgr线程池的原理

    thrmgr并非像常规线程池那样,创建线程池时,创建固定数量的线程,线程一直存在,直到线程池被销毁。Thrmgr创建时只是分配线程池对象的变量,并初始化锁、条件变量等变量,并没有创建线程。当向线程池加入第一个任务时,开始创建第一个线程,处理任务,如果一直有任务,则会一直处理任务,如果加入的任务太多太快,线程池内线程都在忙碌,没有空闲等待线程,而且线程池中存在的线程数量不超过最大值,则会创建新的线程去执行任务。线程数量会增多,等到任务数量减少时,有些线程没有任务了,处于等待状态,而且等待超时,这些等待超时的线程就会自动返回,自动退出线程,线程池内活动线程数量减少。这样线程池内线程的数量会随着任务的数量动态调整。即避免了任务量大时,线程池处理不过来,又避免了任务少时,线程池内部存在大量空闲线程的缺陷。从实现了一种根据任务量动态调整的线程池。

    3         线程池基础知识

    3.1  linux线程属性pthread_attr_t

    typedef struct

    {

           int                               detachstate;   线程的分离状态

           int                               schedpolicy;  线程调度策略

           structsched_param              schedparam;  线程的调度参数

           int                               inheritsched;  线程的继承性

           int                                scope;       线程的作用域

           size_t                           guardsize;   线程栈末尾的警戒缓冲区大小

           int                                stackaddr_set;

           void*                          stackaddr;   线程栈的位置

           size_t                           stacksize;    线程栈的大小

    }pthread_attr_t;

    线程具有属性,用pthread_attr_t表示,在对该结构进行处理之前必须进行初始化,在使用后需要对其去除初始化。我们用pthread_attr_init函数对其初始化,用pthread_attr_destroy对其去除初始化。

    名称::

    pthread_attr_init/pthread_attr_destroy

    功能:

    对线程属性初始化/去除初始化

    头文件:

    #include<pthread.h>

    函数原形:

    int   pthread_attr_init(pthread_attr_t*attr);

    int   pthread_attr_destroy(pthread_attr_t*attr);

    参数:

    Attr   线程属性变量

    返回值:

    若成功返回0,若失败返回-1。

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    3.2  linux线程的分离状态

          线程的分离状态决定一个线程以什么样的方式来终止自己。在默认情况下线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束。只有当pthread_join()函数返回时,创建的线程才算终止,才能释放自己占用的系统资源。而分离线程不是这样子的,它没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。线程池中需要的就是分离状态的线程。则可以设置pthread_attr_t结构中的detachstate线程属性为PTHREAD_CREATE_DETACHED,让线程以分离状态启动。

    名称::

    pthread_attr_getdetachstate/pthread_attr_setdetachstate

    功能:

    获取/修改线程的分离状态属性

    头文件:

    #include<pthread.h>

    函数原形:

    int   pthread_attr_getdetachstate(const pthread_attr_t *attr,int *detachstate);

    int   pthread_attr_setdetachstate(pthread_attr_t *attr,intdetachstate);

    参数:

    Attr   线程属性变量

    Detachstate  线程的分离状态属性

    返回值:

    若成功返回0,若失败返回-1。

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    3.3  Linux互斥量pthread_mutex_t

    pthread_mutex_t在线程池中的作用是为了避免多线程对公共变量同时进行访问。在访问变量前,先锁住,不让别人访问,访问结束后在释放锁,让别人访问。保证原子性操作,避免多个线程同时修改公共变量。pthread_mutex_lock(&threadpool->pool_mutex);上锁。pthread_mutex_unlock(&threadpool->pool_mutex);进行解锁,两个函数要配对使用。

    3.4  Linux条件变量pthread_cond_t

    条件变量是利用线程间共享的全局变量进行同步的一种机制,通常是要和互斥量pthread_mutex_t配合使用。pthread_cond_t在线程池中的作用是无任务时,让线程处于等待状态,等待条件变量有信号,挂起线程。当有任务时,发出信号,让线程脱离等待状态,开始执行任务。线程等待超时,则自动退出,结束线程。

    (1)初始化和反初始化

    int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

    int pthread_cond_destroy(pthread_cond_t *cond);

    (2)等待信号函数

    int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);//一直等待

    int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);//超时等待

    因为采用的是绝对时间,所以先要获取系统时间,在加上超时时间,在传入参数。

    timeout.tv_sec = time(NULL) + threadpool->idle_timeout;

    timeout.tv_nsec = 0;

    pthread_cond_wait函数需要传入mutex变量,是因为在函数内部需要对mutex进行操作,内部流程是:更新条件等待队列->释放锁->等待信号。。。->信号到来->上锁->更新条件等待队列->执行pthread_cond_wait后面的代码。在pthread_cond_wait内部有一个先解锁再加锁的过程,所以pthread_cond_wait要和pthread_mutex_t配合使用,而且,pthread_cond_wait需要在加锁和解锁之间,总的流程就是外部加锁->内部更新条件等待队列->内部释放锁->内部等待信号。。。->信号到来->内部上锁->内部更新条件等待队列->执行pthread_cond_wait后面的代码->外部解锁。

    (3)发送信号

    pthread_cond_signal();发出一个信号,激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;

    pthread_cond_broadcast();广播方式发送信号,所有pthread_cond_wait获取到信号返回,激活所有等待线程。

    4         线程池的代码实现

    /*
     *  Copyright (C) 2004 Trog <trog@clamav.net>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 2 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
     *  MA 02110-1301, USA.
     */
    
    #ifndef __THRMGR_H__
    #define __THRMGR_H__
    
    #include <pthread.h>
    
    #ifndef C_WINDOWS
    #include <sys/time.h>
    #endif
    
    typedef struct work_item_tag {
        struct work_item_tag *next;
        void *data;
        struct timeval time_queued;//
    } work_item_t;
        
    typedef struct work_queue_tag {
        work_item_t *head;
        work_item_t *tail;
        int item_count;
    } work_queue_t;
    
    typedef enum {
        POOL_INVALID,
        POOL_VALID,
        POOL_EXIT
    } pool_state_t;
    
    typedef struct threadpool_tag {
        pthread_mutex_t pool_mutex;//mutex锁,用于限制同时只有一个线程对公共资源(条件变量,线程池内部变量)访问修改
        pthread_cond_t pool_cond;//条件变量,用于多线程间等待任务,有任务的信号控制
        pthread_attr_t pool_attr;//线程的属性,主要为了设置分离线程属性,即线程循环退出自动结束线程,释放资源,不用调用函数去释放资源
        //具体可参考https://www.cnblogs.com/lidabo/p/5514222.html对属性的解释
        pool_state_t state;//线程池的状态
        int thr_max;//线程池最大线程数量
        int thr_alive;//活着的线程数量,包括正在执行任务的线程和空闲等待线程
        int thr_idle;//空闲等待的线程。
        int idle_timeout;//线程等待超时时间,超时结束后,将结束本线程
        
        void (*handler)(void *);//任务操作函数,有用户传入函数指针
        
        work_queue_t *queue;//任务队列,以单向链表的方式存储任务
    } threadpool_t;
    /*
    功能:新建线程池
    参数:
    int max_threads, 最大线程数
    int idle_timeout,超时时间
    void (*handler)(void *)函数操作句柄
    */
    threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
    /*
    功能:销毁线程池
    参数:
    threadpool_t *threadpool线程池指针
    */
    void thrmgr_destroy(threadpool_t *threadpool);
    /*
    功能:给线程池下发任务
    参数:
    threadpool_t *threadpool线程池指针
    void *user_data  用户要处理的数据
    */
    int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);
    
    #endif
    /*
     *  Copyright (C) 2004 Trog <trog@clamav.net>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 2 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program; if not, write to the Free Software
     *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
     *  MA 02110-1301, USA.
     */
    
    #if HAVE_CONFIG_H
    #include "clamav-config.h"
    #endif
    
    #include <stdio.h>
    #include <pthread.h>
    #include <time.h>
    #include <errno.h>
    
    #include "shared/output.h"
    
    #include "thrmgr.h"
    #include "others.h"
    
    #define FALSE (0)
    #define TRUE (1)
    /*新建任务队列,初始化队列参数*/
    static work_queue_t *work_queue_new(void)
    {
        work_queue_t *work_q;
        
        work_q = (work_queue_t *) malloc(sizeof(work_queue_t));
        if (!work_q) {
            return NULL;
        }
        
        work_q->head = work_q->tail = NULL;
        work_q->item_count = 0;
        return work_q;
    }
    /*向任务队列中加入任务*/
    static int work_queue_add(work_queue_t *work_q, void *data)
    {
        work_item_t *work_item;
        
        if (!work_q) {
            return FALSE;
        }
        //申请任务内存
        work_item = (work_item_t *) malloc(sizeof(work_item_t));
        if (!work_item) {
            return FALSE;
        }
        //next指针设为空,将用户数据赋值给任务
        work_item->next = NULL;
        work_item->data = data;
        //设置任务接收时间,好像没有什么用
        gettimeofday(&(work_item->time_queued), NULL);
        //第一次插入任务首尾指针都为空,所以同时指向这个任务
        if (work_q->head == NULL) {
            work_q->head = work_q->tail = work_item;
            work_q->item_count = 1;
        } else {//以后插入时,将结尾next指针指向插入任务,
            //然后将位置指针指向最后插入的任务,
            //相当于将任务加入单向链表的末尾,然后在将尾指针指向最后一个
            work_q->tail->next = work_item;
            work_q->tail = work_item;
            work_q->item_count++;//任务数量加1
        }
        return TRUE;
    }
    
    static void *work_queue_pop(work_queue_t *work_q)
    {
        work_item_t *work_item;
        void *data;
        //头指针为空,无数据返回
        if (!work_q || !work_q->head) {
            return NULL;
        }
        //获取链表中第一个任务
        work_item = work_q->head;
        //获取用户数据
        data = work_item->data;
        //将头指针向后移动一位
        work_q->head = work_item->next;
        //如果头指针是空,说明刚刚去除的任务已经是最后一个任务,需要把尾指针也置为空
        if (work_q->head == NULL) {
            work_q->tail = NULL;
        }
        //销毁任务框架内容,返回用户数据
        free(work_item);
        return data;
    }
    
    void thrmgr_destroy(threadpool_t *threadpool)
    {
        if (!threadpool || (threadpool->state != POOL_VALID)) {
            return;
        }
        //上锁
          if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
               logg("!Mutex lock failed
    ");
                exit(-1);
        }
        //设置线程池状态为退出
        threadpool->state = POOL_EXIT;
        
        /* wait for threads to exit */
        //线程池中有活的线程,广播信号变量,让所有线程都获取到信号,自动返回结束线程
        if (threadpool->thr_alive > 0) {
            if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
                pthread_mutex_unlock(&threadpool->pool_mutex);
                return;
            }
        }
        while (threadpool->thr_alive > 0) {//等待最后一个线程结束时,即thr_alive==0时,会广播一个信号,告诉所有线程都结束了
            if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
                pthread_mutex_unlock(&threadpool->pool_mutex);
                return;
            }
        }
          if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
                logg("!Mutex unlock failed
    ");
                exit(-1);
          }
        //释放资源
        pthread_mutex_destroy(&(threadpool->pool_mutex));
        pthread_cond_destroy(&(threadpool->pool_cond));
        pthread_attr_destroy(&(threadpool->pool_attr));
        free(threadpool->queue);
        free(threadpool);
        return;
    }
    
    threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
    {
        threadpool_t *threadpool;
    #if defined(C_BIGSTACK) || defined(C_BSD)
        size_t stacksize;
    #endif
        
        if (max_threads <= 0) {
            return NULL;
        }
        //创建线程池对象
        threadpool = (threadpool_t *) malloc(sizeof(threadpool_t));
        if (!threadpool) {
            return NULL;
        }
        //创建任务队列
        threadpool->queue = work_queue_new();
        if (!threadpool->queue) {
            free(threadpool);
            return NULL;
        }    
        //线程池创建只是创建对象,没有启动任何线程,线程是在接收到任务才开始创建线程
        threadpool->thr_max = max_threads;
        threadpool->thr_alive = 0;
        threadpool->thr_idle = 0;
        threadpool->idle_timeout = idle_timeout;
        threadpool->handler = handler;
        //初始化锁
        pthread_mutex_init(&(threadpool->pool_mutex), NULL);
        if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
            pthread_mutex_destroy(&(threadpool->pool_mutex));
            free(threadpool->queue);
            free(threadpool);
            return NULL;
        }
        //初始化线程属性对象
        if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
            pthread_cond_destroy(&(threadpool->pool_cond));
            pthread_mutex_destroy(&(threadpool->pool_mutex));
            free(threadpool->queue);
            free(threadpool);
            return NULL;
        }
        //将线程属性对象参数设置为分离的线程属性(PTHREAD_CREATE_DETACHED),即线程执行到末尾,自动回收资源,不用调用回收函数来回收
        if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
            pthread_attr_destroy(&(threadpool->pool_attr));
            pthread_cond_destroy(&(threadpool->pool_cond));
            pthread_mutex_destroy(&(threadpool->pool_mutex));
            free(threadpool->queue);
            free(threadpool);
            return NULL;
        }
        //设置线程堆栈大小
    #if defined(C_BIGSTACK) || defined(C_BSD)
        pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
        stacksize = stacksize + 64 * 1024;
        if (stacksize < 1048576) stacksize = 1048576; /* at least 1MB please */
        logg("Set stacksize to %u
    ", stacksize);
        pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
    #endif
        threadpool->state = POOL_VALID;
        //线程池状态设置为可用状态
        return threadpool;
    }
    
    static void *thrmgr_worker(void *arg)
    {
        threadpool_t *threadpool = (threadpool_t *) arg;
        void *job_data;
        int retval, must_exit = FALSE;
        struct timespec timeout;
        
        /* loop looking for work */
        for (;;) 
        {//锁住,要修改公共变量了threadpool,如果这里锁住,pthread_cond_timedwait等待状态,后面的释放锁不执行,加入任务的函数thrmgr_dispatch中pthread_cond_signal信号也是在锁内部,
            //不就一直发不出去,那么这边在等,不释放锁,那边在争锁,又发不出去,不是死锁了吗。其实不然,pthread_cond_timedwait内部会先释放锁,然后等待信号,然后thrmgr_dispatch就可以进入锁发送信号了。
            //然后这边等到信号后再锁住,修改信号状态值,然后返回函数。pthread_cond_timedwait内部有一个释放锁,再锁住的过程。
            if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
                /* Fatal error */
                logg("!Fatal: mutex lock failed
    ");
                exit(-2);
            }
            timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
            timeout.tv_nsec = 0;
            //等待线程数量加1
            threadpool->thr_idle++;
            //获取到任务退出循环,否则进入等待
            while (((job_data=work_queue_pop(threadpool->queue)) == NULL)&& (threadpool->state != POOL_EXIT))
            {
                /* Sleep, awaiting wakeup */
                //接收到第一个任务时,才会 创建第一个线程,所以第一次会接收到信号并处理,第二次循环如果没有任务,
                //则会在这里等待,线程被挂起,超时后往下执行
                retval = pthread_cond_timedwait(&(threadpool->pool_cond),&(threadpool->pool_mutex), &timeout);
                //如果是超时的,说明空闲超时,需要结束该线程,设置变量,让线程自动结束
                if (retval == ETIMEDOUT) {
                    must_exit = TRUE;
                    break;//跳出等待
                }
            }//等待结束,等待线程数量减1
            threadpool->thr_idle--;
            //如果线程状态为退出,也将must_exit置为true
            if (threadpool->state == POOL_EXIT) {
                must_exit = TRUE;
            }
            //threadpool变量修改结束,释放锁
            if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
                /* Fatal error */
                logg("!Fatal: mutex unlock failed
    ");
                exit(-2);
            }//如果任务不为空,执行任务
            if (job_data) {
                threadpool->handler(job_data);
            } 
            else if (must_exit)//如果是线程空闲太久或者线程池状态为退出,则退出线程
            {
                break;
            }
        }
        //又要操作公共变量threadpool->thr_alive了,锁住,不让别人来打扰
        if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
            /* Fatal error */
            logg("!Fatal: mutex lock failed
    ");
            exit(-2);
        }
        //线程即将推出,将活着的线程数量减1
        threadpool->thr_alive--;
        //最后一个线程了,临死前发出最后一个广播信号,告诉destory函数最后一个线程已经阵亡,可以结束了
        if (threadpool->thr_alive == 0) {
            /* signal that all threads are finished */
            pthread_cond_broadcast(&threadpool->pool_cond);
        }
        //修改公共变量结束,释放锁
        if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
            /* Fatal error */
            logg("!Fatal: mutex unlock failed
    ");
            exit(-2);
        }
        return NULL;
    }
    /*向线程池下发任务*/
    int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
    {
        pthread_t thr_id;
    
        if (!threadpool) {
            return FALSE;
        }
    
        /* Lock the threadpool */
        //需要访问公共对象任务队列了,开始锁住
        if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
            logg("!Mutex lock failed
    ");
            return FALSE;
        }
    
        if (threadpool->state != POOL_VALID) {
            if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
                logg("!Mutex unlock failed
    ");
                return FALSE;
            }
            return FALSE;
        }//向任务队列中加入任务
        if (!work_queue_add(threadpool->queue, user_data))
        {
            if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
                logg("!Mutex unlock failed
    ");
                return FALSE;
            }
            return FALSE;
        }
        //空闲等待线程数量为0,要么任务太多,要么第一次添加任务,线程池内线程数量为空
        if ((threadpool->thr_idle == 0) &&(threadpool->thr_alive < threadpool->thr_max)) 
        {
            /* Start a new thread */
            //启动线程
            if (pthread_create(&thr_id, &(threadpool->pool_attr),thrmgr_worker, threadpool) != 0)
            {
                logg("!pthread_create failed
    ");
            } 
            else 
            {
                threadpool->thr_alive++;//成功创建,活线程数量加1
            }
        }
        //发送一个信号,告诉空闲等待线程,有任务了
        pthread_cond_signal(&(threadpool->pool_cond));
    
        if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
            logg("!Mutex unlock failed
    ");
            return FALSE;
        }
        return TRUE;
    }

    参考文献

    (1)https://www.cnblogs.com/secondtonone1/p/5580203.html 条件变量

    (2)https://www.cnblogs.com/lidabo/p/5514222.html  线程属性

    (3)http://en.verysource.com/clamav_sourcecode_rar-download-121916.html 线程池源码

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

    百度云盘下载地址:

    链接:https://pan.baidu.com/s/1swkQzCIKI3g3ObcebgpIDg

    提取码:mc8l

    微信公众号获取最新的软件和视频介绍

    QStockView

  • 相关阅读:
    20165328《信息安全系统设计基础》实验二固件程序设计实验报告
    20165328《信息安全系统设计基础》第六周学习总结
    2018-2019-1 20165305 20165319 20165328 实验一 开发环境的熟悉
    2018-2019-1 20165328《信息安全系统设计基础》第四周学习总结
    2018-2019-1 20165328 《信息安全系统设计基础》第三周学习总结及实验报告
    20165328《信息安全系统设计基础》第一周总结
    20165358课程总结
    20165328 实验五《网络安全编程》实验报告
    20165218 2018-2019-1 《信息安全系统》第八章学习总结
    2018-2019-1 20165218 实验三 实时系统
  • 原文地址:https://www.cnblogs.com/bclshuai/p/9851435.html
Copyright © 2011-2022 走看看