zoukankan      html  css  js  c++  java
  • 线程池介绍与示例

    对于线程任务比较轻量,线程请求又比较多的情况,频繁的创建和销毁线程是非常消耗资源且低效的。这时候,就轮到线程池技术大显身手了。

    线程池技术可以提高资源的利用率,即使面对突发性的大量请求,也不会产生大量线程,造成服务器崩溃。

    一般一个简单线程池至少包含下列组成部分:

    1. 线程池管理器(ThreadPoolManager):用于创建并管理线程池。
    2. 工作线程(WorkThread): 线程池中线程。
    3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
    4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

    线程相关接口:

    下面是一个线程池的示例,来自http://hi.baidu.com/boahegcrmdghots/item/f3ca1a3c2d47fcc52e8ec2e1

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    
    typedef struct ThreadWorker {
        void* (*func)(void*);
        void* data;
        struct ThreadWorker* next;
    }ThreadWorker;
    
    typedef struct ThreadPool {
        unsigned int maxThreadCount;       // 最大线程数限制
        ThreadWorker* workerList;          // 线程任务列表
        pthread_t* threadIdList;           // 线程ID列表
        
        pthread_mutex_t threadLock;        // 互斥锁
        pthread_cond_t threadCond;         // 条件锁
        bool shutDown;                     // 是否销毁线程池
    }ThreadPool;
    
    static ThreadPool* lpThreadPool = NULL;
    void* ThreadPoolRoutine(void* data);
    
    void ThreadPoolInit(unsigned int threadCount) {
        lpThreadPool = (ThreadPool*)malloc(sizeof(ThreadPool));
        lpThreadPool->maxThreadCount = threadCount;
        lpThreadPool->workerList = NULL;
        lpThreadPool->threadIdList = (pthread_t*)malloc(sizeof(pthread_t) * threadCount);
        for (unsigned int i = 0; i < threadCount; ++i) {
            pthread_create(&(lpThreadPool->threadIdList[i]), NULL, ThreadPoolRoutine, NULL);
        }
        
        pthread_mutex_init(&(lpThreadPool->threadLock), NULL);
        pthread_cond_init(&(lpThreadPool->threadCond), NULL);
        lpThreadPool->shutDown = false;
    }
    
    // 销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出
    void ThreadPoolDestory() {
        if (lpThreadPool->shutDown) {
            return;
        }
        lpThreadPool->shutDown = true;
        
        // 唤醒所有等待线程,线程池要销毁了
        pthread_cond_broadcast(&(lpThreadPool->threadCond));
        
        for (int i = 0; i < lpThreadPool->maxThreadCount; ++i) {
            pthread_join(lpThreadPool->threadIdList[i], NULL);
        }
        free(lpThreadPool->threadIdList);
        
        ThreadWorker* temp = NULL;
        while (lpThreadPool->workerList) {
            temp = lpThreadPool->workerList;
            lpThreadPool->workerList = lpThreadPool->workerList->next;
            free(temp);
        }
        
        pthread_mutex_destroy(&(lpThreadPool->threadLock));
        pthread_cond_destroy(&(lpThreadPool->threadCond));
        
        free(lpThreadPool);
        lpThreadPool = NULL;
    }
    
    void ThreadPoolAddWork(void* (*func)(void*), void* data) {
        ThreadWorker* worker = (ThreadWorker*)malloc(sizeof(ThreadWorker));
        worker->func = func;
        worker->data = data;
        worker->next = NULL;
        
        pthread_mutex_lock(&(lpThreadPool->threadLock));
        ThreadWorker* lpWorkerList = lpThreadPool->workerList;
        if (lpWorkerList) {
            while (lpWorkerList->next) {
                lpWorkerList = lpWorkerList->next;
            }
            lpWorkerList->next = worker;
        }
        else {
            lpThreadPool->workerList = worker;
        }
        pthread_mutex_unlock(&(lpThreadPool->threadLock));
        
        // 唤醒线程,注意如果所有线程都在忙碌,这句没有任何作用
        pthread_cond_signal(&(lpThreadPool->threadCond));
        printf ("signal thread %p\n", pthread_self ());
    }
    
    void* ThreadPoolRoutine(void* data) {
        printf ("starting thread %p\n", pthread_self ());
        while(true) {
            pthread_mutex_lock(&(lpThreadPool->threadLock));
            // 如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁
            while(lpThreadPool->workerList == NULL && !lpThreadPool->shutDown) {
                printf ("thread %p is waiting\n", pthread_self ());
                pthread_cond_wait(&(lpThreadPool->threadCond), &(lpThreadPool->threadLock));
            }
            if (lpThreadPool->shutDown) {
                pthread_mutex_unlock (&(lpThreadPool->threadLock));
                printf ("thread %p will exit\n", pthread_self ());
                pthread_exit (NULL);
            }
            else {
                ThreadWorker* temp = lpThreadPool->workerList;
                lpThreadPool->workerList = temp->next;
                pthread_mutex_unlock(&(lpThreadPool->threadLock));
                
                (*(temp->func))(temp->data);
                free(temp);
                temp = NULL;
            }
        }
        pthread_exit (NULL);
    }
    
    void* myTestFunc(void* data) {
        printf ("threadID is %p, working on task %d\n", pthread_self (),*(int *)data);
        sleep (1);
        return NULL;
    }
    
    int main() {
        ThreadPoolInit(5);
        const int count = 20;
        int* taskIndex = (int*)malloc(sizeof(int) * count);
        for (int i = 0; i < count; ++i) {
            taskIndex[i] = i;
            ThreadPoolAddWork(myTestFunc, &taskIndex[i]);
        }
        sleep(3);
        while(true) {
            if (!lpThreadPool->workerList) {
                ThreadPoolDestory();
                break;
            }
        }
        free(taskIndex);
        return 0;
    }

    任务接口是为所有任务提供统一的接口,以便工作线程处理。如上例中的myTestFunc.

    pthread_cond_wait:

    pthread_cond_wait所做的第一件事就是同时对互斥对象解锁(于是其它线程可以修改已链接列表),并等待条件 mycond 发生(这样当 pthread_cond_wait接收到另一个线程的“信号”时,它将苏醒)。现在互斥对象已被解锁,其它线程可以访问和修改已链接列表,可能还会添加项。 【要求解锁并阻塞是一个原子操作

    pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) 函数返回时,互斥锁 mutex 将处于锁定状态。因此之后如果需要对临界区数据进行重新访问,则没有必要对 mutex 就行重新加锁。但是,随之而来的问题是,每次条件等待以后需要加入一步手动的解锁操作。

    简单的说,pthread_cond_wait执行时会自动解锁互斥对象,返回时会自动加锁互斥对象。

    pthread_cond_timedwait:

    int pthread_cond_timedwait(pthread_cond_t *restrict cond, 
                  pthread_mutex_t *restrict mutex, 
                  const struct timespec *restrict abstime);

    参数 abstime 在这里用来表示和超时时间相关的一个参数,但是需要注意的是它所表示的是一个绝对时间,而不是一个时间间隔数值,只有当系统的当前时间达到或者超过 abstime 所表示的时间时,才会触发超时事件。

    假设我们指定相对的超时时间参数如 dwMilliseconds (单位毫秒)来调用和超时相关的函数,这样就需要将 dwMilliseconds 转化为 Linux 下的绝对时间参数 abstime 使用。常用的转换方法如下所示:

    /* get the current time */ 
        struct timeval now; 
        gettimeofday(&now, NULL); 
        
        /* add the offset to get timeout value */ 
        abstime ->tv_nsec = now.tv_usec * 1000 + (dwMilliseconds % 1000) * 1000000; 
        abstime ->tv_sec = now.tv_sec + dwMilliseconds / 1000;

    正确销毁线程并释放资源:

    Linux man page :“When a joinable thread terminates, its memory resources (thread descriptor and stack) are not deallocated until another thread performs pthread_join on it. Therefore, pthread_join must be called once for each joinable thread created to avoid memory leaks.”

    Linux 平台默认情况下,虽然各个线程之间是相互独立的,一个线程的终止不会去通知或影响其他的线程。但是已经终止的线程的资源并不会随着线程的终止而得到释放,我们需要调用 pthread_join() 来获得另一个线程的终止状态并且释放该线程所占的资源。

    int pthread_join(pthread_t th, void **thread_return);

    调用该函数的线程将挂起,等待 th 所表示的线程的结束。 thread_return 是指向线程 th 返回值的指针。需要注意的是 th 所表示的线程必须是 joinable 的,即处于非 detached(游离)状态;并且只可以有唯一的一个线程对 th 调用 pthread_join() 。如果 th 处于 detached 状态,那么对 th 的 pthread_join() 调用将返回错误。

    如果你压根儿不关心一个线程的结束状态,那么也可以将一个线程设置为 detached 状态,从而来让操作系统在该线程结束时来回收它所占的资源。将一个线程设置为 detached 状态可以通过两种方式来实现。一种是调用 pthread_detach() 函数,可以将线程 th 设置为 detached 状态。

    另一种方法是在创建线程时就将它设置为 detached 状态,首先初始化一个线程属性变量,然后将其设置为 detached 状态,最后将它作为参数传入线程创建函数 pthread_create(),这样所创建出来的线程就直接处于 detached 状态。

    总之为了在使用 pthread 时避免线程的资源在线程结束时不能得到正确释放,从而避免产生潜在的内存泄漏问题,在对待线程结束时,要确保该线程处于 detached 状态,否着就需要调用 pthread_join() 函数来对其进行资源回收。

    线程池的改进思考:

    1. 适当动态增减线程池中的线程数量。

    2. 是否可以为线程增加优先级的概念。

    参考:

    1. 线程池的介绍及简单实现

    2. Linux 的多线程编程的高效开发经验

    3. 一个Linux下C线程池的实现

    4. linux下多线程的创建与等待详解

    5. pthread_cond_wait()太难理解了

  • 相关阅读:
    android ContentObserver
    3 个简单、优秀的 Linux 网络监视器
    使用 sar 和 kSar 来发现 Linux 性能瓶颈
    4 个拥有绝佳命令行界面的终端程序
    4 个用于构建优秀的命令行用户界面的 Python 库
    理解 Linux 的平均负载和性能监控
    安装matplotlib
    面向系统管理员的网络管理指南
    使用 Nmon 监控 Linux 的系统性能
    linux smem 查看各进程使用memory情况
  • 原文地址:https://www.cnblogs.com/pure/p/2923800.html
Copyright © 2011-2022 走看看