zoukankan      html  css  js  c++  java
  • 使用互斥量和条件变量实现线程同步控制

    管程(monitor)说明

    在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计。

    管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下:

    condition x, y;

    x.wait();

    ...

    x.signal();

    调用x.wait()操作可能会使得一个进程挂起,直到另一个进程调用x.signal()操作。与信号量中的signal()操作相比,管程中如果在没有任何进程挂起的情况下调用signal()没有任何作用,而在信号量中,则必然会改变信号量的状态。

    一个管程(mointor)的示意图如下所示:

     

    一个mointor中的程序运行前必须首先获取mutex,直至程序运行完成或者线程等待的某个条件发生时才释放mutex。当一个线程执行mointor中的一个子程序时,称为占用(occupy)该mointor,因此必须等到没有其他线程执行管程程序时方可调用管程程序,这是互斥保证。在管程的简单实现中,编译器为每个管程对象自动加入一把私有的mutex(lock),初始状态为unlock,管程中的每个对象入口处执行lock操作,出口处执行unlock操作。

    因此设计monitor时至少必须包含mutex(lock) object(互斥量)和condition variables(条件变量)。一个条件变量可以看作是等待该条件发生的线程集合。

    注:monitor也称为<线程安全对象/类/模块>。

     

    条件变量

    为何需要条件变量?

    考虑如下一个busy waiting loop:

    while not(P)

        do skip

    如果仅有mutex,则线程必须等待P为真时才能继续执行。如此,将会导致其他线程无法进入临界区使得条件P为真,因此该管程可能发生死锁。

    可以用条件变量解决。一个条件变量C可以看作是一个线程队列,其中存放的线程正在等待与之关联的条件变为真。当一个线程等待一个条件变量C时,其将mutex释放,然后其他线程就可以进入该管程中,通过改变C的值可以使得条件C满足,因此对条件变量C可以有如下操作:

    (1)wait(c, m):线程调用该操作,等待条件C满足后继续执行,在等待过程中,释放mutex,因此此过程中,该线程不占用管程。

    (2)signal(c):线程调用该操作表明此时条件C为真。

    一个线程发生signal()后,至少有两个线程想要占用包含条件变量的管程:发出signal()操作的线程P,等待条件变量的线程Q,此时有两种选择:

    1.非阻塞式条件变量:Q继续等待直到P完成。

    2.阻塞式条件变量:P继续等待直到Q完成。

    两种条件变量类型

    阻塞式条件变量

    也被称为霍尔风格(Hoare-style)管程,如下图所示:

     

    每个管程包含两个线程队列e,s,其中:

    e:入口队列

    s:发出signal的线程队列

    对于每个条件变量C,有一个线程队列,用C.q表示,如上图的a.q、b.q,这些队列很多情况下可以实现为FIFO模式。

    阻塞式条件变量实现如下:

     

    非阻塞式条件变量

    也称为Mesa风格管程,如下图所示:

     

    该模型中,发出signal()操作的线程不会失去管程的占用权,被notified()的线程将会被移到队列e中,相较于阻塞式条件变量,该模型不需要队列s。例如Pthread中的条件变量就采用这种非阻塞模式,即发出signal()操作的线程优先级高于被notified()的线程,要使用这种条件变量:首先利用pthread_mutex_lock获取互斥锁,然后调用pthread_cond_wait在线程睡眠等待之前先释放互斥锁,在其被唤醒后再重新获取互斥锁。关于pthread条件变量如下会有详细介绍。

    非阻塞条件变量实现如下:

     

    POSIX同步之互斥锁和条件变量的使用

    如下为经典的有界缓冲区问题,可以用生产者/消费者模型描述,示意图如下:

     

    采用互斥量的生产者/消费者代码如下:

      1 [root@bogon unp]# cat producer_consumer_mutex.c
      2 #include <unistd.h>
      3 #include <sys/types.h>
      4 #include <pthread.h>
      5 #include <stdlib.h>
      6 #include <string.h>
      7 #include <errno.h>
      8 #include <stdio.h>
      9 
     10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
     11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
     12 #define BUFFERSIZE 10
     13 
     14 int g_buffer[BUFFERSIZE];
     15 
     16 unsigned short in = 0;
     17 unsigned short out = 0;
     18 
     19 pthread_mutex_t g_mutex;
     20 
     21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
     22 
     23 void* consumer(void* arg)
     24 {
     25         int num = (int)arg;
     26         /* 不断消费 */
     27         while (1)
     28         {
     29                 pthread_mutex_lock(&g_mutex);
     30 
     31                 /* 打印仓库当前状态 */
     32                 int i;
     33                 for (i = 0; i < BUFFERSIZE; i++) 
     34                 {
     35                         if (g_buffer[i] == -1)
     36                                 printf("g_buffer[%d] = %s
    ", i, "null");
     37                         else
     38                                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     39 
     40                         if (i == out)
     41                                 printf("g_buffer[%d]可以消费
    ", i);
     42                 }
     43 
     44                 /* 消费产品 */
     45                 printf("thread %d 开始消费产品 %d
    ", num, g_buffer[out]);
     46 sleep(4);       /* 消费一个产品需要4秒 */
     47                 g_buffer[out] = -1;
     48                 printf("消费完毕
    ");
     49                 out = (out + 1) % BUFFERSIZE;
     50 
     51                 pthread_mutex_unlock(&g_mutex);
     52         }
     53 
     54         return NULL;
     55 }
     56 
     57 void* producer(void* arg)
     58 {
     59         int num = (int)arg;
     60         /* 不断生产 */
     61         while (1)
     62         {
     63                 pthread_mutex_lock(&g_mutex);
     64 
     65                 /* 打印仓库当前状态 */
     66                 int i;
     67                 for (i = 0; i < BUFFERSIZE; i++)
     68         {
     69                 if (g_buffer[i] == -1)
     70                 printf("g_buffer[%d] = %s
    ", i, "null");
     71             else
     72                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     73   
     74             if (i == in)
     75                 printf("g_buffer[%d]可以生产
    ", i);
     76         }
     77 
     78                 /* 生产产品 */
     79                 g_buffer[in]++;
     80                 printf("thread %d 开始生产产品 %d
    ", num, g_buffer[in]);
     81                 sleep(2);       /* 生产一个产品需要2秒 */
     82                 printf("生产完毕
    ");
     83                 in = (in + 1) % BUFFERSIZE;
     84 
     85                 pthread_mutex_unlock(&g_mutex);
     86         }
     87 
     88         return NULL;
     89 }
     90 
     91 int main(void)
     92 {
     93         /* 初始化仓库 */
     94         int i;
     95         for (i = 0; i < BUFFERSIZE; i++)
     96                 g_buffer[i] = -1;
     97 
     98         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
     99         for (i = 0; i < CONSUMER_COUNT; i++)
    100         {
    101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
    102         }
    103 
    104         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
    105         for (i = 0; i < PRODUCER_COUNT; i++)
    106         {
    107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
    108         }
    109 
    110         /* 等待创建的所有线程退出 */
    111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
    112         {
    113                 pthread_join(g_thread[i], NULL);
    114         }
    115 
    116         return 0;
    117 }
    118 
    119 // output
    120 ...
    121 thread 2 开始生产产品 4
    122 生产完毕
    123 g_buffer[0] = 4
    124 g_buffer[1] = 4
    125 g_buffer[2] = 4
    126 g_buffer[3] = 2
    127 g_buffer[3]可以生产
    128 g_buffer[4] = 2
    129 g_buffer[5] = 1
    130 g_buffer[6] = 1
    131 g_buffer[7] = 0
    132 g_buffer[8] = 0
    133 g_buffer[9] = 4
    134 thread 1 开始生产产品 3
    135 生产完毕
    136 g_buffer[0] = 4
    137 g_buffer[1] = 4
    138 g_buffer[2] = 4
    139 g_buffer[3] = 3
    140 g_buffer[4] = 2
    141 g_buffer[5] = 1
    142 g_buffer[6] = 1
    143 g_buffer[7] = 0
    144 g_buffer[8] = 0
    145 g_buffer[9] = 4
    146 g_buffer[9]可以消费
    147 thread 0 开始消费产品 4
    148 消费完毕
    149 ...
    View Code

    但是上述程序中存在一个问题,就是当生产者线程未准备好产品时,消费者线程却在不断执行循环,这种被称为轮转(spinning)或者轮询(polling)的现象是对CPU资源的一大浪费。如下引入条件变量与互斥锁共同工作,互斥锁用于加锁互斥,而条件变量则专注于等待,每个条件变量总是和一个互斥锁关联。

    采用条件变量的生产者/消费者代码如下:

      1 [root@bogon unp]# cat producer_consumer_condition.c
      2 #include <unistd.h>
      3 #include <sys/types.h>
      4 #include <pthread.h>
      5 #include <stdlib.h>
      6 #include <string.h>
      7 #include <errno.h>
      8 #include <stdio.h>
      9 
     10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
     11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
     12 #define BUFFERSIZE 10
     13 
     14 int g_buffer[BUFFERSIZE];
     15 
     16 unsigned short in = 0;
     17 unsigned short out = 0;
     18 
     19 pthread_mutex_t g_mutex;
     20 
     21 typedef struct
     22 {
     23         pthread_mutex_t mutex;
     24         pthread_cond_t cond;
     25 } Condition;
     26 
     27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
     28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
     29 
     30 int nready;             /* 可以消费的产品数量 */
     31 
     32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
     33 
     34 void* consumer(void* arg)
     35 {
     36         int num = (int)arg;
     37         /* 不断消费 */
     38         while (1)
     39         {
     40                 pthread_mutex_lock(&g_mutex);
     41 
     42                 /* 打印仓库当前状态,(为了便于比较,这段打印临界区依然只使用互斥锁保护) */
     43                 int i;
     44         for (i = 0; i < BUFFERSIZE; i++) 
     45         {
     46                 if (g_buffer[i] == -1)
     47                 printf("g_buffer[%d] = %s
    ", i, "null");
     48             else
     49                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     50 
     51             if (i == out)
     52                 printf("g_buffer[%d]可以消费
    ", i);
     53         }
     54 
     55                 pthread_mutex_unlock(&g_mutex);
     56 
     57                 /* 消费产品 */
     58                 pthread_mutex_lock(&not_empty.mutex);
     59 
     60                 while (nready == 0)
     61                         pthread_cond_wait(&not_empty.cond, &not_empty.mutex);
     62                 printf("thread %d 开始消费产品 %d
    ", num, g_buffer[out]);
     63                 sleep(4);       /* 消费一个产品需要4秒 */
     64                 g_buffer[out] = -1;
     65                 printf("消费完毕
    ");
     66                 --nready;
     67                 out = (out + 1) % BUFFERSIZE;
     68 
     69                 pthread_cond_signal(&not_full.cond);
     70                pthread_mutex_unlock(&not_empty.mutex);
     71         }
     72 
     73         return NULL;
     74 }
     75 
     76 void* producer(void* arg)
     77 {
     78         int num = (int)arg;
     79         /* 不断生产 */
     80         while (1)
     81         {
     82                 pthread_mutex_lock(&g_mutex);
     83 
     84                 /* 打印仓库当前状态 */
     85                 int i;
     86                 for (i = 0; i < BUFFERSIZE; i++)
     87         {
     88                 if (g_buffer[i] == -1)
     89                 printf("g_buffer[%d] = %s
    ", i, "null");
     90             else
     91                 printf("g_buffer[%d] = %d
    ", i, g_buffer[i]);
     92   
     93             if (i == in)
     94                 printf("g_buffer[%d]可以生产
    ", i);
     95         }
     96 
     97                 pthread_mutex_unlock(&g_mutex);
     98 
     99                 /* 生产产品 */
    100                 pthread_mutex_lock(&not_full.mutex);
    101 
    102                 while (nready == BUFFERSIZE)
    103                         pthread_cond_wait(&not_full.cond, &not_full.mutex);
    104                 g_buffer[in]++;
    105                 printf("thread %d 开始生产产品 %d
    ", num, g_buffer[in]);
    106                 sleep(2);       /* 生产一个产品需要2秒 */
    107                 printf("生产完毕
    ");
    108                 ++nready;
    109                 in = (in + 1) % BUFFERSIZE;
    110 
    111                 pthread_cond_signal(&not_empty.cond);
    112                 pthread_mutex_unlock(&not_full.mutex);
    113         }
    114 
    115         return NULL;
    116 }
    117 
    118 int main(void)
    119 {
    120         /* 初始化仓库 */
    121         int i;
    122         for (i = 0; i < BUFFERSIZE; i++)
    123                 g_buffer[i] = -1;
    124 
    125         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
    126         for (i = 0; i < CONSUMER_COUNT; i++)
    127         {
    128                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
    129         }
    130 
    131         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
    132         for (i = 0; i < PRODUCER_COUNT; i++)
    133         {
    134                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
    135         }
    136 
    137         /* 等待创建的所有线程退出 */
    138         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
    139         {
    140                 pthread_join(g_thread[i], NULL);
    141         }
    142 
    143         return 0;
    144 }
    145 
    146 // output is the same as above
    View Code

    条件变量使用说明:

    一个条件变量的改变是原子性的,因此需要一个互斥锁来保证,因此,条件变量的使用代码可以如下:

    1 typedef struct
    2 {
    3     pthread_mutex_t mutex;
    4     pthread_cond_t cond;
    5     // 与条件变量相关的变量声明
    6 } Condition;
    7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
    8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
    9 ...

    1.执行signal操作的线程中流程如下:

    pthread_mutex_lock(&cond_a.mutex);
    // 设置条件为真
    pthread_cond_signal(&cond_a.cond);
    pthread_mutex_unlock(&cond_a.mutex);

    说明:

    pthread_cond_signal与pthread_mutex_unlock的顺序:如果先signal后unlock,则可以确定signal操作是由lock住cond_a.mutex的线程调用的;如果先unlock后signal,则任一线程都可调用signal操作。如果需要可预见的调度行为,最好先signal后unlock,就像上面那样。

    2.执行wait操作的线程中流程如下:

    pthread_mutex_lock(&cond_a.mutex);
    while (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
    // 修改条件
    pthread_mutex_unlock(&cond_a.mutex);

    说明:

    (1)pthread_cond_wait执行如下3个操作:

    • 解锁cond_a.mutex,使得其他线程可以进入以便改变条件
    • 将调用线程阻塞在条件变量cond_a上(睡眠了),直到某个线程将条件设为真
    • 成功返回后(此时某个线程调用了pthread_cond_signal/broadcast)重新对cond_a.mutex加锁。

    (2)是否可以将:

    while (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

    替换为:

    if (条件为假)
        pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

    答案是如果将while替换为if,可以发生虚假(spurious)唤醒:即发出signal的线程并为将条件设为真就调用了pthread_cond_signal,此时pthread_cond_wait却成功返回了,如此将导致后续的代码执行失败。因此必须在pthread_cond_wait返回后再次判断条件是否确实为真,即必须使用循环而非条件判断。

     

     

     

  • 相关阅读:
    Mongdb 简单增删改查
    mongdb的安装
    VO,DO,DTO,PO,POJO,EJB
    JavaScript是如何工作的:事件循环和异步编程的崛起+ 5种使用 async/await 更好地编码方式!
    判断DataTale中判断某个字段中包含某个数据
    查询表中某个字段的值出现次数大于1的数据
    判断对象数组中是否含有某个对象。
    C# datatable 重新排序
    jquery 验证大于0的整数
    jQuery 心跳请求
  • 原文地址:https://www.cnblogs.com/benxintuzi/p/4874516.html
Copyright © 2011-2022 走看看