zoukankan      html  css  js  c++  java
  • linux 条件变量与线程池

    条件变量Condition Variables

    概述

    1. 条件变量提供了另外一种线程同步的方式。如果没有条件变量,程序需要使用线程连续轮询(可能在临界区critical section内)方式检查条件是否满足。由于线程连续忙于轮询检查,这会非常消耗资源,而条件变量是一种实现同样目标不需要轮询的方式。

    2. 条件变量总是和互斥锁相结合使用。

    3. 条件变量使用示例结构:

    Main Thread

    • Declare and initialize global data/variables which require synchronization (such as "count")
    • Declare and initialize a condition variable object
    • Declare and initialize an associated mutex
    • Create threads A and B to do work

    Thread A

    • Do work up to the point where a certain condition must occur (such as "count" must reach a specified value)
    • Lock associated mutex and check value of a global variable
    • Call pthread_cond_wait() to perform a blocking wait for signal from Thread-B. Note that a call to pthread_cond_wait() automatically and atomically unlocks the associated mutex variable so that it can be used by Thread-B.
    • When signalled, wake up. Mutex is automatically and atomically locked.
    • Explicitly unlock mutex
    • Continue

    Thread B

    • Do work
    • Lock associated mutex
    • Change the value of the global variable that Thread-A is waiting upon.
    • Check value of the global Thread-A wait variable. If it fulfills the desired condition, signal Thread-A.
    • Unlock mutex.
    • Continue

    Main Thread

    Join / Continue

    创建和销毁条件变量

    pthread_cond_init (condition,attr)

    pthread_cond_destroy (condition)

    pthread_condattr_init (attr)

    pthread_condattr_destroy (attr)

    条件变量必须声明为pthread_cond_t,并且使用之前必须初始化。有两种方式初始化条件变量:
    1)静态初始化:pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;

    2)动态初始化: pthread_cond_init()。条件变量的id号通过条件变量参数返回于调用线程,这种方式允许设置条件变量的属性。然而,只有一种条件变量属性process-shared,这允许其他进程的线程可见该条件变量。如果使用条件变量属性,那么必须是pthread_condattr_t 类型(为了接受默认值可以指定为NULL)。需要注意的是,并非所有实现提供process-shared属性。

    信号等待与信号通知

    pthread_cond_wait (condition,mutex):阻塞调用线程直到特定的条件触发。当互斥量被锁住时该函数应当被调用;当它等待时它将自动释放互斥锁。接收到信号通知和线程被唤醒后,互斥量将自动地被线程锁住。当线程完成任务时,需要手动解锁互斥量。

    pthread_cond_signal (condition)用于唤醒另外一个等待条件变量的线程。互斥量被锁住之后才可调用pthread_cond_signal并且按序解锁用于pthread_cond_wait完成。

    pthread_cond_broadcast (condition)如果多于一个线程处于阻塞等待状态,那么应当使用pthread_cond_broadcast而不是pthread_cond_signal。

    建议使用while循环而不是if,这样可以检查一些潜在的问题,例如:如果若干线程在等待同一个唤醒信号,它们将轮询捕获互斥量,它们中的任何一个可以修改条件;由于程序bug,线程接收到错误信号;线程库允许不违反标准的前提下虚假的唤醒一个等待线程。

    使用这些函数时,必须正确地加锁解锁互斥变量。

    调用pthread_cond_wait前锁定互斥量失败可能导致线程阻塞失败;

    调用pthread_cond_signal后解锁互斥量失败可能不允许匹配的pthread_cond_wait完成(即阻塞掉)。

    实际上pthread_cond_wait的返回不仅仅是pthread_cond_signal和pthread_cond_broadcast导致的,还会有一些假唤醒,也就是spurious wakeup。

    pthread_cond_wait的通常使用方法:

    pthread_mutex_lock();

    while(condition_is_false)

        pthread_cond_wait();

    pthread_mutex_unlock();

    为什么在pthread_cond_wait()前要加一个while循环来判断条件是否为假呢?

    APUE中写道:

    传递给pthread_cond_wait的互斥量对条件进行保护,调用者把锁住的互斥量传给函数。函数把调用线程放到等待条件的线程列表上,然后对互斥量解锁,这两个操作是原子操作。

    线程释放互斥量,等待其他线程发给该条件变量的信号(唤醒一个等待者)或广播该条件变量(唤醒所有等待者)。当等待条件变量时,互斥量必须始终为释放的,这样其他线程才有机会锁住互斥量,修改条件变量。当线程从条件变量等待中醒来时,它重新继续锁住互斥量,对临界资源进行处理。

    条件变量的作用是发信号,而不是互斥。

    wait前检查

    对于多线程程序,不能够用常规串行的思路来思考它们,因为它们是完全异步的,会出现很多临界情况。比如:pthread_cond_signal的时间早于pthread_cond_wait的时间,这样pthread_cond_wait就会一直等下去,漏掉了之前的条件变化。

    对于这种情况,解决的方法是在锁住互斥量之后和等待条件变量之前,检查条件变量是否已经发生变化。

    if(condition_is_false)

        pthread_cond_wait();

    这样在等待条件变量前检查一下条件变量的值,如果条件变量已经发生了变化,那么就没有必要进行等待了,可以直接进行处理。这种方法在并发系统中比较常见

    1.等待函数里面要传入一个互斥量,这个互斥量会在这个函数调用时会发生如下变化:函数刚刚被调用时,会把这个互斥量解锁,然后让调用线程阻塞,解锁后其他线程才有机会获得这个锁。当某个线程调用通知函数时,这个函数收到通知后,又把互斥量加锁,然后继续向下操作临界区。可见这个设计是非常合理的!

    2.条件变量的等待函数用while循环包围。原因:如果有多个线程都在等待这个条件变量关联的互斥量,当条件变量收到通知,它下一步就是要锁住这个互斥量,但在这个极小的时间差里面,其他线程抢先获取了这互斥量并进入临界区把某个状态改变了。此时这个条件变量应该继续判断别人刚刚抢先修改的状态,即继续执行while的判断。还有一个原因时防止虚假通知,收到虚假通知后,只要while里面的条件为真,就继续休眠.

    参考资料:https://computing.llnl.gov/tutorials/pthreads/#ConVarSignal

    http://www.cnblogs.com/leaven/archive/2010/06/03/1750973.html

    https://www.cnblogs.com/yuuyuu/p/5140875.html

    linux下C 线程池的原理讲解和代码实现(能自行伸缩扩展线程数)

    Linux C++线程池框架

    Linux的多任务编程-线程池

     1 #include <pthread.h>
     2 #include <stdio.h>
     3 #include <stdlib.h>
     4 #include <unistd.h>
     5 #define NUM_THREADS  3
     6 #define TCOUNT 10
     7 #define COUNT_LIMIT 12
     8 
     9 int     count = 0;
    10 int     thread_ids[3] = {0,1,2};
    11 pthread_mutex_t count_mutex;
    12 pthread_cond_t count_threshold_cv;
    13 
    14 void *inc_count(void *t)
    15 {
    16     int i;
    17     long my_id = (long)t;
    18 
    19     for (i=0; i<TCOUNT; i++) {
    20         pthread_mutex_lock(&count_mutex);
    21         count++;
    22 
    23         /*
    24      Check the value of count and signal waiting thread when condition is
    25      reached.  Note that this occurs while mutex is locked.
    26      */
    27         if (count == COUNT_LIMIT) {
    28             pthread_cond_signal(&count_threshold_cv);
    29             printf("inc_count(): thread %ld, count = %d  Threshold reached.
    ",
    30                    my_id, count);
    31         }
    32         printf("inc_count(): thread %ld, count = %d, unlocking mutex
    ",
    33                my_id, count);
    34         pthread_mutex_unlock(&count_mutex);
    35 
    36         /* Do some "work" so threads can alternate on mutex lock */
    37         sleep(1);
    38     }
    39     pthread_exit(NULL);
    40 }
    41 
    42 void *watch_count(void *t)
    43 {
    44     long my_id = (long)t;
    45 
    46     printf("Starting watch_count(): thread %ld
    ", my_id);
    47 
    48     /*
    49    Lock mutex and wait for signal.  Note that the pthread_cond_wait
    50    routine will automatically and atomically unlock mutex while it waits.
    51    Also, note that if COUNT_LIMIT is reached before this routine is run by
    52    the waiting thread, the loop will be skipped to prevent pthread_cond_wait
    53    from never returning.
    54    */
    55     pthread_mutex_lock(&count_mutex);
    56     while (count<COUNT_LIMIT) {
    57         pthread_cond_wait(&count_threshold_cv, &count_mutex);
    58         printf("watch_count(): thread %ld Condition signal received.
    ", my_id);
    59     }
    60     count += 125;
    61     printf("watch_count(): thread %ld count now = %d.
    ", my_id, count);
    62     pthread_mutex_unlock(&count_mutex);
    63     pthread_exit(NULL);
    64 }
    65 
    66 int main (int argc, char *argv[])
    67 {
    68     int i, rc;
    69     long t1=1, t2=2, t3=3;
    70     pthread_t threads[3];
    71     pthread_attr_t attr;
    72 
    73     /* Initialize mutex and condition variable objects */
    74     pthread_mutex_init(&count_mutex, NULL);
    75     pthread_cond_init (&count_threshold_cv, NULL);
    76 
    77     /* For portability, explicitly create threads in a joinable state */
    78     pthread_attr_init(&attr);
    79     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    80 
    81     pthread_create(&threads[0], &attr, watch_count, (void *)t1);
    82     pthread_create(&threads[1], &attr, inc_count, (void *)t2);
    83     pthread_create(&threads[2], &attr, inc_count, (void *)t3);
    84 
    85     /* Wait for all threads to complete */
    86     for (i=0; i<NUM_THREADS; i++) {
    87         pthread_join(threads[i], NULL);
    88     }
    89     printf ("Main(): Waited on %d  threads. Done.
    ", NUM_THREADS);
    90 
    91     /* Clean up and exit */
    92     pthread_attr_destroy(&attr);
    93     pthread_mutex_destroy(&count_mutex);
    94     pthread_cond_destroy(&count_threshold_cv);
    95     pthread_exit(NULL);
    96 
    97 }
    View Code

    翻译资料:https://computing.llnl.gov/tutorials/pthreads/#ConVarSignal

    线程池

    上面之所以会谈到条件变量,有两个原因,其一线程池的实现需要条件变量方面的知识,其二因为它的实现牵涉到一些细节,理解条件变量有一定的困难,如果不理解它与互斥锁结合使用的实现原理,也就无法正确使用条件变量。

    #ifndef THREADPOOL_H
    #define THREADPOOL_H
    /*
     *线程池包括:n个执行任务的线程,一个任务队列,一个管理线程
    1、预先启动一些线程,线程负责执行任务队列中的任务,当队列空时,线程挂起。
    2、调用的时候,直接往任务队列添加任务,并发信号通知线程队列非空。
    3、管理线程负责监控任务队列和系统中的线程状态,当任务队列为空,线程数目多且很多处于空闲的时候,便通知一些线程退出以节约系统资源;当任务队列排队任务多且线程都在忙,便负责再多启动一些线程来执行任务,以确保任务执行效率。
     *
     */
    #include <pthread.h>
    
    typedef struct threadpool_task_t
    {
        void *(*function)(void *);
        void *arg;
    } threadpool_task_t;
    
    typedef struct threadpool_t
    {
        pthread_mutex_t lock;// mutex for the taskpool
        pthread_mutex_t thread_counter;//mutex for count the busy thread
        pthread_cond_t queue_not_full;
        pthread_cond_t queue_not_empty;//任务队列非空的信号
        pthread_t *threads;//执行任务的线程
        pthread_t adjust_tid;//负责管理线程数目的线程
        threadpool_task_t *task_queue;//任务队列
        int min_thr_num;
        int max_thr_num;
        int live_thr_num;
        int busy_thr_num;
        int wait_exit_thr_num;
        int queue_front;
        int queue_rear;
        int queue_size;
        int queue_max_size;
        bool shutdown;
    }threadpool_t;
    
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    
    int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    
    /**
     * @function void *threadpool_thread(void *threadpool)
     * @desc the worker thread
     * @param threadpool the pool which own the thread
     */
    
    void *threadpool_thread(void *threadpool);
    /**
     * @function void *adjust_thread(void *threadpool);
     * @desc manager thread
     * @param threadpool the threadpool
     */
    
    void *adjust_thread(void *threadpool);
    /**
     * check a thread is alive
     */
    
    bool is_thread_alive(pthread_t tid);
    
    int threadpool_destroy(threadpool_t *pool);
    
    int threadpool_free(threadpool_t *pool);
    
    int threadpool_all_threadnum(threadpool_t *pool);
    
    int threadpool_busy_threadnum(threadpool_t *pool);
    
    #endif // THREADPOOL_H
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <assert.h>
    #include <stdio.h>
    #include <string.h>
    #include <signal.h>
    #include <errno.h>
    #include <stdbool.h>
    #include "threadpool.h"
    #define DEFAULT_TIME 10 // 领导定时检查队列、线程状态的时间间隔
    #define MIN_WAIT_TASK_NUM 10 // 队列中等待的任务数>这个值,便会增加线程
    #define DEFAULT_THREAD_VARY 10 //每次线程加减的数目
    
    
    //创建线程池
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
    {
        threadpool_t *pool = NULL;
        do{
            if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
            {
                printf("malloc threadpool fail");
                break;
            }
            pool->min_thr_num = min_thr_num;
            pool->max_thr_num = max_thr_num;
            pool->busy_thr_num = 0;
            pool->live_thr_num = min_thr_num;
            pool->queue_size = 0;
            pool->queue_max_size = queue_max_size;
            pool->queue_front = 0;
            pool->queue_rear = 0;
            pool->shutdown = false;
            pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
            if (pool->threads == NULL)
            {
                printf("malloc threads fail");
                break;
            }
            memset(pool->threads, 0, sizeof(pool->threads));
            pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
            if (pool->task_queue == NULL)
            {
                printf("malloc task_queue fail");
                break;
            }
            if (pthread_mutex_init(&(pool->lock), NULL) != 0
                    || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
                    || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
                    || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
            {
                printf("init the lock or cond fail");
                break;
            }
            /**
     * start work thread min_thr_num
     */
            for (int i = 0; i < min_thr_num; i++)
            {
                //启动任务线程
                pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                printf("start thread 0x%x...
    ", pool->threads[i]);
            }
            //启动管理线程
            pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);
            return pool;
        }while(0);
        threadpool_free(pool);
        return NULL;
    }
    
    //把任务添加到队列中
    int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    {
        assert(pool != NULL);
        assert(function != NULL);
        assert(arg != NULL);
        pthread_mutex_lock(&(pool->lock));
        //队列满的时候,等待
        while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
        {
            //queue full wait
            pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
        }
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&(pool->lock));
        }
        //如下是添加任务到队列,使用循环队列
        if (pool->task_queue[pool->queue_rear].arg != NULL)
        {
            free(pool->task_queue[pool->queue_rear].arg);
            pool->task_queue[pool->queue_rear].arg = NULL;
        }
        pool->task_queue[pool->queue_rear].function = function;
        pool->task_queue[pool->queue_rear].arg = arg;
        pool->queue_rear = (pool->queue_rear + 1)%pool->queue_max_size;
        pool->queue_size++;
        //每次加完任务,发个信号给线程
        //若没有线程处于等待状态,此句则无效,但不影响
        pthread_cond_signal(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }
    
    //线程执行任务
    void *threadpool_thread(void *threadpool)
    {
        threadpool_t *pool = (threadpool_t *)threadpool;
        threadpool_task_t task;
        while(true)
        {
            /* Lock must be taken to wait on conditional variable */
            pthread_mutex_lock(&(pool->lock));
            //任务队列为空的时候,等待
            while ((pool->queue_size == 0) && (!pool->shutdown))
            {
                printf("thread 0x%x is waiting
    ", pthread_self());
                pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
                //被唤醒后,判断是否是要退出的线程
                if (pool->wait_exit_thr_num > 0)
                {
                    pool->wait_exit_thr_num--;
                    if (pool->live_thr_num > pool->min_thr_num)
                    {
                        printf("thread 0x%x is exiting
    ", pthread_self());
                        pool->live_thr_num--;
                        pthread_mutex_unlock(&(pool->lock));
                        pthread_exit(NULL);
                    }
                }
            }
            if (pool->shutdown)
            {
                pthread_mutex_unlock(&(pool->lock));
                printf("thread 0x%x is exiting
    ", pthread_self());
                pthread_exit(NULL);
            }
            //get a task from queue
            task.function = pool->task_queue[pool->queue_front].function;
            task.arg = pool->task_queue[pool->queue_front].arg;
            pool->queue_front = (pool->queue_front + 1)%pool->queue_max_size;
            pool->queue_size--;
            //now queue must be not full
            pthread_cond_broadcast(&(pool->queue_not_full));
            pthread_mutex_unlock(&(pool->lock));
            // Get to work
            printf("thread 0x%x start working
    ", pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));
            pool->busy_thr_num++;
            pthread_mutex_unlock(&(pool->thread_counter));
            (*(task.function))(task.arg);
            // task run over
            printf("thread 0x%x end working
    ", pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));
            pool->busy_thr_num--;
            pthread_mutex_unlock(&(pool->thread_counter));
        }
        pthread_exit(NULL);
        return (NULL);
    }
    
    //管理线程
    void *adjust_thread(void *threadpool)
    {
        threadpool_t *pool = (threadpool_t *)threadpool;
        while (!pool->shutdown)
        {
            sleep(DEFAULT_TIME);
            pthread_mutex_lock(&(pool->lock));
            int queue_size = pool->queue_size;
            int live_thr_num = pool->live_thr_num;
            pthread_mutex_unlock(&(pool->lock));
            pthread_mutex_lock(&(pool->thread_counter));
            int busy_thr_num = pool->busy_thr_num;
            pthread_mutex_unlock(&(pool->thread_counter));
            //任务多线程少,增加线程
            if (queue_size >= MIN_WAIT_TASK_NUM
                    && live_thr_num < pool->max_thr_num)
            {
                //need add thread
                pthread_mutex_lock(&(pool->lock));
                int add = 0;
                for (int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                     && pool->live_thr_num < pool->max_thr_num; i++)
                {
                    if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
                    {
                        pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                        add++;
                        pool->live_thr_num++;
                    }
                }
                pthread_mutex_unlock(&(pool->lock));
            }
            //任务少线程多,减少线程
            if ((busy_thr_num * 2) < live_thr_num
                    && live_thr_num > pool->min_thr_num)
            {
                //need del thread
                pthread_mutex_lock(&(pool->lock));
                pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;
                pthread_mutex_unlock(&(pool->lock));
                //wake up thread to exit
                for (int i = 0; i < DEFAULT_THREAD_VARY; i++)
                {
                    pthread_cond_signal(&(pool->queue_not_empty));
                }
            }
        }
        return NULL;
    }
    
    int threadpool_destroy(threadpool_t *pool)
    {
        if (pool == NULL)
        {
            return -1;
        }
        pool->shutdown = true;
        //adjust_tid exit first
        pthread_join(pool->adjust_tid, NULL);
        // wake up the waiting thread
        pthread_cond_broadcast(&(pool->queue_not_empty));
        for (int i = 0; i < pool->min_thr_num; i++)
        {
            pthread_join(pool->threads[i], NULL);
        }
        threadpool_free(pool);
        return 0;
    }
    
    int threadpool_free(threadpool_t *pool)
    {
        if (pool == NULL)
        {
            return -1;
        }
        if (pool->task_queue)
        {
            free(pool->task_queue);
        }
        if (pool->threads)
        {
            free(pool->threads);
            pthread_mutex_lock(&(pool->lock));
            pthread_mutex_destroy(&(pool->lock));
            pthread_mutex_lock(&(pool->thread_counter));
            pthread_mutex_destroy(&(pool->thread_counter));
            pthread_cond_destroy(&(pool->queue_not_empty));
            pthread_cond_destroy(&(pool->queue_not_full));
        }
        free(pool);
        pool = NULL;
        return 0;
    }
    
    int threadpool_all_threadnum(threadpool_t *pool)
    {
        int all_threadnum = -1;
        pthread_mutex_lock(&(pool->lock));
        all_threadnum = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));
        return all_threadnum;
    }
    
    int threadpool_busy_threadnum(threadpool_t *pool)
    {
        int busy_threadnum = -1;
        pthread_mutex_lock(&(pool->thread_counter));
        busy_threadnum = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->thread_counter));
        return busy_threadnum;
    }
    
    bool is_thread_alive(pthread_t tid)
    {
        int kill_rc = pthread_kill(tid, 0);
        if (kill_rc == ESRCH)
        {
            return false;
        }
        return true;
    }
    
    //for test
    void *process(void *arg)
    {
        printf("thread 0x%x working on task %d
     ",pthread_self(),*(int *)arg);
        sleep(1);
        printf("task %d is end
    ",*(int *)arg);
        return NULL;
    }
    
    int main()
    {
        threadpool_t *thp = threadpool_create(3,100,12);
        printf("pool inited");
    
        int *num = (int *)malloc(sizeof(int)*20);
        for (int i=0;i<10;i++)
        {
            num[i]=i;
            printf("add task %d
    ",i);
            threadpool_add(thp,process,(void*)&num[i]);
        }
        sleep(10);
        threadpool_destroy(thp);
        return 0;
    }
  • 相关阅读:
    C# 日期帮助类【原创】
    C# 发送邮件
    每日一题力扣453
    每日力扣628
    每日一题力扣41巨坑
    每日一题力扣274
    每日一题力扣442有坑
    每日一题力扣495
    每日一题力扣645
    每日一题力扣697
  • 原文地址:https://www.cnblogs.com/guxuanqing/p/8244938.html
Copyright © 2011-2022 走看看