zoukankan      html  css  js  c++  java
  • Linux _条件变量

    条件变量
    1. 问题
    某些情况下,某些线程有这个需求:
    仅当满足某个特定条件时,才执行对应操作;
    如果该条件不满足,就阻塞该线程,一直等到对应的条件满足才继续执行。

     解决方案:
     当条件满足时,使用信号量唤醒对应线程,
     当条件不满足时,使用信号量阻塞对应线程。
     并用互斥量(互斥锁)来保护对该条件的访问。
    
     Linux提供了一种更方便的机制来解决该类问题:条件变量
    
    1. 使用方法
      条件变量是一种特殊的“通知”,而不是指某个条件。
      当特定的条件满足时,就使用pthread_cond_signal发送该通知(即发送该条件)
      即通知等待该条件的线程,它所等待的条件已经满足了。

      当特定的条件还不满足时,就使用pthread_cond_wait来等待该通知(即等待该条件)

      条件变量和互斥量结合使用:
      (1) 等待”通知”的pthread_cond_wait和发送“通知”的pthread_cond_signal,
      这两个调用的内部实现,需要使用互斥量,用来保护该条件变量。

      (2) 用来判断条件是否满足的相关共享资源,也需要用该互斥量进行保护。
      
    2. 条件变量的使用接口
      1) 条件变量的表示
      类型:pthread_cond_t

      2) 条件标量的初始化
      编译时初始化:
      pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
      运行时初始化:
      pthread_cond_t my_cond;
      pthread_cond_init(&my_cond, NULL);
      /* 参数2为NULL, 表示该条件变量使用默认属性 */

      3) 等待条件标量
      pthread_cond_wait
      原型:int pthread_cond_wait (pthread_cond_t *cond,
      pthread_mutex_t *mutex);
      参数:cond, 条件变量
      mutex, 该条件变量所使用的互斥量

       pthread_cond_timedwait
       原型: int  pthread_cond_timedwait(pthread_cond_t *cond,
                                                           pthread_mutex_t *mutex,
                                                           const struct timespec *abstime);
       功能:pthread_cond_wait的限时等待版本
       参数:abstime, 是一个绝对时间,即当前时间+超时时间
      
       注意:当因等待该条件变量而使该线程阻塞时,隐含了一个动作(对该互斥量进行解锁)
                当被唤醒时,即从该调用返回时,又隐含了一个动作(对该互斥量进行加锁)
      

      4) “发送”该条件变量
      即,通知(唤醒)等待该条件变量的线程。
      如果没有线程在等待该条件变量,则忽视该操作,无累积效应。(而多次执行V操作,将有“累积效应”)

       pthread_cond_signal
       原型: int  pthread_cond_signal(pthread_cond_t  *cond);
       功能: 如果有多个线程都在等待该条件变量,
                则,使用调度策略唤醒一个线程,其余线程继续等待。
      
       pthread_cond_broadcast
       原型:int  pthread_cond_broadcast(pthread_cond_t *cond);
       功能:唤醒等待该条件变量的所有线程。
      
    3. 实例
      生产者线程、消费者线程
      最多可以同时存放BUFF_SIZE个“产品”
      每个产品用一个整数表示。
      即使用int buff[BUFF_SIZE]存放所有产品。
      当buff放满时,不可以再生产。
      当buff为空时,不可以再消费

      分别调整生产者和消费者的速度,观察输出信息。
      main6.c

    4.
    创建两个线程5
    线程1接收用户输入
    接收完成后,由线程2对该字符串进行“加工”,即统计其长度,并打印输出。

    同步要求:
    接收到用户输入后,才能统计字符串长度。
    用户统计完成后,才能继续接收用户输入。
    
    main7.c
    
    
         ------------------------------
         lock
         if (判断是否需要等待)
            pthread_cond_wait(&cond, &lock)
    
         work     
         unlock
        -------------------------------
         pthread_cond_signal(&cond);
        _______________________________
       thread1:
            如果COUNT > 0 就执行work()
            否则,等待直到该条件满足
    
    
       pthread_mutex_lock(&lock);
       if (!(COUNT > 0)) {
            pthread_cond_wait(&conditon, &lock);
       } 
       pthread_mutex_unlock(&lock);
       work();
    
    
       线程2//
       COUNT++;
    
       pthread_mutex_lock(&lock);
       if(COUNT > 0) {
            pthread_cond_signal(&condition)
       } 

    main6.c

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <signal.h>
    
    #define BUFF_SIZE  3
    
    int buff[BUFF_SIZE];
    int pos_product;
    int pos_consume;
    
    // ∂®“ÂÃıº˛±‰¡øcond_product
    pthread_cond_t cond_product;
    
    // ∂®“ÂÃıº˛±‰¡øcond_consume
    pthread_cond_t cond_consume;
    
    // ∂®“Âœfl≥ê•≥‚À¯lock
    pthread_mutex_t lock;
    
    void init_work(void)
    {
        // Ãıº˛±‰¡øµƒ≥ı ºªØ
        pthread_cond_init(&cond_product, NULL);
        pthread_cond_init(&cond_consume, NULL);
    
        // œfl≥ÃÀ¯µƒ≥ı ºªØ
        pthread_mutex_init(&lock, NULL);
    
        pos_product = 0;
        pos_consume = 0;    
    }
    
    void* handle_product(void *arg)
    {
        int i;
    
        for(i=1; i<5; i++) {
            pthread_mutex_lock(&lock);
            if ((pos_product + 1) % BUFF_SIZE
                  == pos_consume) {
                // ≤÷ø‚“—¬˙£¨”¶∏√◊Ë»˚ Ωµ»¥˝
                printf("Buff is full, wait...
    ");
                pthread_cond_wait(&cond_product, &lock);       //µ±Ãıº˛±‰¡øŒ¥∑¢…˙∏ƒ±‰ ± £¨ Ω¯––µΩ’‚“ª≤Ωª·◊‘∂ØΩ‚À¯  £¨µ±Ãıº˛±‰¡ø¬˙◊„ «“ 
                                                        // À¯◊¥Ã¨Œ™Œ¥º”À¯◊¥Ã¨£¨◊‘∂غœÀ¯÷¥––œ¬“ª≤Ω
                                                      // »Ù À¯◊¥Ã¨Œ™ºœÀ¯◊¥Ã¨  £¨µ»¥˝∆‰À¯◊¥Ã¨Œ™Œ¥º”À¯  º”À¯∫Û÷¥–– œ¬“ª≤Ω 
            }
            buff[pos_product] = i;
            printf("Product a productor(%d)
    ", i);
    
            pos_product++;
            if (pos_product >= BUFF_SIZE) {
                pos_product = 0;
            }
    
            //pos_product = (pos_product+1)%BUFF_SIZE;
    
            pthread_cond_signal(&cond_consume);
            pthread_mutex_unlock(&lock);
    
            printf("product sleep begin.
    ");
            sleep(1);
            printf("product sleep end.
    ");
        }
    }
    
    void* handle_consume(void *arg)
    {
        int val;
        int i;
        for (i=1; i<5; i++) {
            pthread_mutex_lock(&lock);
            if (pos_consume == pos_product) {
                /* ◊Ë»˚ Ωµ»¥˝ */
                printf("Buff is empty, waiting...
    ");
                pthread_cond_wait(&cond_consume, &lock);
            }   
    
            /* ¥”≤÷ø‚»°≥ˆ≤˙∆∑ */
            val = buff[pos_consume];
            printf("Consume a product. val = %d
    ", val);
    
            /* –fi∏ƒø…œ˚∑—≤˙∆∑µƒŒª÷√ */
            pos_consume++;
            if (pos_consume >= BUFF_SIZE) {
                pos_consume = 0;
            }
    
            pthread_cond_signal(&cond_product);
            pthread_mutex_unlock(&lock);
    
            printf("consumer sleep...begin
    ");
            sleep(3);
            printf("consumer sleep...end
    ");
        }
    }
    
    int main(void)
    {
        pthread_t th_product, th_consume;
        int ret;
    
        init_work();
    
        ret = pthread_create(&th_product, 0, handle_product, 0);
        if (ret != 0) {
            perror("create thread failed!
    ");
            exit(1);
        }
    
        ret = pthread_create(&th_consume, 0, handle_consume, 0);
        if (ret != 0) {
            perror("create thread failed!
    ");
            exit(1);
        }
    
    
    
        pthread_join(th_product, 0);
        pthread_join(th_consume, 0);
    
        return 0;
    }
    

    main7.c

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <strings.h>
    #include <string.h>
    
    
    #define BUFF_SIZE  80
    char buff[BUFF_SIZE];
    pthread_cond_t  cond_input;
    pthread_cond_t  cond_work;
    pthread_mutex_t lock;
    
    void *hanle_input(void *arg)
    {
        int ret;
        int fd;
    
        fd = 0;
        while(1) {
            fd_set read_set;
            FD_ZERO(&read_set);
            FD_SET(fd, &read_set);
    
            ret = select(fd+1, &read_set, 0, 0, 0);
            if (ret == -1) {
                perror("select failed!
    ");
                exit(1);
            } else {
                if (FD_ISSET(fd, &read_set)) {
                    pthread_mutex_lock(&lock);
                    if (buff[0] != '') {
                        pthread_cond_wait(&cond_input, &lock);
                    }
                    bzero(buff, sizeof(buff));
                    ret = read(fd, buff, sizeof(buff));
                    if (ret == -1) {
                        printf("read failed!
    ");
                        exit(1);
                    }
    
                    pthread_cond_signal(&cond_work);
                    pthread_mutex_unlock(&lock);
                }
            }
        }
    }
    
    void *hanle_work(void *arg)
    {
        while(1) {
            pthread_mutex_lock(&lock);
            if (buff[0] == '') {
                pthread_cond_wait(&cond_work, &lock);
            }
    
            printf("You input %d characters
    ", strlen(buff));
            buff[0] = '';
    
            pthread_cond_signal(&cond_input);
            pthread_mutex_unlock(&lock);
        }
    }
    
    static void init_work(void)
    {
        pthread_cond_init(&cond_input, NULL);
        pthread_cond_init(&cond_work, NULL);
        pthread_mutex_init(&lock, NULL);
    }
    
    int main(void)
    {
        int ret;
        pthread_t th_input;
        pthread_t th_work;
    
        init_work();
    
        ret = pthread_create(&th_input, 0, hanle_input, 0);
        if (ret != 0) {
            printf("create thread failed!
    ");
        }
    
        ret = pthread_create(&th_work, 0, hanle_work, 0);
        if (ret != 0) {
            printf("create thread failed!
    ");
        }
    
        pthread_join(th_input, 0);
        pthread_join(th_work, 0);
    
        return 0;
    }
  • 相关阅读:
    镜像---移除
    镜像--保存于载入
    镜像、docker、容器三者关系
    容器管理
    HBase数据读写流程(1.3.1)
    HBase表的memstore与集群memstore
    HBase预分区方法
    HBase中的TTL与MinVersion的关系
    关于HBase的memstoreFlushSize。
    hbase java api样例(版本1.3.1,新API)
  • 原文地址:https://www.cnblogs.com/Sico2Sico/p/5384200.html
Copyright © 2011-2022 走看看