zoukankan      html  css  js  c++  java
  • C语言实现线程池

    以前写过一篇关于如何使用多线程提升推送速度的文章(http://www.cnblogs.com/bai-jimmy/p/5177433.html),能够达到5000qps,其实已经可以满足现在的业务。不过在看nginx的说明文档时,看到nginx支持用线程池来提升响应速度。我一直对如何实现线程池很感兴趣,于是利用周末的时间参考别人的代码,自己写了一个初级版,并且调通了;目前还没有在实际开发中应用,不知道效果如何。

    代码如下:

    pd_log.h

    #ifndef __pd_log_
    #define __pd_log_
    
    #define LOG_DEBUG_PATH "debug.log"
    #define LOG_ERROR_PATH "error.log"
    
    /**
     * Severity of a log message. logger() uses the value to pick the
     * destination file and the label written on the line.
     */
    enum log_level {
        DEBUG,      /* 0: appended to LOG_DEBUG_PATH */
        ERROR       /* 1: appended to LOG_ERROR_PATH */
    };
    
    /* Log a printf-style message at ERROR level, tagged with the caller's line. */
    #define error(...) \
            logger(ERROR, __LINE__, __VA_ARGS__)

    /* Log a printf-style message at DEBUG level, tagged with the caller's line. */
    #define debug(...) \
            logger(DEBUG, __LINE__, __VA_ARGS__)

    /*
     * Guard macro: when expr evaluates to false (NULL or 0), log the failed
     * expression at ERROR level and make the ENCLOSING function return rc.
     *
     * Wrapped in do { } while (0) so it behaves as a single statement after
     * an unbraced if/else. The stringified expression is passed through "%s"
     * so a '%' inside expr is never interpreted as a format directive.
     *
     * NOTE: this name shadows the standard assert() if <assert.h> is also
     * included in the same translation unit.
     */
    #define assert(expr, rc) \
            do { \
                    if (!(expr)) { \
                            error("%s", #expr"is null or 0"); \
                            return rc; \
                    } \
            } while (0)
    #endif

    pd_log.c

    #include <stdio.h>
    #include <stdlib.h>
    #include <stdarg.h>
    #include <string.h>
    #include <time.h>
    #include <unistd.h>
    
    #include "pd_log.h"
    
    /**
     * Format the current local time as "MM-DD HH:MM:SS" into time_str.
     *
     * @param time_str  destination buffer; receives a NUL-terminated string
     * @param len       size of time_str in bytes (15 or more for a full stamp)
     *
     * If the time cannot be obtained or converted, or the buffer is too small
     * for the stamp, time_str is set to the empty string so callers never
     * read indeterminate bytes. A NULL buffer or a zero length is ignored.
     */
    static void get_time(char *time_str, size_t len) {
        time_t now;
        struct tm local_time;

        if (time_str == NULL || len == 0) {
            return;
        }
        time_str[0] = '\0';

        now = time(NULL);
        if (now == (time_t)-1 || !localtime_r(&now, &local_time)) {
            return;
        }
        /* strftime returns 0 and leaves the buffer unspecified when the result does not fit. */
        if (strftime(time_str, len, "%m-%d %H:%M:%S", &local_time) == 0) {
            time_str[0] = '\0';
        }
    }
    
    /**
     * Append one formatted line to the log file selected by flag.
     *
     * Line layout: "<MM-DD HH:MM:SS> <LEVEL> (<pid>:<line>) <message>\n".
     *
     * @param flag  DEBUG writes to LOG_DEBUG_PATH, ERROR to LOG_ERROR_PATH;
     *              any other value is silently ignored
     * @param line  source line number of the call site (the error()/debug()
     *              macros pass __LINE__)
     * @param fmt   printf-style format string, followed by its arguments
     *
     * The file is opened in append mode and closed again on every call; if it
     * cannot be opened the message is dropped.
     */
    static void logger(int flag, int line, const char *fmt, ...) {
        const char *path;
        const char *label;
        char time_str[20 + 1];
        va_list args;
        FILE *fp;

        switch (flag) {
            case DEBUG:
                path = LOG_DEBUG_PATH;
                label = "DEBUG";
                break;
            case ERROR:
                path = LOG_ERROR_PATH;
                label = "ERROR";
                break;
            default:
                return;
        }

        get_time(time_str, sizeof(time_str));

        fp = fopen(path, "a");
        if (!fp) {
            return;
        }

        /* pid_t has no printf conversion of its own; cast to match %d. */
        fprintf(fp, "%s %s (%d:%d) ", time_str, label, (int)getpid(), line);

        va_start(args, fmt);
        vfprintf(fp, fmt, args);
        va_end(args);
        fprintf(fp, "\n");

        fclose(fp);
    }

    pd_pool.h

    /**
     * 线程池头文件
     * @author jimmy
     * @date 2016-5-14
     */
    #ifndef __PD_POOL_
    #define __PD_POOL_
    
    /* One queued unit of work; tasks are chained into a singly linked list. */
    struct task_s {
        void (*routine)(void *);    /* function a worker calls to run the task */
        void *argv;                 /* argument handed to routine */
        struct task_s *next;        /* following task in the list, NULL for the last one */
    };
    typedef struct task_s pd_task_t;
    
    /* FIFO of pending tasks, kept as a linked list with a tail insertion point. */
    struct queue_s {
        pd_task_t *head;        /* oldest pending task, NULL when the queue is empty */
        pd_task_t **tail;       /* where the next task gets linked in: &head when empty,
                                   otherwise the address of the last task's next field */
        size_t max_task_num;    /* upper bound on queued tasks */
        size_t cur_task_num;    /* number of tasks currently queued */
    };
    typedef struct queue_s pd_queue_t;
    
    /* Thread pool: a task queue plus the synchronization objects guarding it. */
    struct pool_s {
        pthread_mutex_t mutex;  /* protects queue */
        pthread_cond_t cond;    /* signalled when a task is appended to queue */
        pd_queue_t queue;       /* pending tasks */
        size_t thread_num;      /* number of worker threads */
    };
    typedef struct pool_s pd_pool_t;
    
    /*初始化线程池*/
    //pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_stack_size, size_t thread_max_num);
    #endif

    pd_pool.c

    /**
     * 线程池
     * @author jimmy
     * @date 2016-5-14
     */
    #include <stdio.h>
    #include <stdlib.h>
    #include <errno.h>
    
    #include <pthread.h>
    
    #include "pd_log.h"
    #include "pd_log.c"
    #include "pd_pool.h"
    
    /*
     * Thread-specific data key. Each worker stores a pointer to its own exit
     * flag under this key; pd_pool_exit_cb() looks the flag up through it.
     * Created in pd_pool_init() and deleted in pd_pool_destroy().
     */
    pthread_key_t key;
    
    /**
     * Worker thread entry point: repeatedly takes the oldest task off the
     * pool's queue and runs it, until a task sets this thread's exit flag.
     *
     * The exit flag lives on this thread's stack and is published through the
     * thread-specific data key, so a task running on this thread (see
     * pd_pool_exit_cb) can reach it and request termination.
     *
     * @param argv  the pd_pool_t this worker serves
     * @return      always NULL
     */
    void *pd_worker_dispatch(void *argv){
            /* plain unsigned short: the ushort typedef is a non-standard extension */
            unsigned short exit_flag = 0;
            pd_task_t *a_task;
            pd_pool_t *a_pool = (pd_pool_t *)argv;

            if(a_pool == NULL){
                    return NULL;
            }
            if(pthread_setspecific(key, (void *)&exit_flag) != 0){
                    return NULL;
            }
            /* Keep pulling tasks off the queue until asked to stop. */
            while(!exit_flag){
                    pthread_mutex_lock(&a_pool->mutex);
                    /* Empty queue: sleep until a task is added. The loop
                     * re-checks the queue after every wakeup. */
                    while(a_pool->queue.head == NULL){
                            pthread_cond_wait(&a_pool->cond, &a_pool->mutex);
                    }
                    /* Unlink the head task. */
                    a_task = a_pool->queue.head;
                    a_pool->queue.head = a_task->next;
                    a_pool->queue.cur_task_num--;
                    if(a_pool->queue.head == NULL){
                            /* Queue drained: the insertion point goes back to head. */
                            a_pool->queue.tail = &a_pool->queue.head;
                    }
                    pthread_mutex_unlock(&a_pool->mutex);
                    /* Run the task outside the lock so other workers can proceed. */
                    a_task->routine(a_task->argv);
                    free(a_task);
                    a_task = NULL;
            }
            /* Returning from the start routine ends the thread like pthread_exit(). */
            return NULL;
    }
    
    /**
     * Start the pool's worker threads.
     *
     * Tries to create a_pool->thread_num threads running pd_worker_dispatch.
     * Workers are detached right away because nothing in this file joins
     * them; without that their resources would never be released.
     *
     * If thread creation fails after at least one worker is running, the pool
     * is kept with fewer workers and a_pool->thread_num is lowered to the
     * number actually started, so shutdown sends the right number of exit
     * requests.
     *
     * @param a_pool  pool whose mutex, condition variable and queue are
     *                already initialized
     * @return 0 when at least one worker is running (or thread_num is 0);
     *         -1 with errno set to the pthread_create error code when no
     *         worker could be started. No worker thread exists in that case.
     */
    static int pd_pool_create(pd_pool_t *a_pool){
            size_t wanted = a_pool->thread_num;
            size_t started;
            pthread_t tid;
            int rc = 0;

            for(started = 0; started < wanted; started++){
                    rc = pthread_create(&tid, NULL, pd_worker_dispatch, a_pool);
                    if(rc != 0){
                            break;
                    }
                    pthread_detach(tid);
            }
            if(started == wanted){
                    return 0;
            }
            if(started == 0){
                    /* pthread_create returns its error code; hand it to the
                     * caller through errno. */
                    errno = rc;
                    return -1;
            }
            error("pd_pool_create: started %zu of %zu threads: %s", started, wanted, strerror(rc));
            a_pool->thread_num = started;
            return 0;
    }
    
    /**
     * Task routine that asks the worker running it to terminate.
     *
     * Sets the calling thread's exit flag (found through the thread-specific
     * data key) and then clears the caller-supplied lock word to report that
     * the request was processed.
     *
     * @param argv  pointer to an unsigned int that is set to 0 once the exit
     *              flag has been raised; pd_pool_destroy() polls it
     *
     * The thread-specific slot already points at the flag, so it does not
     * need to be stored again. If the calling thread has no value under the
     * key (pthread_getspecific returns NULL) the flag step is skipped rather
     * than dereferencing NULL.
     */
    void pd_pool_exit_cb(void *argv){
            unsigned int *lock = argv;
            /* plain unsigned short: the ushort typedef is a non-standard extension */
            unsigned short *exit_flag_ptr = pthread_getspecific(key);

            if(exit_flag_ptr != NULL){
                    *exit_flag_ptr = 1;
            }
            if(lock != NULL){
                    *lock = 0;
            }
    }
    
    /**
     * Allocate and start a thread pool.
     *
     * @param thread_num      number of worker threads to start
     * @param thread_max_num  maximum number of tasks the queue may hold
     * @return the new pool, or NULL on failure (the reason is logged)
     *
     * pthread_* functions report failure through their return value, not
     * through errno, so that value is what gets logged. Every resource
     * acquired before a failure is released again in reverse order.
     *
     * NOTE(review): the thread-specific data key is a single file-level
     * variable that is re-created on every call, so only one pool at a time
     * appears to be supported — confirm before creating several pools.
     */
    pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){
            int rc;
            pd_pool_t *a_pool = calloc(1, sizeof(*a_pool));

            if(!a_pool){
                    error("pool_init calloc fail: %s", strerror(errno));
                    return NULL;
            }
            a_pool->thread_num = thread_num;
            /* Empty queue: the insertion point is the head pointer itself. */
            a_pool->queue.max_task_num = thread_max_num;
            a_pool->queue.cur_task_num = 0;
            a_pool->queue.head = NULL;
            a_pool->queue.tail = &a_pool->queue.head;
            /* Key under which each worker publishes its exit flag. */
            rc = pthread_key_create(&key, NULL);
            if(rc != 0){
                    error("pthread_key_create fail: %s", strerror(rc));
                    goto err_free;
            }
            rc = pthread_mutex_init(&a_pool->mutex, NULL);
            if(rc != 0){
                    error("pthread_mutex_init fail: %s", strerror(rc));
                    goto err_key;
            }
            rc = pthread_cond_init(&a_pool->cond, NULL);
            if(rc != 0){
                    error("pthread_cond_init fail: %s", strerror(rc));
                    goto err_mutex;
            }
            /* Start the workers. Tearing everything down on failure assumes
             * pd_pool_create leaves no worker running when it reports an error. */
            if(pd_pool_create(a_pool) != 0){
                    error("pd_pool_create fail: %s", strerror(errno));
                    goto err_cond;
            }
            return a_pool;
    err_cond:
            pthread_cond_destroy(&a_pool->cond);
    err_mutex:
            pthread_mutex_destroy(&a_pool->mutex);
    err_key:
            pthread_key_delete(key);
    err_free:
            free(a_pool);
            return NULL;
    }
    
    /**
     * Append a task to the pool's queue and wake one waiting worker.
     *
     * @param a_pool   pool to submit to
     * @param routine  function a worker will call; must not be NULL
     * @param argv     argument passed to routine; it is not copied, so it must
     *                 stay valid until the task has run
     * @return 0 on success; -1 if an argument is NULL, memory is exhausted, or
     *         the queue already holds max_task_num tasks
     *
     * The task node is allocated here and freed by the worker after routine
     * returns.
     */
    int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){
            pd_task_t *a_task = NULL;

            /* A NULL routine would be called by a worker later; reject it here. */
            if(a_pool == NULL || routine == NULL){
                    error("add task fail: pool or routine is NULL");
                    return -1;
            }
            a_task = calloc(1, sizeof(*a_task));
            if(!a_task){
                    error("add task calloc faile: %s", strerror(errno));
                    return -1;
            }
            a_task->routine = routine;
            a_task->argv = argv;
            a_task->next = NULL;

            pthread_mutex_lock(&a_pool->mutex);
            if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){
                    error("cur_task_num >= max_task_num");
                    goto err;
            }
            /* Link the task in at the tail and move the insertion point to
             * its next field. */
            *(a_pool->queue.tail) = a_task;
            a_pool->queue.tail = &a_task->next;
            a_pool->queue.cur_task_num++;
            /* Wake one worker blocked on the empty queue. */
            pthread_cond_signal(&a_pool->cond);
            pthread_mutex_unlock(&a_pool->mutex);
            return 0;
    err:
            pthread_mutex_unlock(&a_pool->mutex);
            free(a_task);
            return -1;
    }
    
    /**
     * Stop all workers and release the pool.
     *
     * For each worker, one pd_pool_exit_cb task is queued and this function
     * polls the lock word until that task has cleared it, i.e. until some
     * worker has picked the request up. Exit requests go through the normal
     * queue, so tasks queued before them are run first.
     *
     * @param a_pool  pool returned by pd_pool_init(); NULL is ignored
     *
     * If an exit request cannot be queued (queue full or out of memory) the
     * shutdown is abandoned and the pool is left allocated, because workers
     * may still be using it.
     *
     * NOTE(review): the lock word is an ordinary unsigned int written by a
     * worker thread and read here without synchronization — consider an
     * atomic or a condition variable.
     */
    void pd_pool_destroy(pd_pool_t *a_pool){
            size_t n;
            unsigned int lock;
            pd_task_t *a_task;

            if(a_pool == NULL){
                    return;
            }
            for(n = 0; n < a_pool->thread_num; n++){
                    lock = 1;
                    if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != 0){
                            error("pd_pool_destroy fail: add_task fail");
                            return;
                    }
                    /* Wait until a worker has processed this exit request. */
                    while(lock){
                            usleep(1);
                    }
            }
            /* All workers have been told to stop; free tasks that were queued
             * behind the last exit request and will never run. */
            while((a_task = a_pool->queue.head) != NULL){
                    a_pool->queue.head = a_task->next;
                    free(a_task);
            }
            pthread_mutex_destroy(&a_pool->mutex);
            pthread_cond_destroy(&a_pool->cond);
            pthread_key_delete(key);
            free(a_pool);
    }
    /******************************************************************************************/
    
    /* Demo task: prints a line, then sleeps one second to simulate work. */
    void testfun(void *argv){
            (void)argv;     /* unused */
            printf("testfun\n");
            sleep(1);
    }
    
    /* Demo driver: start a pool of 9 workers with a queue limit of 5 tasks,
     * submit three test tasks, then shut the pool down. */
    int main(void){
            int i;
            pd_pool_t *a_pool = pd_pool_init(9, 5);

            if(a_pool == NULL){
                    fprintf(stderr, "pd_pool_init failed\n");
                    return 1;
            }
            for(i = 0; i < 3; i++){
                    if(pd_pool_add_task(a_pool, testfun, NULL) != 0){
                            fprintf(stderr, "pd_pool_add_task failed\n");
                    }
            }

            pd_pool_destroy(a_pool);
            return 0;
    }
  • 相关阅读:
    PAT1118:Birds in Forest
    PAT1112:Stucked Keyboard
    PAT1091:Acute Stroke
    Java基础006 --- 类、接口、对象
    Java基础005 --- 安全管理器、可变参数等
    Java基础004 --- BigInteger和BigDecimal
    Java基础003 --- 原始数据类型
    maven完整学习笔记(2)--- 依赖范围和依赖传递
    maven完整学习笔记(1)--- 基本概念及基础命令
    Java编译器007---javac 其它选项
  • 原文地址:https://www.cnblogs.com/bai-jimmy/p/5499147.html
Copyright © 2011-2022 走看看