zoukankan      html  css  js  c++  java
  • Posix信号量

      提供不同进程间或给定的一个特定进程的不同线程间的同步手段。

    1. Posix有名信号量,由IPC名字标识,通常指代文件系统中的某个文件,但不要求真正存放在文件系统的某个文件中(如果信号量的实现用到了映射文件);随内核持续,可用于进程或线程间的同步
    2. Posix基于内存的信号量,放在共享内存中,同步多线程时,可以放到该多线程所属进程空间里;如果是同步多进程,那就需要把信号灯放入到共享内存中(方便多个进程访问)。随进程持续

      有名信号灯和基于内存的信号灯,具体区别体现在创建和销毁两个函数。有名信号灯使用sem_open和sem_close函数。基于内存的信号灯使用sem_init和sem_destroy函数。sem_init的参数可以控制是同步多线程,还是多进程;且该函数只能调用1次,因为调用后信号灯就存在了( 内存指针存在)。一般,使用基于内存的信号灯同步同进程多线程,使用有名信号灯同步多进程

    1. 创建(create)一个信号量。要求调用者指定初始值。通常初始值为0或1的称作二值信号量,初始值为N(N>=0)的称作计数信号量,N指示可用的资源数(比如说缓冲区个数)。
    2. 等待(wait)一个信号量。该操作测试信号量的值。如果它的值大于0,则将值减1;如果它的值小于或等于0,则阻塞等待其值变为大于0,然后将值减1。
    3. 挂起(post)一个信号量。该操作将信号量的值加1。

    生产者与消费者模型:

      只有一个生产者和消费者线程,而且它们只共享一个缓冲区。它们之间的行为被彼此约束: 生产者只有在缓冲区中的数据被消费者处理后(缓冲区为空)才能放入新的数据。消费者只有在缓冲区已被生产者放入数据后(缓冲区不为空)才能处理数据。对生产者而言,缓冲区为空表示有资源可用。对消费者而言,缓冲区不为空表示有资源可用。

      需要使用两个信号量get和put,因为初始缓冲区为空,所以我们把put的初始值置为1表示生产者的可用资源数为1,把get的初始值置为0表示消费者的初始可用资源数为0,消费者先运行

    互斥锁、条件变量区信号量别

    1. 互斥锁必须总是由锁住他的线程解锁,信号量的挂出不必由执行过它的等待操作的同一线程执行
    2. 互斥锁要么被锁住要没被解开
    3. 既然信号量有一个与之关联的状态(它的计数值)那么信号量的挂出总是被记住,然而向一个条件变量上发送信号时,如果没有线层等待该条件变量那么信号将丢失

      互斥锁和信号量是为线程间的同步机制说明。信号量为进程间的同步机制说明,这些进程可能共享也可能不共享某个内存区域。但信号量也可用同步线程,互斥锁条件变量也可用于同步进程。

    有名信量

      一个信号量的状态可能包含在它本身的sem_t数据类型中,还包含其他的信息(如:文件描述符),知道该标识符的任何进程都可以访问该信号量,它的整数标识符只是告诉内核具体引用哪个信号量

      即使对于某个特定的名字sem_open调用在每个进程中可能返回不同的指针,使用该指针的信号量函数(sem_post,sem_wait)引用的任然是同一个信号量

    sem_open

      创建并初始化信号灯,返回值:成功时,返回信号灯的指针,错误返回SEM_FAILED

    #include <semaphore.h>
    sem_t * sem_open(const char * name,int oflag,mode_t mode,unsigned int value);
    sem_t * sem_open(const char * name,int oflag);
    1. name是给信号灯指定一个名字
    2. oflag的值为O_CREAT,表示如果信号灯不存在,创建信号灯,若存在并不返回一个错误;为O_CREAT|O_EXCL,如果信号灯不存在报错。
    3. 后面两个参数,只有新建信号灯时使用。mode为信号灯的权限(0644),value为信号灯的值,该值不能超过SEM_VALUE_MAX(至少为32767),二值信号量的初值为1,计数信号量的初值往往大于一。

      默认都具备读写权限不需要指定O_RNONLY 和O_WRONLY,因为信号量的挂出与等待操作都要改变信号量的值,不具备读写权限的信号量会导致sem_open返回EACCES(访问权限不符合)

    sem_close

      进程终止时(自愿exit,_exit或非自愿接收到一个信号)内核对其上仍打开的所有有名信号量自动执行信号量关闭操作。

    #include <semaphore.h>
    int sem_close(sem_t * sem)

      成功时返回0;失败,-1

      每个信号灯有一个引用计数器记录当前打开次数.关闭一个信号灯并没有将它从系统中删除因为他是随内核持续的,而是信号灯引用计数减1,即时当前没有进程打开信号量,它的值仍然保持。

    sem_unlink

      信号灯引用计数为大于0时,name从系统中删除,而其信号量的析构要等到最后一个sem_close发生。

      返回值:成功时,返回0,失败,-1,需要显示调用

    #include <semaphore.h>
    int sem_close(const char *name)

    sem_wait/sem_trywait

      sem_wait测试所指信号量的值,若大于0将它减一立刻返回,如果等于0,调用的线程会被投入睡眠,直到该值大于0这时在将它减一,随后返回。(测试减一是原子操作)

      sem_trywait与sem_wait的区别是:当所指信号量的值为0时,不将调用线程投入睡眠。返回EAGAIN错误。

      如果某个信号量被中断。sem_wait可能尽早的返回,返回的错误为EINTR。

    #include <semaphore.h>
    int sem_wait(sem_t *sem)
    int sem_trywait(sem_t *sem)

    sem_getvalue

      获得信号灯当前值。参数:sem为信号灯指针,valp为信号灯的值

      返回值:返回0或某个负数。如果当前信号量以上锁,其绝对值等于等待该信号量解锁的线程数。

    #include <semaphore.h>
    int sem_getvalue(sem_t *sem,int *valp)

    sem_post

      返回值:成功时,返回0,失败,-1

      任何线程都可以挂出一个信号(比如值由0增加为1),即使当前没有线程在该信号上阻塞;然而如果某个线程调用pthread_cond_signal,当时没有任何线程阻塞在pthread_cond_wait上时,那么发送相应的条件变量的信号将丢失。

      在各种同步技巧(互斥锁、条件变量、读写锁、信号量)中,能够在信号处理程序中安全调用的唯一函数时sem_post。

    #include <semaphore.h>
    int sem_post(sem_t *sem)

    总结:

    1. 当某个进程持有信号量的锁时进程没有释放他就终止,内核并不给信号量解锁,这与锁不同,当进程持有某个记录的锁没有释放就终止,内核自动释放。
    2. 能够从信号处理程序中安全调用的唯一函数时sem_post,互斥锁是为上锁而优化,条件变量为等待而优化,信号量即可用于上锁,也可用于等待,01信号量就相当于互斥锁
    3. 如果一个以上的线程在信号灯上等待,sem_post将唤醒其中一个(优先级最高的或最早等待的线程),如果没有线程在等待,信号灯计数器被加一。
    4. 假如有个代码段,期望最多有两个线程同时在里面执行(其他线程必须等待),可以使用一个信号灯而不需要附加任何状态,初始化信号灯值为2,然后再代码段开始调用sem_wait,结尾处调用sem_post,然后两个线程可以再信号登上等待而不被阻塞。 
    5. sem_open,在父进程中打开的信号量在子进程中仍然打开

      生产者与消费者问题:

    #include "unpipc.h"
    
    #define BUFF 10
    #define SEM_MUTEX "/tmpmutex"
    #define SEM_NEMPTY "/tmpnempty"
    #define SEM_NSTORED "/tmpnstored"
    
    struct shared
    {
        int buffer[BUFF];
        sem_t *mutex,*nempty,*nstored;
    }shared;
    
    void *produce(void *arg)
    {
        int i;
        for(i=0;i<BUFF;++i)
        {
            sem_wait(shared.nempty);
            sem_wait(shared.mutex);
    
            shared.buffer[i%BUFF]=i;
    
            sem_post(shared.mutex);
            sem_post(shared.nstored);
        }
        return NULL;
    }
    
    void *consume(void *arg)
    {
        int i;
        for(i=0;i<BUFF;++i)
        {
            sem_wait(shared.nstored);//这两个wait交换会造成死锁
            sem_wait(shared.mutex);
    
            printf("shared.buffer[%d]= %d
    ",i,shared.buffer[i%BUFF]);
    
            sem_post(shared.mutex);
            sem_post(shared.nempty);
        }
        return NULL;
    }
    
    int main()
    {
        pthread_setconcurrency(2);
        pthread_t pid_consume,pid_produce;
    
        /*每个信号量都需要正确的初始化,如果先前建立的信号里因本程序终止没有
         * 删除,那么,可以在sem_open之前调用sem_unlink,并忽略任何错误,也
         * 可以指定O_EXCL,检查是否返回EEXIST错误,若是,调用sem_unlink,并
         * 且再次调用sem_open。如果想验证本程序只有一个副本在运行,可以调用
         * fcntl文件锁
         */
        shared.mutex=sem_open(SEM_MUTEX,O_CREAT|O_EXCL,FILE_MODE,1);
        shared.nempty=sem_open(SEM_NEMPTY,O_CREAT|O_EXCL,FILE_MODE,BUFF);
        shared.nstored=sem_open(SEM_NSTORED,O_CREAT|O_EXCL,FILE_MODE,BUFF);
    
        pthread_create(&pid_produce,NULL,produce,NULL);
        pthread_create(&pid_consume,NULL,consume,NULL);
    
        pthread_join(pid_produce,NULL);
        pthread_join(pid_consume,NULL);
    
        sem_unlink(SEM_MUTEX);
        sem_unlink(SEM_NEMPTY);
        sem_unlink(SEM_NSTORED);
    
        exit(0);
    }

    基于内存的信号量

    #include <semaphore.h>
    
    int sem_init(sem_t * sem,int shared,unsigned int value);
    int sem_destroy(sem_t * sem)

      sem为指向应用程序必须分配的sem_t变量,shared是指同步多线程还是多进程(0:同一进程间的线程共享;其他:进程间共享,该信号量必须放在在某个类型的共享内存中,而即将使用他的所有进程都要能访问共享内存),value为信号量值

      返回值:成功时,不返回0,失败时,返回-1

    注意:

    1. 必须保证该函数只调用一次,对一个已经初始化的信号量调用sem_init结果是未定义的。
    2. sem_open与sem_init:前者返回一个指向某个sem_t变量的指针,该变量由sem_open函数本身分配并初始化;后者的第一个参数时指向某个sem_t变量的指针,该变量由调用者分配,然后由sem_int初始化
    3. 一个基于内存的信号量,只有sem_init的参数指向的位置可用于访问该信号量,使用他的sem_t数据类型副本访问时为定义的结果
    4. 它的持续性取决于信号量存放的内存区域类型,只要某个基于内存的信号量的内存区保持有效,该信号量就一直存在,随进程持续,进程终止其消失
    5. fork的子进程通常不共享父进程内存空间,所以不能用sem_init(因为只有sem_init指向的位置才可以访问该信号量)

    多个生产者单个消费者

    #include "unpipc.h"
    #include <pthread.h>
    #include <string.h>
    
    #define BUFF 10
    
    struct shared
    {
        struct buff
        {
            char data[BUFFSIZE];
            size_t n;
        }buffer[BUFF];
        sem_t mutex,nempty,nstored;
    }shared;
    
    int fd;
    
    void *produce(void *arg)
    {
        int i=0;
        while(1)
        {
            //printf("fd is:%d
    ",fd);
            sem_wait(&shared.nempty);
    
            sem_wait(&shared.mutex);
            //如果数据缓冲区要是在一个连表上维护,此处是链表中移除某个缓冲区的
            //地方,把该操作放在临界区是避免生产者消费者对链表操作发生冲突
            sem_post(&shared.mutex);
    
            shared.buffer[i].n=read(fd,shared.buffer[i].data,BUFFSIZE);
            if(shared.buffer[i].n==0)//读取结束
            {
                sem_post(&shared.nstored);
                return NULL;
            }
    
            if(++i>=BUFF)
                i=0;//循环缓冲
    
            sem_post(&shared.nstored);
        }
        return NULL;
    }
    
    void *consume(void *arg)
    {
        int i=0;
        while(1)
        {
            sem_wait(&shared.nstored);
    
            sem_wait(&shared.mutex);
            //同上
            sem_post(&shared.mutex);
    
            if(shared.buffer[i].n==0)
                return NULL;
    
            write(1,shared.buffer[i].data,shared.buffer[i].n);
            if(++i>BUFF)
                i=0;
    
            sem_post(&shared.nempty);
        }
        return NULL;
    }
    
    int main()
    {
        printf("请输入文件名
    ");
        char pathname[100];
        fgets(pathname,100,stdin);
        int len=strlen(pathname);
        if(pathname[len-1]=='
    ')
            pathname[len-1]='';
        fd=open(pathname,O_RDONLY);
        //printf("pathname:%s",pathname);
        //printf("fd is main: %d
    ",fd);
    
        sem_init(&shared.mutex,0,1);
        sem_init(&shared.nempty,0,BUFF);
        sem_init(&shared.nstored,0,0);
    
        pthread_setconcurrency(2);
    
        pthread_t pid_produce,pid_consume;
    
        pthread_create(&pid_consume,NULL,consume,NULL);
        pthread_create(&pid_produce,NULL,produce,NULL);
        //pthread_create(&pid_consume,NULL,consume,NULL);
    
        pthread_join(pid_produce,NULL);
        pthread_join(pid_consume,NULL);
    
        sem_destroy(&shared.mutex);
        sem_destroy(&shared.nempty);
        sem_destroy(&shared.nstored);
    
        exit(0);
    }

    代码github:https://github.com/tianzengBlog/test/tree/master/ipc2/sem

    多个生产者多个消费者

      在有多个消费者的情形下,消费者验证数据使用的数组下标和数据值都变成共享的,需要使用互斥锁保护。为了方便,我们还是只使用一个互斥锁mutex。为消费者我们新增了3个变量nget、ngetval和nconsumers。

      要注意的还是生产者线程终止时的行为,我们新增了一个挂起nstored信号量的操作。因为所有的数据都验证完时nstored值为0,所有的消费者线程都在阻塞等待nstored信号量,如果不挂起nstored信号量,那么所有的消费者线程都会永远阻塞。

    #include "unpipc.h"
     
    #define NBUFF             10
    #define MAXNTHREADS     100
     
    int nitems, nproducers, nconsumers; 
     
    struct {
        int buff[NBUFF];
        int nput;
        int nputval;
        int nget;
        int ngetval;
        sem_t mutex, nempty, nstored;
    } shared;
     
    void *produce(void *);
    void *consume(void *);
     
    int main(int argc, char **argv)
    {
        int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
        pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
     
        if (argc != 4)
            err_quit("usgae: prodcons4 <#items> <#producers> <#consumers>");
     
        nitems = atoi(argv[1]);
        nproducers = min(atoi(argv[2]), MAXNTHREADS);
        nconsumers = min(atoi(argv[3]), MAXNTHREADS);
     
        Sem_init(&shared.mutex, 0, 1);
        Sem_init(&shared.nempty, 0, NBUFF);
        Sem_init(&shared.nstored, 0, 0);
     
        Pthread_setconcurrency(nproducers + nconsumers);
        for (i = 0; i < nproducers; i++) {
            prodcount[i] = 0;
            Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
        }
        for (i = 0; i < nconsumers; i++) {
            conscount[i] = 0;
            Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
        }
     
        for (i = 0; i < nproducers; i++) {
            Pthread_join(tid_produce[i], NULL);
            printf("producer count[%d] = %d
    ", i, prodcount[i]);
        }
        for (i = 0; i < nconsumers; i++) {
            Pthread_join(tid_consume[i], NULL);
            printf("consumer count[%d] = %d
    ", i, conscount[i]);
        }
        
        Sem_destroy(&shared.mutex);
        Sem_destroy(&shared.nempty);
        Sem_destroy(&shared.nstored);
     
        exit(0);
    }
     
    void* produce(void *arg)
    {
        int i;
        
        for ( ; ; ) {
            Sem_wait(&shared.nempty); //A
            Sem_wait(&shared.mutex);
            if (shared.nput >= nitems) {
                Sem_post(&shared.nstored); //重要!
                Sem_post(&shared.nempty); //A
                Sem_post(&shared.mutex);
                return NULL;
            }
            
            shared.buff[shared.nput % NBUFF] = shared.nputval;
            shared.nput++;
            shared.nputval++;
            Sem_post(&shared.mutex);
            Sem_post(&shared.nstored); //B
            *((int *)arg) += 1;
        }
        
    }
     
    void* consume(void *arg)
    {
        int i;
     
        for ( ; ; ) {
            Sem_wait(&shared.nstored); //B
            Sem_wait(&shared.mutex);
            if (shared.nget >= nitems) {
                Sem_post(&shared.nstored); //B
                Sem_post(&shared.mutex);
                return NULL;
            }
            
            i = shared.nget % NBUFF;
            if (shared.buff[i] != shared.ngetval)
                printf("error: buff[%d] = %d
    ", i, shared.buff[i]);
            shared.nget++;
            shared.ngetval++;
            Sem_post(&shared.mutex);
            Sem_post(&shared.nempty); //A
            *((int *)arg) += 1;
        }
        
    }

    信号量的限制

    1. 在<unistd,h>中定义,也可运行时通过sysconf获取
    2. SEM_NSEMS_MAX:一个进程可以最大打开的信号量数(posix要求至少为256)
    3. SEM_VALUE_MAX:一个信号量的最大值(posix至少要求为32767)

      可以用FIFO实现信号量也可以用内存映射I/O实现信号量。

  • 相关阅读:
    Oracle数据库的dual表的作用
    数据库中CASE函数和Oracle的DECODE函数的用法
    Oracle数据库中,通过function的方式建立自增字段
    Java学习(十三):抽象类和接口的区别,各自的优缺点
    Java学习(十八):二叉树的三种递归遍历
    Sublime工具插件安装
    sizeof和strlen
    I2C接口的EEPROM操作
    关于窗口看门狗
    关于指针传入函数
  • 原文地址:https://www.cnblogs.com/tianzeng/p/9314463.html
Copyright © 2011-2022 走看看