zoukankan      html  css  js  c++  java
  • 使用互斥量和条件变量实现线程同步控制

    管程(monitor)说明

    在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计。

    管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下:

    condition x, y;

    x.wait();

    ...

    x.signal();

    调用x.wait()操作可能会使得一个进程挂起,直到另一个进程调用x.signal()操作。与信号量中的signal()操作相比,管程中如果在没有任何进程挂起的情况下调用signal()没有任何作用,而在信号量中,则必然会改变信号量的状态。

    一个管程(mointor)的示意图如下所示:

     

    一个mointor中的程序运行前必须首先获取mutex,直至程序运行完成或者线程等待的某个条件发生时才释放mutex。当一个线程执行mointor中的一个子程序时,称为占用(occupy)该mointor,因此必须等到没有其他线程执行管程程序时方可调用管程程序,这是互斥保证。在管程的简单实现中,编译器为每个管程对象自动加入一把私有的mutex(lock),初始状态为unlock,管程中的每个对象入口处执行lock操作,出口处执行unlock操作。

    因此设计monitor时至少必须包含mutex(lock) object(互斥量)和condition variables(条件变量)。一个条件变量可以看作是等待该条件发生的线程集合。

    注:monitor也称为<线程安全对象/类/模块>。

     

    条件变量

    为何需要条件变量?

    考虑如下一个busy waiting loop:

    while not(P)

        do skip

    如果仅有mutex,则线程必须等待P为真时才能继续执行。如此,将会导致其他线程无法进入临界区使得条件P为真,因此该管程可能发生死锁。

    可以用条件变量解决。一个条件变量C可以看作是一个线程队列,其中存放的线程正在等待与之关联的条件变为真。当一个线程等待一个条件变量C时,其将mutex释放,然后其他线程就可以进入该管程中,通过改变C的值可以使得条件C满足,因此对条件变量C可以有如下操作:

    (1)wait(c, m):线程调用该操作,等待条件C满足后继续执行,在等待过程中,释放mutex,因此此过程中,该线程不占用管程。

    (2)signal(c):线程调用该操作表明此时条件C为真。

    一个线程发生signal()后,至少有两个线程想要占用包含条件变量的管程:发出signal()操作的线程P,等待条件变量的线程Q,此时有两种选择:

    1.非阻塞式条件变量:Q继续等待直到P完成。

    2.阻塞式条件变量:P继续等待直到Q完成。

    两种条件变量类型

    阻塞式条件变量

    也被称为霍尔风格(Hoare-style)管程,如下图所示:

     

    每个管程包含两个线程队列e,s,其中:

    e:入口队列

    s:发出signal的线程队列

    对于每个条件变量C,有一个线程队列,用C.q表示,如上图的a.q、b.q,这些队列很多情况下可以实现为FIFO模式。

    阻塞式条件变量实现如下:

     

    非阻塞式条件变量

    也称为Mesa风格管程,如下图所示:

     

    该模型中,发出signal()操作的线程不会失去管程的占用权,被notified()的线程将会被移到队列e中,相较于阻塞式条件变量,该模型不需要队列s。例如Pthread中的条件变量就采用这种非阻塞模式,即发出signal()操作的线程优先级高于被notified()的线程,要使用这种条件变量:首先利用pthread_mutex_lock获取互斥锁,然后调用pthread_cond_wait在线程睡眠等待之前先释放互斥锁,在其被唤醒后再重新获取互斥锁。关于pthread条件变量如下会有详细介绍。

    非阻塞条件变量实现如下:

     

    POSIX同步之互斥锁和条件变量的使用

    如下为经典的有界缓冲区问题,可以用生产者/消费者模型描述,示意图如下:

     

    采用互斥量的生产者/消费者代码如下:

      1 [root@bogon unp]# cat producer_consumer_mutex.c
      2 #include <unistd.h>
      3 #include <sys/types.h>
      4 #include <pthread.h>
      5 #include <stdlib.h>
      6 #include <string.h>
      7 #include <errno.h>
      8 #include <stdio.h>
      9 
     10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
     11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
     12 #define BUFFERSIZE 10
     13 
     14 int g_buffer[BUFFERSIZE];
     15 
     16 unsigned short in = 0;
     17 unsigned short out = 0;
     18 
     19 pthread_mutex_t g_mutex;
     20 
     21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
     22 
     23 void* consumer(void* arg)
     24 {
     25         int num = (int)arg;
     26         /* 不断消费 */
     27         while (1)
     28         {
     29                 pthread_mutex_lock(&g_mutex);
     30 
     31                 /* 打印仓库当前状态 */
     32                 int i;
     33                 for (i = 0; i < BUFFERSIZE; i++) 
     34                 {
     35                         if (g_buffer[i] == -1)
     36                                 printf("g_buffer[%d] = %s
    ", i, "null");
     37                         else
     38                                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     39 
     40                         if (i == out)
     41                                 printf("g_buffer[%d]可以消费
    ", i);
     42                 }
     43 
     44                 /* 消费产品 */
     45                 printf("thread %d 开始消费产品 %d
    ", num, g_buffer[out]);
     46 sleep(4);       /* 消费一个产品需要4秒 */
     47                 g_buffer[out] = -1;
     48                 printf("消费完毕
    ");
     49                 out = (out + 1) % BUFFERSIZE;
     50 
     51                 pthread_mutex_unlock(&g_mutex);
     52         }
     53 
     54         return NULL;
     55 }
     56 
     57 void* producer(void* arg)
     58 {
     59         int num = (int)arg;
     60         /* 不断生产 */
     61         while (1)
     62         {
     63                 pthread_mutex_lock(&g_mutex);
     64 
     65                 /* 打印仓库当前状态 */
     66                 int i;
     67                 for (i = 0; i < BUFFERSIZE; i++)
     68         {
     69                 if (g_buffer[i] == -1)
     70                 printf("g_buffer[%d] = %s
    ", i, "null");
     71             else
     72                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     73   
     74             if (i == in)
     75                 printf("g_buffer[%d]可以生产
    ", i);
     76         }
     77 
     78                 /* 生产产品 */
     79                 g_buffer[in]++;
     80                 printf("thread %d 开始生产产品 %d
    ", num, g_buffer[in]);
     81                 sleep(2);       /* 生产一个产品需要2秒 */
     82                 printf("生产完毕
    ");
     83                 in = (in + 1) % BUFFERSIZE;
     84 
     85                 pthread_mutex_unlock(&g_mutex);
     86         }
     87 
     88         return NULL;
     89 }
     90 
     91 int main(void)
     92 {
     93         /* 初始化仓库 */
     94         int i;
     95         for (i = 0; i < BUFFERSIZE; i++)
     96                 g_buffer[i] = -1;
     97 
     98         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
     99         for (i = 0; i < CONSUMER_COUNT; i++)
    100         {
    101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
    102         }
    103 
    104         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
    105         for (i = 0; i < PRODUCER_COUNT; i++)
    106         {
    107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
    108         }
    109 
    110         /* 等待创建的所有线程退出 */
    111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
    112         {
    113                 pthread_join(g_thread[i], NULL);
    114         }
    115 
    116         return 0;
    117 }
    118 
    119 // output
    120 ...
    121 thread 2 开始生产产品 4
    122 生产完毕
    123 g_buffer[0] = 4
    124 g_buffer[1] = 4
    125 g_buffer[2] = 4
    126 g_buffer[3] = 2
    127 g_buffer[3]可以生产
    128 g_buffer[4] = 2
    129 g_buffer[5] = 1
    130 g_buffer[6] = 1
    131 g_buffer[7] = 0
    132 g_buffer[8] = 0
    133 g_buffer[9] = 4
    134 thread 1 开始生产产品 3
    135 生产完毕
    136 g_buffer[0] = 4
    137 g_buffer[1] = 4
    138 g_buffer[2] = 4
    139 g_buffer[3] = 3
    140 g_buffer[4] = 2
    141 g_buffer[5] = 1
    142 g_buffer[6] = 1
    143 g_buffer[7] = 0
    144 g_buffer[8] = 0
    145 g_buffer[9] = 4
    146 g_buffer[9]可以消费
    147 thread 0 开始消费产品 4
    148 消费完毕
    149 ...
    View Code

    但是上述程序中存在一个问题,就是当生产者线程未准备好产品时,消费者线程却在不断执行循环,这种被称为轮转(spinning)或者轮询(polling)的现象是对CPU资源的一大浪费。如下引入条件变量与互斥锁共同工作,互斥锁用于加锁互斥,而条件变量则专注于等待,每个条件变量总是和一个互斥锁关联。

    采用条件变量的生产者/消费者代码如下:

      1 [root@bogon unp]# cat producer_consumer_condition.c
      2 #include <unistd.h>
      3 #include <sys/types.h>
      4 #include <pthread.h>
      5 #include <stdlib.h>
      6 #include <string.h>
      7 #include <errno.h>
      8 #include <stdio.h>
      9 
     10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
     11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
     12 #define BUFFERSIZE 10
     13 
     14 int g_buffer[BUFFERSIZE];
     15 
     16 unsigned short in = 0;
     17 unsigned short out = 0;
     18 
     19 pthread_mutex_t g_mutex;
     20 
     21 typedef struct
     22 {
     23         pthread_mutex_t mutex;
     24         pthread_cond_t cond;
     25 } Condition;
     26 
     27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
     28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
     29 
     30 int nready;             /* 可以消费的产品数量 */
     31 
     32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
     33 
     34 void* consumer(void* arg)
     35 {
     36         int num = (int)arg;
     37         /* 不断消费 */
     38         while (1)
     39         {
     40                 pthread_mutex_lock(&g_mutex);
     41 
     42                 /* 打印仓库当前状态,(为了便于比较,这段打印临界区依然只使用互斥锁保护) */
     43                 int i;
     44         for (i = 0; i < BUFFERSIZE; i++) 
     45         {
     46                 if (g_buffer[i] == -1)
     47                 printf("g_buffer[%d] = %s
    ", i, "null");
     48             else
     49                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     50 
     51             if (i == out)
     52                 printf("g_buffer[%d]可以消费
    ", i);
     53         }
     54 
     55                 pthread_mutex_unlock(&g_mutex);
     56 
     57                 /* 消费产品 */
     58                 pthread_mutex_lock(&not_empty.mutex);
     59 
     60                 while (nready == 0)
     61                         pthread_cond_wait(&not_empty.cond, &not_empty.mutex);
     62                 printf("thread %d 开始消费产品 %d
    ", num, g_buffer[out]);
     63                 sleep(4);       /* 消费一个产品需要4秒 */
     64                 g_buffer[out] = -1;
     65                 printf("消费完毕
    ");
     66                 --nready;
     67                 out = (out + 1) % BUFFERSIZE;
     68 
     69                 pthread_cond_signal(&not_full.cond);
     70                pthread_mutex_unlock(&not_empty.mutex);
     71         }
     72 
     73         return NULL;
     74 }
     75 
     76 void* producer(void* arg)
     77 {
     78         int num = (int)arg;
     79         /* 不断生产 */
     80         while (1)
     81         {
     82                 pthread_mutex_lock(&g_mutex);
     83 
     84                 /* 打印仓库当前状态 */
     85                 int i;
     86                 for (i = 0; i < BUFFERSIZE; i++)
     87         {
     88                 if (g_buffer[i] == -1)
     89                 printf("g_buffer[%d] = %s
    ", i, "null");
     90             else
     91                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     92   
     93             if (i == in)
     94                 printf("g_buffer[%d]可以生产
    ", i);
     95         }
     96 
     97                 pthread_mutex_unlock(&g_mutex);
     98 
     99                 /* 生产产品 */
    100                 pthread_mutex_lock(&not_full.mutex);
    101 
    102                 while (nready == BUFFERSIZE)
    103                         pthread_cond_wait(&not_full.cond, &not_full.mutex);
    104                 g_buffer[in]++;
    105                 printf("thread %d 开始生产产品 %d
    ", num, g_buffer[in]);
    106                 sleep(2);       /* 生产一个产品需要2秒 */
    107                 printf("生产完毕
    ");
    108                 ++nready;
    109                 in = (in + 1) % BUFFERSIZE;
    110 
    111                 pthread_cond_signal(&not_empty.cond);
    112                 pthread_mutex_unlock(&not_full.mutex);
    113         }
    114 
    115         return NULL;
    116 }
    117 
    118 int main(void)
    119 {
    120         /* 初始化仓库 */
    121         int i;
    122         for (i = 0; i < BUFFERSIZE; i++)
    123                 g_buffer[i] = -1;
    124 
    125         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
    126         for (i = 0; i < CONSUMER_COUNT; i++)
    127         {
    128                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
    129         }
    130 
    131         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
    132         for (i = 0; i < PRODUCER_COUNT; i++)
    133         {
    134                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
    135         }
    136 
    137         /* 等待创建的所有线程退出 */
    138         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
    139         {
    140                 pthread_join(g_thread[i], NULL);
    141         }
    142 
    143         return 0;
    144 }
    145 
    146 // output is the same as above
    View Code

    条件变量使用说明:

    一个条件变量的改变是原子性的,因此需要一个互斥锁来保证,因此,条件变量的使用代码可以如下:

    1 typedef struct
    2 {
    3     pthread_mutex_t mutex;
    4     pthread_cond_t cond;
    5     // 与条件变量相关的变量声明
    6 } Condition;
    7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
    8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
    9 ...

    1.执行signal操作的线程中流程如下:

    pthread_mutex_lock(&cond_a.mutex);
    // 设置条件为真
    pthread_cond_signal(&cond_a.cond);
    pthread_mutex_unlock(&cond_a.mutex);

    说明:

    pthread_cond_signal与pthread_mutex_unlock的顺序:如果先signal后unlock,则可以确定signal操作是由lock住cond_a.mutex的线程调用的;如果先unlock后signal,则任一线程都可调用signal操作。如果需要可预见的调度行为,最好先signal后unlock,就像上面那样。

    2.执行wait操作的线程中流程如下:

    pthread_mutex_lock(&cond_a.mutex);
    while (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
    // 修改条件
    pthread_mutex_unlock(&cond_a.mutex);

    说明:

    (1)pthread_cond_wait执行如下3个操作:

    • 解锁cond_a.mutex,使得其他线程可以进入以便改变条件
    • 将调用线程阻塞在条件变量cond_a上(睡眠了),直到某个线程将条件设为真
    • 成功返回后(此时某个线程调用了pthread_cond_signal/broadcast)重新对cond_a.mutex加锁。

    (2)是否可以将:

    while (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

    替换为:

    if (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

    答案是如果将while替换为if,可以发生虚假(spurious)唤醒:即发出signal的线程并为将条件设为真就调用了pthread_cond_signal,此时pthread_cond_wait却成功返回了,如此将导致后续的代码执行失败。因此必须在pthread_cond_wait返回后再次判断条件是否确实为真,即必须使用循环而非条件判断。

     

     

     

  • 相关阅读:
    Vasya and Endless Credits CodeForces
    Dreamoon and Strings CodeForces
    Online Meeting CodeForces
    数塔取数 基础dp
    1001 数组中和等于K的数对 1090 3个数和为0
    1091 线段的重叠
    51nod 最小周长
    走格子 51nod
    1289 大鱼吃小鱼
    POJ 1979 Red and Black
  • 原文地址:https://www.cnblogs.com/benxintuzi/p/4874516.html
Copyright © 2011-2022 走看看