zoukankan      html  css  js  c++  java
  • Linux高并发web服务器开发——web服务器-3

    在学习Linux高并发web服务器开发总结了笔记,并分享出来。有问题请及时联系博主:Alliswell_WP,转载请注明出处。

    11_服务器开发-第03天(web服务器 - 3)

    目录:
    一、学习目标
    二、 复习
    三、strftime函数
    四、libevent实现httpserver
    五、线程池
    1、线程池的原理
    2、线程池各个函数分析
    附—整体代码

    一、学习目标

    1、libevent实现httpserver

    2、线程池

    二、 复习

    1、epoll服务器端代码

    三、strftime函数

       

    》使用:

    char buf[1024];

    strftime(buf, sizeof(buf), "%Y %b %d %H %M %S", st.st_mtime);//把st.st_mtime按双引号中格式 格式化到buf中。

    》类似于QT中(QDateTime、QTime、QDate):如调用QDateTime后,然后toString("yy MM dd")把QDatetime转为特定格式

    如获取本地时间:

    QDateTime d(QDateTime::currentDateTime());
    qDebug()<<d.toString("yy.MM.dd") ;   

    四、libevent实现httpserver

    》技巧:在vim中点击“F4”查看所有函数,在右侧垂直分屏显示,点击“Ctrl+w+w”切换分屏到右侧函数。

    >main.c

    #include <stdio.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <stdlib.h>
    #include <string.h>
    #include <signal.h>
    #include <event2/bufferevent.h>
    #include <event2/listener.h>
    #include <event2/event.h>
    #include "libevent_http.h"
    
    int main(int argc, char **argv)
    {
        if(argc < 3)
        {
            printf("./event_http port path
    ");
            return -1;
        }
        if(chdir(argv[2]) < 0) {
            printf("dir is not exists: %s
    ", argv[2]);
            perror("chdir err:");
            return -1;
        }
        
        struct event_base *base;
        struct evconnlistener *listener;
        struct event *signal_event;
    
        struct sockaddr_in sin;
        base = event_base_new();//创建事件处理框架
        if (!base)
        {
            fprintf(stderr, "Could not initialize libevent!
    ");
            return 1;
        }
    
        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_port = htons(atoi(argv[1]));
        //服务器端创建2个套接字
        // 创建监听的套接字,绑定,监听,接受连接请求
        listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
                        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
                        (struct sockaddr*)&sin, sizeof(sin));
        if (!listener)
        {
            fprintf(stderr, "Could not create a listener!
    ");
            return 1;
        }
    
        // 创建信号事件, 捕捉并处理
        signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
        if (!signal_event || event_add(signal_event, NULL)<0) 
        {
            fprintf(stderr, "Could not create/add a signal event!
    ");
            return 1;
        }
    
        // 事件循环
        event_base_dispatch(base);
    
        evconnlistener_free(listener);
        event_free(signal_event);
        event_base_free(base);
    
        printf("done
    ");
    
        return 0;
    }
    main.c

    >libevent_http.h

    #ifndef _LIBEVENT_HTTP_H
    #define _LIBEVENT_HTTP_H
    
    #include <event2/event.h>
    
    void conn_eventcb(struct bufferevent *bev, short events, void *user_data);
    
    void conn_readcb(struct bufferevent *bev, void *user_data);
    
    const char *get_file_type(char *name);
    
    int hexit(char c);
    
    void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
                     struct sockaddr *sa, int socklen, void *user_data);
    
    int response_http(struct bufferevent *bev, const char *method, char *path);
    
    int send_dir(struct bufferevent *bev,const char *dirname);
    
    int send_error(struct bufferevent *bev);
    
    int send_file_to_http(const char *filename, struct bufferevent *bev);
    
    int send_header(struct bufferevent *bev, int no, const char* desp, const char *type, long len);
    
    void signal_cb(evutil_socket_t sig, short events, void *user_data);
    
    void strdecode(char *to, char *from);
    
    void strencode(char* to, size_t tosize, const char* from);
    
    #endif
    libevent_http.h

    >libevent_http.c

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <fcntl.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <sys/stat.h>
    #include <string.h>
    #include <dirent.h>
    #include <time.h>
    #include <signal.h>
    #include <ctype.h>
    #include <errno.h>
    #include <event2/bufferevent.h>
    #include <event2/buffer.h>
    #include <event2/listener.h>
    #include "libevent_http.h"
    
    #define _HTTP_CLOSE_ "Connection: close
    "
    
    int response_http(struct bufferevent *bev, const char *method, char *path)
    {
        if(strcasecmp("GET", method) == 0){
            //get method ...
            strdecode(path, path);
            char *pf = &path[1];
    
            if(strcmp(path, "/") == 0 || strcmp(path, "/.") == 0)
            {
                pf="./";
            }
    
            printf("***** http Request Resource Path =  %s, pf = %s
    ", path, pf);
    
            struct stat sb;
            if(stat(pf,&sb) < 0)
            {
                perror("open file err:");
                send_error(bev);
                return -1;
            }
    
            if(S_ISDIR(sb.st_mode))//处理目录
            {
                //应该显示目录列表
                send_header(bev, 200, "OK", get_file_type(".html"), -1);
                send_dir(bev, pf);
            }
            else //处理文件
            {
                send_header(bev, 200, "OK", get_file_type(pf), sb.st_size);
                send_file_to_http(pf, bev);
            }
        }
    
        return 0;
    }
    
    /*
         *charset=iso-8859-1    西欧的编码,说明网站采用的编码是英文;
         *charset=gb2312        说明网站采用的编码是简体中文;
         *charset=utf-8            代表世界通用的语言编码;
         *                        可以用到中文、韩文、日文等世界上所有语言编码上
         *charset=euc-kr        说明网站采用的编码是韩文;
         *charset=big5            说明网站采用的编码是繁体中文;
         *
         *以下是依据传递进来的文件名,使用后缀判断是何种文件类型
         *将对应的文件类型按照http定义的关键字发送回去
    */
    const char *get_file_type(char *name)
    {
        char* dot;
    
        dot = strrchr(name, '.');    //自右向左查找‘.’字符;如不存在返回NULL
    
        if (dot == (char*)0)
            return "text/plain; charset=utf-8";
        if (strcmp(dot, ".html") == 0 || strcmp(dot, ".htm") == 0)
            return "text/html; charset=utf-8";
        if (strcmp(dot, ".jpg") == 0 || strcmp(dot, ".jpeg") == 0)
            return "image/jpeg";
        if (strcmp(dot, ".gif") == 0)
            return "image/gif";
        if (strcmp(dot, ".png") == 0)
            return "image/png";
        if (strcmp(dot, ".css") == 0)
            return "text/css";
        if (strcmp(dot, ".au") == 0)
            return "audio/basic";
        if (strcmp( dot, ".wav") == 0)
            return "audio/wav";
        if (strcmp(dot, ".avi") == 0)
            return "video/x-msvideo";
        if (strcmp(dot, ".mov") == 0 || strcmp(dot, ".qt") == 0)
            return "video/quicktime";
        if (strcmp(dot, ".mpeg") == 0 || strcmp(dot, ".mpe") == 0)
            return "video/mpeg";
        if (strcmp(dot, ".vrml") == 0 || strcmp(dot, ".wrl") == 0)
            return "model/vrml";
        if (strcmp(dot, ".midi") == 0 || strcmp(dot, ".mid") == 0)
            return "audio/midi";
        if (strcmp(dot, ".mp3") == 0)
            return "audio/mpeg";
        if (strcmp(dot, ".ogg") == 0)
            return "application/ogg";
        if (strcmp(dot, ".pac") == 0)
            return "application/x-ns-proxy-autoconfig";
    
        return "text/plain; charset=utf-8";
    }
    
    int send_file_to_http(const char *filename, struct bufferevent *bev)
    {
        int fd = open(filename, O_RDONLY);
        int ret = 0;
        char buf[4096] = {0};
    
        while(  (ret = read(fd, buf, sizeof(buf)) ) )
        {
            bufferevent_write(bev, buf, ret);
            memset(buf, 0, ret);
        }
        close(fd);
        return 0;
    }
    
    int send_header(struct bufferevent *bev, int no, const char* desp, const char *type, long len)
    {
        char buf[256]={0};
    
        sprintf(buf, "HTTP/1.1 %d %s
    ", no, desp);
        //HTTP/1.1 200 OK
    
        bufferevent_write(bev, buf, strlen(buf));
        // 文件类型
        sprintf(buf, "Content-Type:%s
    ", type);
        bufferevent_write(bev, buf, strlen(buf));
        // 文件大小
        sprintf(buf, "Content-Length:%ld
    ", len);
        bufferevent_write(bev, buf, strlen(buf));
        // Connection: close
        bufferevent_write(bev, _HTTP_CLOSE_, strlen(_HTTP_CLOSE_));
        //send 
    
        bufferevent_write(bev, "
    ", 2);
    
        return 0;
    }
    
    int send_error(struct bufferevent *bev)
    {
        send_header(bev,404, "File Not Found", "text/html", -1);
        send_file_to_http("404.html", bev);
        return 0;
    }
    
    int send_dir(struct bufferevent *bev,const char *dirname)
    {
        char encoded_name[1024];
        char path[1024];
        char timestr[64];
        struct stat sb;
        struct dirent **dirinfo;
    
        char buf[4096] = {0};
        sprintf(buf, "<html><head><meta charset="utf-8"><title>%s</title></head>", dirname);
        sprintf(buf+strlen(buf), "<body><h1>当前目录:%s</h1><table>", dirname);
        //添加目录内容
        int num = scandir(dirname, &dirinfo, NULL, alphasort);
        for(int i=0; i<num; ++i)
        {
            // 编码
            strencode(encoded_name, sizeof(encoded_name), dirinfo[i]->d_name);
    
            sprintf(path, "%s%s", dirname, dirinfo[i]->d_name);
            printf("############# path = %s
    ", path);
            if (lstat(path, &sb) < 0)
            {
                sprintf(buf+strlen(buf), 
                        "<tr><td><a href="%s">%s</a></td></tr>
    ", 
                        encoded_name, dirinfo[i]->d_name);
            }
            else
            {
                strftime(timestr, sizeof(timestr), 
                         "  %d  %b   %Y  %H:%M", localtime(&sb.st_mtime));
                if(S_ISDIR(sb.st_mode))
                {
                    sprintf(buf+strlen(buf), 
                            "<tr><td><a href="%s/">%s/</a></td><td>%s</td><td>%ld</td></tr>
    ",
                            encoded_name, dirinfo[i]->d_name, timestr, sb.st_size);
                }
                else
                {
                    sprintf(buf+strlen(buf), 
                            "<tr><td><a href="%s">%s</a></td><td>%s</td><td>%ld</td></tr>
    ", 
                            encoded_name, dirinfo[i]->d_name, timestr, sb.st_size);
                }
            }
            bufferevent_write(bev, buf, strlen(buf));
            memset(buf, 0, sizeof(buf));
        }
        sprintf(buf+strlen(buf), "</table></body></html>");
        bufferevent_write(bev, buf, strlen(buf));
        printf("################# Dir Read OK !!!!!!!!!!!!!!
    ");
    
        return 0;
    }
    
    void conn_readcb(struct bufferevent *bev, void *user_data)
    {
        printf("******************** begin call %s.........
    ",__FUNCTION__);
        char buf[4096]={0};
        char method[50], path[4096], protocol[32];
        bufferevent_read(bev, buf, sizeof(buf));
        printf("buf[%s]
    ", buf);
        sscanf(buf, "%[^ ] %[^ ] %[^ 
    ]", method, path, protocol);
        printf("method[%s], path[%s], protocol[%s]
    ", method, path, protocol);
        if(strcasecmp(method, "GET") == 0)
        {
            response_http(bev, method, path);
        }
        printf("******************** end call %s.........
    ", __FUNCTION__);
    }
    
    void conn_eventcb(struct bufferevent *bev, short events, void *user_data)
    {
        printf("******************** begin call %s.........
    ", __FUNCTION__);
        if (events & BEV_EVENT_EOF)
        {
            printf("Connection closed.
    ");
        }
        else if (events & BEV_EVENT_ERROR)
        {
            printf("Got an error on the connection: %s
    ",
                   strerror(errno));
        }
    
        bufferevent_free(bev);
        printf("******************** end call %s.........
    ", __FUNCTION__);
    }
    
    void signal_cb(evutil_socket_t sig, short events, void *user_data)
    {
        struct event_base *base = user_data;
        struct timeval delay = { 1, 0 };
    
        printf("Caught an interrupt signal; exiting cleanly in one seconds.
    ");
        event_base_loopexit(base, &delay);
    }
    
    //fd对应的是通信的文件描述符
    void listener_cb(struct evconnlistener *listener, 
                     evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data)
    {
        printf("******************** begin call-------%s
    ",__FUNCTION__);
        struct event_base *base = user_data;
        struct bufferevent *bev;
        printf("fd is %d
    ",fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        if (!bev)
        {
            fprintf(stderr, "Error constructing bufferevent!");
            event_base_loopbreak(base);
            return;
        }
        bufferevent_flush(bev, EV_READ | EV_WRITE, BEV_NORMAL);
        bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, NULL);
        bufferevent_enable(bev, EV_READ | EV_WRITE);
    
        printf("******************** end call-------%s
    ",__FUNCTION__);
    }
    
    /*
     * 这里的内容是处理%20之类的东西!是"解码"过程。
     * %20 URL编码中的‘ ’(space)
     * %21 '!' %22 '"' %23 '#' %24 '$'
     * %25 '%' %26 '&' %27 ''' %28 '('......
     * 相关知识html中的‘ ’(space)是&nbsp
     */
    void strdecode(char *to, char *from)
    {
        for ( ; *from != ''; ++to, ++from)
        {
            if (from[0] == '%' && isxdigit(from[1]) && isxdigit(from[2]))
            {
                // 依次判断from中 %20 三个字符
                *to = hexit(from[1])*16 + hexit(from[2]);
                // 移过已经处理的两个字符(%21指针指向1),表达式3的++from还会再向后移一个字符
                from += 2;
            }
            else
            {
                *to = *from;
            }
        }
        *to = '';
    }
    
    //16进制数转化为10进制, return 0不会出现
    int hexit(char c)
    {
        if (c >= '0' && c <= '9')
            return c - '0';
        if (c >= 'a' && c <= 'f')
            return c - 'a' + 10;
        if (c >= 'A' && c <= 'F')
            return c - 'A' + 10;
    
        return 0;
    }
    
    // "编码",用作回写浏览器的时候,将除字母数字及/_.-~以外的字符转义后回写。
    // strencode(encoded_name, sizeof(encoded_name), name);
    void strencode(char* to, size_t tosize, const char* from)
    {
        int tolen;
    
        for (tolen = 0; *from != '' && tolen + 4 < tosize; ++from)
        {
            if (isalnum(*from) || strchr("/_.-~", *from) != (char*)0)
            {
                *to = *from;
                ++to;
                ++tolen;
            }
            else
            {
                sprintf(to, "%%%02x", (int) *from & 0xff);
                to += 3;
                tolen += 3;
            }
        }
        *to = '';
    }
    libevent_http.c

    >makefile

    #makefile 
    
    target = http_server
    $(target):libevent_http.c main.c
        gcc -o $@ $^ -g -levent
    
    .PHONY:clean
    clean:
        -rm -f $(target)
    makefile

    五、线程池

    1、线程池的原理

    》线程池:

    2、线程池各个函数分析:

    (1)线程池结构体:struct threadpool_t

    /* 描述线程池相关信息 */
    struct threadpool_t {
        pthread_mutex_t lock;               /* 用于锁住本结构体 */    
        pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
        pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
        pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
    
        pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
        pthread_t adjust_tid;               /* 存管理线程tid */
        threadpool_task_t *task_queue;      /* 任务队列 */
    
        int min_thr_num;                    /* 线程池最小线程数 */
        int max_thr_num;                    /* 线程池最大线程数 */
        int live_thr_num;                   /* 当前存活线程个数 */
        int busy_thr_num;                   /* 忙状态线程个数 */
        int wait_exit_thr_num;              /* 要销毁的线程个数 */
    
        int queue_front;                    /* task_queue队头下标 */
        int queue_rear;                     /* task_queue队尾下标 */
        int queue_size;                     /* task_queue队中实际任务数 */
        int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
    
        int shutdown;                       /* 标志位,线程池使用状态,true或false */
    };
    struct threadpool_t

    (2)如何使用线程池:main主函数中调用threadpool_create、threadpool_add(包含回调process)、threadpool_destroy

    /* 线程池中的线程,模拟处理业务 */
    void *process(void *arg)
    {
        printf("thread 0x%x working on task %d
     ",(unsigned int)pthread_self(),*(int *)arg);
        sleep(1);
        printf("task %d is end
    ",*(int *)arg);
    
        return NULL;
    }
    int main(void)
    {
        /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    
        threadpool_t *thp = threadpool_create(3,100,100);/*创建线程池,池里最小3个线程,最大100,队列最大100*/
        printf("pool inited");
    
        //int *num = (int *)malloc(sizeof(int)*20);
        int num[20], i;
        for (i = 0; i < 20; i++) {
            num[i]=i;
            printf("add task %d
    ",i);
            threadpool_add(thp, process, (void*)&num[i]);     /* 向线程池中添加任务 */
        }
        sleep(10);                                          /* 等子线程完成任务 */
        threadpool_destroy(thp);
    
        return 0;
    }
    int main(void)

    (3)初始化线程池的函数:threadpool_create

    typedef struct {
        void *(*function)(void *);          /* 函数指针,回调函数 */
        void *arg;                          /* 上面函数的参数 */
    } threadpool_task_t;  
    
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
    {
        int i;
        threadpool_t *pool = NULL;
        do {
            if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
                printf("malloc threadpool fail");
                break;/*跳出do while*/
            }
    
            pool->min_thr_num = min_thr_num;
            pool->max_thr_num = max_thr_num;
            pool->busy_thr_num = 0;
            pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
            pool->queue_size = 0;                           /* 有0个产品 */
            pool->queue_max_size = queue_max_size;
            pool->queue_front = 0;
            pool->queue_rear = 0;
            pool->shutdown = false;                         /* 不关闭线程池 */
    
            /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
            pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
            if (pool->threads == NULL) {
                printf("malloc threads fail");
                break;
            }
            memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
    
            /* 队列开辟空间 */
            pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
            if (pool->task_queue == NULL) {
                printf("malloc task_queue fail");
                break;
            }
    
            /* 初始化互斥琐、条件变量 */
            if (pthread_mutex_init(&(pool->lock), NULL) != 0
                    || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
                    || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
                    || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
            {
                printf("init the lock or cond fail");
                break;
            }
    
            /* 启动 min_thr_num 个 work thread */
            for (i = 0; i < min_thr_num; i++) {
                pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
                printf("start thread 0x%x...
    ", (unsigned int)pool->threads[i]);
            }
            pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
    
            return pool;
    
        } while (0);
    
        threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
    
        return NULL;
    }
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)

    (4)干活的线程的回调函数:threadpool_thread

    /* 线程池中各个工作线程 */
    void *threadpool_thread(void *threadpool)
    {
        threadpool_t *pool = (threadpool_t *)threadpool;
        threadpool_task_t task;
    
        while (true) {
            /* Lock must be taken to wait on conditional variable */
            /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
            pthread_mutex_lock(&(pool->lock));
    
            /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
            while ((pool->queue_size == 0) && (!pool->shutdown)) {  
                printf("thread 0x%x is waiting
    ", (unsigned int)pthread_self());
                pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    
                /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
                if (pool->wait_exit_thr_num > 0) {
                    pool->wait_exit_thr_num--;
    
                    /*如果线程池里线程个数大于最小值时可以结束当前线程*/
                    if (pool->live_thr_num > pool->min_thr_num) {
                        printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
                        pool->live_thr_num--;
                        pthread_mutex_unlock(&(pool->lock));
                        pthread_exit(NULL);
                    }
                }
            }
    
            /*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
            if (pool->shutdown) {
                pthread_mutex_unlock(&(pool->lock));
                printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
                pthread_exit(NULL);     /* 线程自行结束 */
            }
    
            /*从任务队列里获取任务, 是一个出队操作*/
            task.function = pool->task_queue[pool->queue_front].function;
            task.arg = pool->task_queue[pool->queue_front].arg;
    
            pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
            pool->queue_size--;
    
            /*通知可以有新的任务添加进来*/
            pthread_cond_broadcast(&(pool->queue_not_full));
    
            /*任务取出后,立即将 线程池琐 释放*/
            pthread_mutex_unlock(&(pool->lock));
    
            /*执行任务*/ 
            printf("thread 0x%x start working
    ", (unsigned int)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
            pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
            pthread_mutex_unlock(&(pool->thread_counter));
            (*(task.function))(task.arg);                                           /*执行回调函数任务*/
            //task.function(task.arg);                                              /*执行回调函数任务*/
    
            /*任务结束处理*/ 
            printf("thread 0x%x end working
    ", (unsigned int)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));
            pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
            pthread_mutex_unlock(&(pool->thread_counter));
        }
    
        pthread_exit(NULL);
    }
    void *threadpool_thread(void *threadpool)

    (5)管理者线程回调函数:adjust_thread

    /* 管理线程 */
    void *adjust_thread(void *threadpool)
    {
        int i;
        threadpool_t *pool = (threadpool_t *)threadpool;
        while (!pool->shutdown) {
    
            sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
    
            pthread_mutex_lock(&(pool->lock));
            int queue_size = pool->queue_size;                      /* 关注 任务数 */
            int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
            pthread_mutex_unlock(&(pool->lock));
    
            pthread_mutex_lock(&(pool->thread_counter));
            int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
            pthread_mutex_unlock(&(pool->thread_counter));
    
            /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
            if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
                pthread_mutex_lock(&(pool->lock));  
                int add = 0;
    
                /*一次增加 DEFAULT_THREAD 个线程*/
                for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                        && pool->live_thr_num < pool->max_thr_num; i++) {
                    if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
                        pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                        add++;
                        pool->live_thr_num++;
                    }
                }
    
                pthread_mutex_unlock(&(pool->lock));
            }
    
            /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
            if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
    
                /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
                pthread_mutex_lock(&(pool->lock));
                pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
                pthread_mutex_unlock(&(pool->lock));
    
                for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
                    /* 通知处在空闲状态的线程, 他们会自行终止*/
                    pthread_cond_signal(&(pool->queue_not_empty));
                }
            }
        }
    
        return NULL;
    }
    void *adjust_thread(void *threadpool)

    》附—整体代码如下:

    >threadpool.c

      1 #include <stdlib.h>
      2 #include <pthread.h>
      3 #include <unistd.h>
      4 #include <assert.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 #include <signal.h>
      8 #include <errno.h>
      9 #include "threadpool.h"
     10 
     11 #define DEFAULT_TIME 10                 /*10s检测一次*/
     12 #define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
     13 #define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
     14 #define true 1
     15 #define false 0
     16 
     17 typedef struct {
     18     void *(*function)(void *);          /* 函数指针,回调函数 */
     19     void *arg;                          /* 上面函数的参数 */
     20 } threadpool_task_t;                    /* 各子线程任务结构体 */
     21 
     22 /* 描述线程池相关信息 */
     23 struct threadpool_t {
     24     pthread_mutex_t lock;               /* 用于锁住本结构体 */    
     25     pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
     26     pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
     27     pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
     28 
     29     pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
     30     pthread_t adjust_tid;               /* 存管理线程tid */
     31     threadpool_task_t *task_queue;      /* 任务队列 */
     32 
     33     int min_thr_num;                    /* 线程池最小线程数 */
     34     int max_thr_num;                    /* 线程池最大线程数 */
     35     int live_thr_num;                   /* 当前存活线程个数 */
     36     int busy_thr_num;                   /* 忙状态线程个数 */
     37     int wait_exit_thr_num;              /* 要销毁的线程个数 */
     38 
     39     int queue_front;                    /* task_queue队头下标 */
     40     int queue_rear;                     /* task_queue队尾下标 */
     41     int queue_size;                     /* task_queue队中实际任务数 */
     42     int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
     43 
     44     int shutdown;                       /* 标志位,线程池使用状态,true或false */
     45 };
     46 
     47 /**
     48  * @function void *threadpool_thread(void *threadpool)
     49  * @desc the worker thread
     50  * @param threadpool the pool which own the thread
     51  */
     52 void *threadpool_thread(void *threadpool);
     53 
     54 /**
     55  * @function void *adjust_thread(void *threadpool);
     56  * @desc manager thread
     57  * @param threadpool the threadpool
     58  */
     59 void *adjust_thread(void *threadpool);
     60 
     61 /**
     62  * check a thread is alive
     63  */
     64 int is_thread_alive(pthread_t tid);
     65 int threadpool_free(threadpool_t *pool);
     66 
     67 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
     68 {
     69     int i;
     70     threadpool_t *pool = NULL;
     71     do {
     72         if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
     73             printf("malloc threadpool fail");
     74             break;/*跳出do while*/
     75         }
     76 
     77         pool->min_thr_num = min_thr_num;
     78         pool->max_thr_num = max_thr_num;
     79         pool->busy_thr_num = 0;
     80         pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
     81         pool->queue_size = 0;                           /* 有0个产品 */
     82         pool->queue_max_size = queue_max_size;
     83         pool->queue_front = 0;
     84         pool->queue_rear = 0;
     85         pool->shutdown = false;                         /* 不关闭线程池 */
     86 
     87         /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
     88         pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
     89         if (pool->threads == NULL) {
     90             printf("malloc threads fail");
     91             break;
     92         }
     93         memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
     94 
     95         /* 队列开辟空间 */
     96         pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
     97         if (pool->task_queue == NULL) {
     98             printf("malloc task_queue fail");
     99             break;
    100         }
    101 
    102         /* 初始化互斥琐、条件变量 */
    103         if (pthread_mutex_init(&(pool->lock), NULL) != 0
    104                 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
    105                 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
    106                 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    107         {
    108             printf("init the lock or cond fail");
    109             break;
    110         }
    111 
    112         /* 启动 min_thr_num 个 work thread */
    113         for (i = 0; i < min_thr_num; i++) {
    114             pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
    115             printf("start thread 0x%x...
    ", (unsigned int)pool->threads[i]);
    116         }
    117         pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
    118 
    119         return pool;
    120 
    121     } while (0);
    122 
    123     threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
    124 
    125     return NULL;
    126 }
    127 
    128 /* 向线程池中 添加一个任务 */
    129 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    130 {
    131     pthread_mutex_lock(&(pool->lock));
    132 
    133     /* ==为真,队列已经满, 调wait阻塞 */
    134     while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
    135         pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    136     }
    137     if (pool->shutdown) {
    138         pthread_mutex_unlock(&(pool->lock));
    139     }
    140 
    141     /* 清空 工作线程 调用的回调函数 的参数arg */
    142     if (pool->task_queue[pool->queue_rear].arg != NULL) {
    143         free(pool->task_queue[pool->queue_rear].arg);
    144         pool->task_queue[pool->queue_rear].arg = NULL;
    145     }
    146     /*添加任务到任务队列里*/
    147     pool->task_queue[pool->queue_rear].function = function;
    148     pool->task_queue[pool->queue_rear].arg = arg;
    149     pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
    150     pool->queue_size++;
    151 
    152     /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    153     pthread_cond_signal(&(pool->queue_not_empty));
    154     pthread_mutex_unlock(&(pool->lock));
    155 
    156     return 0;
    157 }
    158 
    159 /* 线程池中各个工作线程 */
    160 void *threadpool_thread(void *threadpool)
    161 {
    162     threadpool_t *pool = (threadpool_t *)threadpool;
    163     threadpool_task_t task;
    164 
    165     while (true) {
    166         /* Lock must be taken to wait on conditional variable */
    167         /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
    168         pthread_mutex_lock(&(pool->lock));
    169 
    170         /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
    171         while ((pool->queue_size == 0) && (!pool->shutdown)) {  
    172             printf("thread 0x%x is waiting
    ", (unsigned int)pthread_self());
    173             pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    174 
    175             /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
    176             if (pool->wait_exit_thr_num > 0) {
    177                 pool->wait_exit_thr_num--;
    178 
    179                 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
    180                 if (pool->live_thr_num > pool->min_thr_num) {
    181                     printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    182                     pool->live_thr_num--;
    183                     pthread_mutex_unlock(&(pool->lock));
    184                     pthread_exit(NULL);
    185                 }
    186             }
    187         }
    188 
    189         /*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
    190         if (pool->shutdown) {
    191             pthread_mutex_unlock(&(pool->lock));
    192             printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    193             pthread_exit(NULL);     /* 线程自行结束 */
    194         }
    195 
    196         /*从任务队列里获取任务, 是一个出队操作*/
    197         task.function = pool->task_queue[pool->queue_front].function;
    198         task.arg = pool->task_queue[pool->queue_front].arg;
    199 
    200         pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
    201         pool->queue_size--;
    202 
    203         /*通知可以有新的任务添加进来*/
    204         pthread_cond_broadcast(&(pool->queue_not_full));
    205 
    206         /*任务取出后,立即将 线程池琐 释放*/
    207         pthread_mutex_unlock(&(pool->lock));
    208 
    209         /*执行任务*/ 
    210         printf("thread 0x%x start working
    ", (unsigned int)pthread_self());
    211         pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
    212         pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
    213         pthread_mutex_unlock(&(pool->thread_counter));
    214         (*(task.function))(task.arg);                                           /*执行回调函数任务*/
    215         //task.function(task.arg);                                              /*执行回调函数任务*/
    216 
    217         /*任务结束处理*/ 
    218         printf("thread 0x%x end working
    ", (unsigned int)pthread_self());
    219         pthread_mutex_lock(&(pool->thread_counter));
    220         pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
    221         pthread_mutex_unlock(&(pool->thread_counter));
    222     }
    223 
    224     pthread_exit(NULL);
    225 }
    226 
    227 /* 管理线程 */
    228 void *adjust_thread(void *threadpool)
    229 {
    230     int i;
    231     threadpool_t *pool = (threadpool_t *)threadpool;
    232     while (!pool->shutdown) {
    233 
    234         sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
    235 
    236         pthread_mutex_lock(&(pool->lock));
    237         int queue_size = pool->queue_size;                      /* 关注 任务数 */
    238         int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
    239         pthread_mutex_unlock(&(pool->lock));
    240 
    241         pthread_mutex_lock(&(pool->thread_counter));
    242         int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
    243         pthread_mutex_unlock(&(pool->thread_counter));
    244 
    245         /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
    246         if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
    247             pthread_mutex_lock(&(pool->lock));  
    248             int add = 0;
    249 
    250             /*一次增加 DEFAULT_THREAD 个线程*/
    251             for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
    252                     && pool->live_thr_num < pool->max_thr_num; i++) {
    253                 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
    254                     pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    255                     add++;
    256                     pool->live_thr_num++;
    257                 }
    258             }
    259 
    260             pthread_mutex_unlock(&(pool->lock));
    261         }
    262 
    263         /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
    264         if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
    265 
    266             /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
    267             pthread_mutex_lock(&(pool->lock));
    268             pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
    269             pthread_mutex_unlock(&(pool->lock));
    270 
    271             for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
    272                 /* 通知处在空闲状态的线程, 他们会自行终止*/
    273                 pthread_cond_signal(&(pool->queue_not_empty));
    274             }
    275         }
    276     }
    277 
    278     return NULL;
    279 }
    280 
    281 int threadpool_destroy(threadpool_t *pool)
    282 {
    283     int i;
    284     if (pool == NULL) {
    285         return -1;
    286     }
    287     pool->shutdown = true;
    288 
    289     /*先销毁管理线程*/
    290     pthread_join(pool->adjust_tid, NULL);
    291 
    292     for (i = 0; i < pool->live_thr_num; i++) {
    293         /*通知所有的空闲线程*/
    294         pthread_cond_broadcast(&(pool->queue_not_empty));
    295     }
    296     for (i = 0; i < pool->live_thr_num; i++) {
    297         pthread_join(pool->threads[i], NULL);
    298     }
    299     threadpool_free(pool);
    300 
    301     return 0;
    302 }
    303 
    304 int threadpool_free(threadpool_t *pool)
    305 {
    306     if (pool == NULL) {
    307         return -1;
    308     }
    309 
    310     if (pool->task_queue) {
    311         free(pool->task_queue);
    312     }
    313     if (pool->threads) {
    314         free(pool->threads);
    315         pthread_mutex_lock(&(pool->lock));
    316         pthread_mutex_destroy(&(pool->lock));
    317         pthread_mutex_lock(&(pool->thread_counter));
    318         pthread_mutex_destroy(&(pool->thread_counter));
    319         pthread_cond_destroy(&(pool->queue_not_empty));
    320         pthread_cond_destroy(&(pool->queue_not_full));
    321     }
    322     free(pool);
    323     pool = NULL;
    324 
    325     return 0;
    326 }
    327 
    328 int threadpool_all_threadnum(threadpool_t *pool)
    329 {
    330     int all_threadnum = -1;
    331     pthread_mutex_lock(&(pool->lock));
    332     all_threadnum = pool->live_thr_num;
    333     pthread_mutex_unlock(&(pool->lock));
    334     return all_threadnum;
    335 }
    336 
    337 int threadpool_busy_threadnum(threadpool_t *pool)
    338 {
    339     int busy_threadnum = -1;
    340     pthread_mutex_lock(&(pool->thread_counter));
    341     busy_threadnum = pool->busy_thr_num;
    342     pthread_mutex_unlock(&(pool->thread_counter));
    343     return busy_threadnum;
    344 }
    345 
    346 int is_thread_alive(pthread_t tid)
    347 {
    348     int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
    349     if (kill_rc == ESRCH) {
    350         return false;
    351     }
    352 
    353     return true;
    354 }
    355 
    356 /*测试*/ 
    357 
    358 #if 1
    359 /* 线程池中的线程,模拟处理业务 */
    360 void *process(void *arg)
    361 {
    362     printf("thread 0x%x working on task %d
     ",(unsigned int)pthread_self(),*(int *)arg);
    363     sleep(1);
    364     printf("task %d is end
    ",*(int *)arg);
    365 
    366     return NULL;
    367 }
    368 int main(void)
    369 {
    370     /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    371 
    372     threadpool_t *thp = threadpool_create(3,100,100);/*创建线程池,池里最小3个线程,最大100,队列最大100*/
    373     printf("pool inited");
    374 
    375     //int *num = (int *)malloc(sizeof(int)*20);
    376     int num[20], i;
    377     for (i = 0; i < 20; i++) {
    378         num[i]=i;
    379         printf("add task %d
    ",i);
    380         threadpool_add(thp, process, (void*)&num[i]);     /* 向线程池中添加任务 */
    381     }
    382     sleep(10);                                          /* 等子线程完成任务 */
    383     threadpool_destroy(thp);
    384 
    385     return 0;
    386 }
    387 
    388 #endif

    >threadpool.h

     1 #ifndef __THREADPOOL_H_
     2 #define __THREADPOOL_H_
     3 
     4 typedef struct threadpool_t threadpool_t;
     5 
     6 /**
     7  * @function threadpool_create
     8  * @descCreates a threadpool_t object.
     9  * @param thr_num  thread num
    10  * @param max_thr_num  max thread size
    11  * @param queue_max_size   size of the queue.
    12  * @return a newly created thread pool or NULL
    13  */
    14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    15 
    16 /**
    17  * @function threadpool_add
    18  * @desc add a new task in the queue of a thread pool
    19  * @param pool     Thread pool to which add the task.
    20  * @param function Pointer to the function that will perform the task.
    21  * @param argument Argument to be passed to the function.
    22  * @return 0 if all goes well,else -1
    23  */
    24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    25 
    26 /**
    27  * @function threadpool_destroy
    28  * @desc Stops and destroys a thread pool.
    29  * @param pool  Thread pool to destroy.
    30  * @return 0 if destory success else -1
    31  */
    32 int threadpool_destroy(threadpool_t *pool);
    33 
    34 /**
    35  * @desc get the thread num
    36  * @pool pool threadpool
    37  * @return # of the thread
    38  */
    39 int threadpool_all_threadnum(threadpool_t *pool);
    40 
    41 /**
    42  * desc get the busy thread num
    43  * @param pool threadpool
    44  * return # of the busy thread
    45  */
    46 int threadpool_busy_threadnum(threadpool_t *pool);
    47 
    48 #endif

    >makefile

     1 src = $(wildcard *.c)
     2 targets = $(patsubst %.c, %, $(src))
     3 
     4 CC = gcc
     5 CFLAGS = -lpthread -Wall -g 
     6 
     7 all:$(targets)
     8 
     9 $(targets):%:%.c
    10     $(CC) $< -o $@ $(CFLAGS)
    11 
    12 .PHONY:clean all
    13 clean:
    14     -rm -rf $(targets) 

    在学习Linux高并发web服务器开发总结了笔记,并分享出来。有问题请及时联系博主:Alliswell_WP,转载请注明出处。

  • 相关阅读:
    WebAPi返回类型到底应该是什么才合适,这是个问题?
    NuGet程序包安装SQLite后完全抽离出SQLite之入门介绍及注意事项,你真的懂了吗?
    完全抽离WebAPi之特殊需求返回HTML、Css、JS、Image
    模板引擎Nvelocity实例
    C#由变量捕获引起对闭包的思考
    AngularJS之指令中controller与link(十二)
    AngularJS之ng-class(十一)
    AngularJS之WebAPi上传(十)
    AngularJS之代码风格36条建议【一】(九)
    两个List合并去重
  • 原文地址:https://www.cnblogs.com/Alliswell-WP/p/CPlusPlus_Linux_19.html
Copyright © 2011-2022 走看看