zoukankan      html  css  js  c++  java
  • (转载)Linux 多线程条件变量同步

    条件变量是线程同步的另一种方式,实际上,条件变量是信号量的底层实现,这也就意味着,使用条件变量可以拥有更大的自由度,同时也就需要更加小心的进行同步操作。条件变量使用的条件本身是需要使用互斥量进行保护的,线程在改变条件状态之前必须首先锁住互斥量,其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定之后才能计算条件。

    模型

    #include<pthread.h>
    pthread_cond_t cond              //准备条件变量
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;     //初始化静态的条件变量
    pthread_cond_init()         //初始化一个动态的条件变量
    pthread_cond_wait()         //等待条件变量变为真
    pthread_cond_timedwait()    //等待条件变量变为真,等待有时间限制。
    pthread_cond_signal()       //至少唤醒一个等待该条件的线程
    pthread_cond_broadcast()    //唤醒等待该条件的所有线程
    pthread_cond_destroy()      //销毁一个条件变量

    pthread_cond_init()

    //初始化一个动态的条件变量
    //成功返回0,失败返回error number
    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

    cond:条件变量指针,这里使用了restrict关键字
    attr:条件变量属性指针,默认属性赋NULL

    pthread_cond_wait() / pthread_cond_timedwait()

    //等待条件变量为真。收到pthread_cond_broadcast()或pthread_cond_signal()就唤醒
    //成功返回0,失败返回error number
    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

    条件变量的使在多线程的程序中,因为各个线程都可以访问大部分的进程资源,所以我们为了保证公有资源的使用是可以控制的,在一个线程开始使用公有资源之前要尝试获取互斥锁,在使用完毕之后要释放互斥锁,这样就能保证每个公共资源在一个时刻都只能被一个线程使用,在一定程度上得到了控制,但这并不能解决同步的问题,考虑如下两个线程:

    Q=create_queue();
    pthread_t mutex
    //线程A,入队
    while(1){
        lock(mutex);
        in_queue(Q);
        unlock(mutex);
    }
    
    //线程B,出队
    while(1){
        lock(mutex);
        out_queue(Q);
        unlock(mutex);
    }

    上述代码可以实现两个线程的互斥,即同一时刻只有一个线程在使用公有资源-队列。但如果线程B获取了锁,但队列中是空的,它的out_queue()也就是没有意义的,所以我们这里更需要一种方法将两个线程进行同步:只有当队列中有数据的时候才进行出队。
    我们设计这样一种逻辑:

    //线程B,出队
    while(1){
        lock(mutex);
        if(队列是空,线程不应该执行){
            释放锁;
            continue;
        }
        out_queue(Q);
        unlock(mutex);
    }

    这个程序就解决了上述的问题,即便线程B抢到了互斥锁,但是如果队列是空的,他就释放锁让两个线程重新抢锁,希望这次线程A能抢到并往里放一个数据。
    但这个逻辑还有一个问题,就是多线程并发的问题,很有可能发生的一种情况是:线程B抢到了锁,发现没有数据,释放锁->线程A立即抢到了锁并往里放了一个数据->线程B执行continue,显然,这种情况下是不应该continue的,因为线程B想要的条件在释放锁之后立即就被满足了,它错过了条件。
    So,我们想一种反过来的逻辑:

    //线程B,出队
    while(1){
        lock(mutex);
        if(队列是空,线程不应该执行){
            continue;
            释放锁;
        }
        out_queue(Q);
        unlock(mutex);
    }

    显然这种方法有个致命的问题:一旦continue了,线程B自己获得锁就没有被释放,这样线程A不可能抢到锁,而B继续加锁就会形成死锁!
    Finaly,我们希望看到一个函数fcn,如果条件不满足,能同时释放锁+停止执行线程,如果条件满足,自己当时获得的锁还在

    //线程B,出队
    while(1){
        lock(mutex);
        fcn(当前线程不应该执行,mutex)     //if(当前线程不应该执行){释放锁“同时” continue;}
        out_queue(Q);
        unlock(mutex);
    }

    OK,这个就是pthread_cond_wait()的原理了,只不过它把continue变成了"休眠"这种由OS负责的操作,可以大大的节约资源。
    当然,线程执行的条件是很难当作参数传入一个函数的,POSIX多线程的模型使用系统提供的"条件变量"+"我们自己定义的具体条件" 来确定一个线程是否应该执行接下来的内容。"条件变量"只有,所以一种典型的多线程同步的结构如下

    //线程B,出队
    while(1){
        lock(mutex);
        while(条件不满足)
            pthread_cond_wait(cond,mutex)
            //获得互斥锁可以同时保护while里的条件和cond的判断,二者总是用一把锁保护,并一同释放  
            //cond为假,就休眠同时释放锁,等待被cond为真唤醒,把自己获得的锁拿回来           
            //拿回自己的锁再检查线程执行条件,条件不满足继续循环,直到条件满足跳出循环
            //这个函数是带着"线程的执行条件为真"+"cond为真"走出循环的
            //这个函数返回后cond被重新设置为0
        out_queue(Q);
        unlock(mutex);
    }

    pthread_cond_braoadcast()/pthread_cond_signal()

    //使条件变量为真并唤醒wait中的线程,前者唤醒所有wait的,后者唤醒一个
    //成功返回0,失败返回error number
    int pthread_cond_broadcast(pthread_cond_t *cond);
    int pthread_cond_signal(pthread_cond_t *cond);

    pthread_cond_destroy()

    //销毁条件变量
    //成功返回0,失败返回error number
    int pthread_cond_destroy(pthread_cond_t *cond);

    例子-线程池

    \\\\\\\\\\\\\\\\\\
    //thread.h
    #ifndef __THREAD_H__
    #define __THREAD_H__
    #define THREAD_NUM 3
    #define TASK_NUM 100
    
    
    typedef struct{//每个节点的封装格式,fcn是用户自定义函数,arg是用户自定义函数参数指针,保证通用性声明为void
        void* (*fcn)(void* arg);
        void* arg;
    }task_t;
    typedef struct{        //用户自定义函数的参数结构体
        int x;
    }argfcn_t;
    
    //#define LQ_DATA_T task_t*
    #define LQ_DATA_T task_t
    
    #endif  //__THREAD_H__
    \\\\\\\\\\\\\\\\\
    //lqueue.c
    #include"thread.h"
    #include"lqueue.h"
    #include<stdlib.h>
    ...
    \\\\\\\\\\\\\\\\\
    //thread_pool.c
    #include<stdio.h>   
    #include<stdlib.h>
    #include<string.h>
    #include<pthread.h>
    #include"thread.h"
    #include"lqueue.h"
    
    //互斥量和条件变量
    pthread_mutex_t lock;
    pthread_cond_t cond;
    
    
    //全局的表
    lqueue_t* Q;
    
    //每个线程的任务,必须是这个形式的函数指针
    void* do_task(void* p){   
        task_t data;
        int ret=0;
        while(1){
            pthread_mutex_lock(&lock);
            while(is_empty_lqueue(Q)){          //大家收到广播,因为延迟,可能醒了好几个,要判断一下是不是自己
                pthread_cond_wait(&cond,&lock);     //先抢到锁再醒
            }
            ret=out_lqueue(Q,&data);
            pthread_mutex_unlock(&lock);
            data.fcn(data.arg);
        }
    }
    
    //创建线程池
    void create_pool(void){
        //初始化队列
        Q=create_lqueue();
        //初始化互斥量
        pthread_mutex_init(&lock,NULL);
        //初始化条件变量
        pthread_cond_init(&cond,NULL);
        int i=THREAD_NUM;
        pthread_t tid[THREAD_NUM];
        while(i--)
            pthread_create(&tid[i],NULL,do_task,NULL);
    }
    
    
    
    //准备函数
    void* fcn(void* parg){                //用户自定义的需要线程执行的函数
        argfcn_t* i=(argfcn_t*)parg;
        printf("this is task1
    ");
        printf("task1:%d
    ",i->x);
    }
    
    //添加任务
    void pool_add_task(void*(*pfcn)(void*parg),void*arg){
        task_t task;
        task.fcn=pfcn;
        task.arg=arg;
    
        in_lqueue(Q,task);
        pthread_cond_signal(&cond); //添加了一个任务,用signal更好
    }
    
    int main(int argc, const char *argv[])
    {
        //创建线程池
        create_pool();
    
        //准备参数
        argfcn_t argfcn;
        argfcn.x=5;
    
        //添加任务
        pool_add_task(fcn,(void*)&argfcn);  
        pool_add_task(fcn,(void*)&argfcn);  
        pool_add_task(fcn,(void*)&argfcn);  
        pause();    
        return 0;
    }
  • 相关阅读:
    ossec配置使用腾讯企业邮箱告警
    网络排除工具之tcping
    pyenv 安装
    CVE-2020-1472 漏洞检测
    容器技术的核心
    简述 进程、线程、协程的区别 以及应用场景--记录
    php函数使用
    php使用表单post方法进行页面
    CURL方式使用代理访问网站
    nginx下隐藏admin和当前域名下得index.php
  • 原文地址:https://www.cnblogs.com/24zyt/p/6897168.html
Copyright © 2011-2022 走看看