zoukankan      html  css  js  c++  java
  • 线程池学习笔记

    记录一下学习线程池的过程,代码用到的函数归结: 
    pthread_mutex_lock 
    pthread_mutex_unlock 
    pthread_cond_wait 
    pthread_cond_signal 
    pthread_cond_broadcast 
    pthread_create 
    pthread_join 
    程序中还用到了链表, 
    还有一个知识点:任何类型的数据都可以是void类型, 
    但void类型在使用之前必须进行强制类型转换。

    /*
    *Author:Greens_Ren
    *Description:线程池
    */
    /*头文件*/
    #include<pthread.h>
    #include<stdio.h>
    /* begin : add */
    #include<stdlib.h> 
    #include<unistd.h>
    #include<assert.h>
    #include<sys/types.h>
    /* end : add */
    
    /*数据结构*/
    typedef struct Thread_worker
    {
        void *(*worker)(void *arg);
        void * arg;
        struct Thread_worker *next;
    }CThread_worker;
    
    typedef struct Thread_pool
    {
        pthread_mutex_t queue_lock;
        pthread_cond_t queue_ready;
    
        int max_thread_num;
        pthread_t *phead_threadid;
    
        int cur_queue_size;
        CThread_worker *phead;
    
        int shutdown;
    
    }CThread_pool;
    
    /*全局区*/
    static CThread_pool *pool = NULL;
    
    /*函数*/
    
    void pthread_init(int max_thread_num);
    void *thread_roution(void * arg);
    void pthread_add_worker(void *(*worker)(void *arg), void * arg);
    void pthread_destroy(void);
    void* my_process(void *arg);
    
    /*main*/
    int main(void)
    {
        int max_thread_num = 3;
        int worker_num = 10;
    
        /*初始化*/
        pthread_init(max_thread_num);
    
        /*投入任务*/
        int *workernum = (int *)malloc(sizeof(int) * worker_num);
        int i;
        for(i = 0; i < worker_num; i++)
        {
            workernum[i] = i;
            pthread_add_worker(my_process, &workernum[i]);
        }
    
        /*等待处理任务*/
        sleep(8);
        pthread_destroy();
    
        return 0;
    }
    
    void pthread_init(int max_thread_num)
    {
        pool = (CThread_pool*)malloc(sizeof(CThread_pool)); /*free(pool)*/
    
        /* begin : modified 初始化互斥锁和条件变量*/   
        pthread_mutex_init(&(pool->queue_lock), NULL);
        pthread_cond_init(&(pool->queue_ready), NULL);
        /* end : modified */
    
        /*初始化线程*/
        pool->max_thread_num = max_thread_num;
        pool->phead_threadid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); /*free*/
        int i;
        for (i = 0; i < max_thread_num; i ++)
        {
            pthread_create(&(pool->phead_threadid[i]), NULL, thread_roution, NULL);
        }
    
        /*初始化任务等待队列*/
        pool->cur_queue_size = 0;
        //pool->phead = (CThread_worker *)malloc(sizeof(CThread_worker));
        //pool->phead->next = NULL;
        /*罪过,这里考虑多了。在pool中就是存了一个指针,只不过它是指定了一个链表*/
        pool->phead = NULL;
    
        /*线程池销毁标记*/
        pool->shutdown = 0;
    
        return;
    }
    void *thread_roution(void * arg)
    {
        printf("starting thread 0x%x
    ", pthread_self());
        while(1)  /*Added 线程要持续运行,这里考虑使用while(1)来实现*/
        {
            pthread_mutex_lock(&(pool->queue_lock));
            /*如果目前没有任务处理且线程池未销毁就睡眠等待添加任务*/
            //if(pool->cur_queue_size == 0 && pool->shutdown != 1)
            while(pool->cur_queue_size == 0 && pool->shutdown != 1) /*modified by*/
            {
                //pthread_cond_wait( &(pool->queue_lock), &(pool->queue_ready) );
                printf("thread 0x%x is waiting
    ", pthread_self());
                pthread_cond_wait( &(pool->queue_ready), &(pool->queue_lock) );
            }
            /*如果线程池已被标记销毁,那就退出线程*/
            if(pool->shutdown == 1)
            {
    
                pthread_mutex_unlock(&(pool->queue_lock));
                printf("thread 0x%x will exit
    ", pthread_self());
                pthread_exit(NULL);
            }
    
            printf("thread 0x%x is starting to work
    ", pthread_self());
    
            /*assert是调试用的好助手,assert如果为家,它就会通过stderr打印错误信息,并终止程序*/
            assert(pool->cur_queue_size != 0);
            assert(pool->shutdown != 1);
    
            /*开始处理等待队列中的任务*/
            /*回调函数*/
    
            pool->cur_queue_size--;
            CThread_worker * worker_waiting = pool->phead;
            pool->phead = worker_waiting->next;
            /*这里加锁的目的就是为了处理任务链表,处理完后就 可以解锁,让其它线程再次去处理任务链表*/
            pthread_mutex_unlock(&(pool->queue_lock));  /*modified by 开始的时候讲解锁放在了回调函数后面,这里做修正*/
    
            /*调用回调函数,执行任务*/
            (*(worker_waiting->worker))(worker_waiting->arg);
            /*删除链表中已经执行过的任务节点*/
            free(worker_waiting);
            worker_waiting = NULL;
        }
    }
    void pthread_add_worker(void *(*worker)(void *arg), void * arg)
    {
        assert(worker != NULL);
        assert(pool->shutdown != 1);
    
        /*构建新任务插入到任务链表尾部*/
        CThread_worker * worker_insert = 
            (CThread_worker *)malloc(sizeof(CThread_worker));/*free*/
        worker_insert->worker = worker;
        worker_insert->arg = arg;
        worker_insert->next = NULL;
    
        /*下面要处理任务链表了,这里要加锁,保护链表。第一次写的时候就忘记了加锁*/
        pthread_mutex_lock(&(pool->queue_lock));
        CThread_worker* phead_worker = pool->phead;
    
        /*链表的头指针一开始其实就是空指针,所以如果为NULL,就直接将构建的节点地址赋值给它就可以了*/
        if(phead_worker != NULL)
        {
            while( phead_worker->next != NULL )
            {
                phead_worker = phead_worker->next;
            }
            phead_worker->next = worker_insert; /*这里将新构建的任务节点添加到了等待任务链表的结尾*/
        }
        else
        {
            pool->phead = worker_insert;
        }
    
        assert(pool->phead != NULL);/*这里检查一下等待链表不为空*/
        pool->cur_queue_size++; /*同步修改等待队列的长度信息*/
        pthread_mutex_unlock(&(pool->queue_lock));
        /*等待队列中添加了新任务,这里就要唤醒线程去处理,如果无线程睡眠,这条语句就无效*/
        pthread_cond_signal(&(pool->queue_ready));
    
        return;
    }
    void pthread_destroy(void)
    {
        if(pool->shutdown)
        {
            return; /*这里防止多次调用*/
        }
        /*这里先将销毁线程池标记置位*/
        pool->shutdown = 1;
    
        /*唤醒所以等待线程,线程池要销毁了*/
        pthread_cond_broadcast(&(pool->queue_ready));
    
        /*阻塞等待线程线程退出,否则就成了僵尸了*/
        int i;
        for (i = 0; i < pool->max_thread_num; i ++)
        {
            pthread_join(pool->phead_threadid[i], NULL);
        }
        /*释放线程号存储占用资源*/
        free(pool->phead_threadid);
        pool->phead_threadid = NULL;
    
        /*释放任务等待队列*/
        CThread_worker *pworker_del = NULL;
        while(pool->phead != NULL)
        {
            pworker_del = pool->phead;
            pool->phead = pool->phead->next;
            free(pworker_del);
            pworker_del = NULL;
        }
    
        /*销毁条件变量和互斥锁,刚开始也忘记了*/
        pthread_mutex_destroy(&(pool->queue_lock));
        pthread_cond_destroy(&(pool->queue_ready));
    
        /*释放线程池*/
        free(pool);
        pool = NULL;
    
        return;
    }
    void *my_process(void * arg)
    {
        printf("thread is 0x%x, working ont task %d
    ", pthread_self(), *(int *)arg);
        sleep(1);
        return;
    }
  • 相关阅读:
    Python爬虫实践 —— 4.好听音乐网轻音乐资源下载
    Python爬虫实践 —— urllib.request和requests
    Python爬虫实践 —— 3.利用爬虫提取返回值,模拟有道词典接口
    Python爬虫实践 —— 2.百度贴吧html文件爬取
    Python爬虫实践 —— 1.对反爬机制的认识
    用gcd库函数求最大公约数
    对结构体进行排序
    python菜鸟入门知识
    python安装及typora使用
    python基础一整型、bool、字符串
  • 原文地址:https://www.cnblogs.com/wanghuaijun/p/6043145.html
Copyright © 2011-2022 走看看