zoukankan      html  css  js  c++  java
  • Posix信号量

    1、概述

      信号量(semaphore)是一种用于提供不同进程间或一个给定进程的不同线程间同步手段的原语。信号量的使用主要是用来保护共享资源,使得资源在一个时刻只有一个进程(线程)所拥有。信号量的值为正的时候,说明它空闲。所测试的线程可以锁定而使用它。若为0,说明它被占用,测试的线程要进入睡眠队列中,等待被唤醒。Posix信号量分为有名信号量和无名信号量(也叫基于内存的信号量)。

    2、Posix有名信号量

      有名信号量既可以用于线程间的同步也可以用于进程间的同步。

    1)由sem_open来创建一个新的信号量或打开一个已存在的信号量。其格式为:

    sem_t *sem_open(const char *name,int oflag,mode_t mode,unsigned int value);
    返回:若成功则为指向信号量的指针,若出错则为SEM_FAILED 其中,第三、四个参数可以没有,主要看第二个参数如何选取。
    oflag参数:可以是0、O_CREAT或O_CREAT|O_EXCL。如果指定O_CREAT标志而没有指定O_EXCL,那么只有当所需的信号量尚未存在时才初始化它。但是如果所需的信号量已经存在也不会出错。 但是如果在所需的信号量存在的情况下指定O_CREAT|O_EXCL却会报错。
    mode参数:指定权限位。
    value参数:指定信号量的初始值。该初始值不能超过SEM_VALUE_MAX(这个常值必须至少为32767).二值信号量的初始值通常为1,计数信号量的初始值则往往大于1。
    用sem_close来关闭该信号量。

    创建一个新的信号量程序如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 #include <fcntl.h>
     7 
     8 //创建模式权限
     9 #define FILE_MODE  (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    10 
    11 int main(int argc,char *argv[])
    12 {
    13     int     c, flags;
    14     sem_t   *sem;
    15     unsigned int value;
    16     flags = O_RDWR | O_CREAT;
    17     value = 1; //初始化信号量的值为1,即二元信号量
    18     while((c = getopt(argc,argv,"ei:"))!= -1)
    19     {
    20         switch(c)
    21         {
    22             case 'e':
    23                 flags |= O_EXCL;
    24                 break;
    25             case 'i':
    26                 value = atoi(optarg);  //获取信号量的值
    27                 break;
    28         }
    29     }
    30     if(optind != argc -1)
    31     {
    32         printf("usage: semcreate [-e] [-i initalvalue] <name>");
    33         exit(0);
    34     }
    35     //创建信号量,返回sem_t类型指针
    36     if((sem = sem_open(argv[optind],flags,FILE_MODE,value)) == SEM_FAILED)
    37     {
    38         perror("sem_open() error");
    39         exit(-1);
    40     }
    41     //关闭打开的信号量
    42     sem_close(sem);
    43     exit(0);
    44 }

    2)使用sem_unlink删除信号量:
    int sem_unlink(const char *name); 返回:成功返回0,出错返回-1

    删除信号量程序如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 #include <fcntl.h>
     7 
     8 int main(int argc,char *argv[])
     9 {
    10     if(argc != 2)
    11     {
    12         printf("usage: semunlink<name>");
    13         exit(0);
    14     }
    15         //从系统中删除信号量
    16     if(sem_unlink(argv[1]) == -1)
    17     {
    18         perror("sem_unlink() error");
    19         exit(-1);
    20     }
    21     exit(0);
    22 }

    3)获取信号量的当前值:
    int sem_getvalue(sem_t *sem,int *valp); 返回:成功返回0,出错返回-1
    sem_getvalue在由valp指向的整数中返回所指定信号量的当前值。如果信号量当前已上锁,那么返回值或为0,或为某个负数,绝对值即为等待等待该信号量解锁的线程数。

    获取信号量的值程序如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 #include <fcntl.h>
     7 
     8 int main(int argc,char *argv[])
     9 {
    10     sem_t  *sem;
    11     int  val;
    12     if(argc != 2)
    13     {
    14         printf("usage: semgetvalue<name>");
    15         exit(0);
    16     }
    17     //打开一个已经存在的有名信号量
    18     sem = sem_open(argv[1],0);
    19      //获取信号量的值
    20     sem_getvalue(sem,&val);
    21     printf("value = %d\n",val);
    22     exit(0);
    23 }

    4)信号量的等待:(P操作,也称为递减down 或 上锁lock)
    int sem_wait(sem_t *sem);
    int sem_trywait(sem_t *sem);
    返回:成功返回0,出错返回-1
    sem_wait函数测试所指定信号量的值,如果该值大于0,就将它的值减1并立即返回;如果该值等于0,调用线程就被投入睡眠中,直到该值变为大于0,这时再将它减1,函数随后返回。“测试并减1”操作必须是原子的。sem_wait和sem_trywait的差别是:当所指定信号量的值已经是0时,后者并不将调用的进程投入睡眠。相反,它返回一个EAGAIN错误。如果被某个信号中断,sem_wait就可能过早的返回,返回的错误为EINTR。

    等待信号量程序如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 #include <fcntl.h>
     7 
     8 int main(int argc,char *argv[])
     9 {
    10     sem_t *sem;
    11     int val;
    12     if(argc != 2)
    13     {
    14         printf("usage: semwait<name>");
    15         exit(0);
    16     }
    17         //打开已经存在的信号量
    18     sem = sem_open(argv[1],0);
    19         //等待
    20     sem_wait(sem);
    21         //获取信号量的值
    22     sem_getvalue(sem,&val);
    23     printf("pid %ld has semaphore,value = %d\n",(long) getpid(),val);
    24     pause();
    25     exit(0);
    26 }

    5)信号量挂出(V操作,也称为递增up 或解锁unlock)
    int sem_post(sem_t *sem);返回:成功返回0,出错返回-1 将所指定的信号量值加1

    信号量挂出程序如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 #include <fcntl.h>
     7 
     8 int main(int argc,char *argv[])
     9 {
    10     sem_t     *sem;
    11     int     val;
    12     if(argc != 2)
    13     {
    14         printf("usage: semopt <name>");
    15         exit(0);
    16     }
    17     //打开已经存在的信号量
    18     sem = sem_open(argv[1],0);
    19     //信号量挂出
    20     sem_post(sem); 
    21     //获取挂出后的信号量值
    22     sem_getvalue(sem,&val);
    23     printf("value = %ld\n",val);
    24     exit(0);
    25 }

    在Centos上测试Posix信号量如下:

    3、采用Posix信号量实现生产者-消费者问题

      对生产者-消费者问题进行扩展,把共享缓冲区用作一个环绕缓冲区,即生产者填写最后一项后回头来填写第一项,消费者也这么操作。此时需要维持三个条件:

    (1)当缓冲区为空时,消费者不能试图从其中去除一个条目

    (2)当缓冲区填满时,生产者不能试图往其中放置一个条目

    (3)共享变量可能描述缓冲区的当前状态(下标、计数和链表指针),因此生产者和消费者的所有缓冲区操作都必须保护起来,以避免竞争。

    给出使用信号量的方案展示三种不同类型的信号量:

    (1)定义mutex二元信号量保护两个临界区。

    (2)定义nempty的计数信号量统计共享缓冲区中的空槽位数。

    (3)定义nstored的计数信号量统计共享缓冲区中已填写的槽位数。

    实现单个生产者和单个消费者的情况,程序如下所示:

    View Code
      1 include <stdlib.h>
      2 #include <unistd.h>
      3 #include <semaphore.h>
      4 #include <errno.h>
      5 #include <fcntl.h>
      6 //文件模式
      7 #define FILE_MODE  (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
      8 #define NBUFF             10      //槽位的个数
      9 #define SEM_MUTEX         "mutex1" 
     10 #define SEM_NEMPTY         "nemtpy1"
     11 #define SEM_NSTORED     "nstored1"
     12 
     13 int nitems;  //条目的个数
     14 //缓冲区结构
     15 struct  
     16 {
     17     int buff[NBUFF];   
     18     sem_t *mutex,*nempty,*nstored;   //信号量
     19 }shared;
     20 
     21 char *px_ipc_name(const char *name);
     22 void *produce(void *arg);
     23 void *consume(void *arg);
     24 
     25 int main(int argc,char *argv[])
     26 {
     27     pthread_t   tid_produce,tid_consume;
     28     if(argc != 2)
     29     {
     30         printf("usage: prodcons <#itmes>");
     31         exit(0);
     32     }
     33     nitems = atoi(argv[1]);  //获取条目数目
     34         //创建二元信号量
     35     if((shared.mutex = sem_open(SEM_MUTEX,O_CREAT,FILE_MODE,1)) == SEM_FAILED)
     36     {
     37         perror("sem_open() error");
     38         exit(-1);
     39     }
     40         //创建nempty信号量
     41     if((shared.nempty =  sem_open(SEM_NEMPTY,O_CREAT,FILE_MODE,NBUFF)) == SEM_FAILED)
     42     {
     43         perror("sem_open() error");
     44         exit(-1);
     45     }
     46         //创建nstored信号量
     47     if((shared.nstored = sem_open(SEM_NSTORED,O_CREAT,FILE_MODE,0)) == SEM_FAILED)
     48     {
     49         perror("sem_open() error");
     50         exit(-1);
     51     }
     52      pthread_setconcurrency(2);
     53         //生产者线程
     54     pthread_create(&tid_produce,NULL,produce,NULL);
     55         //消费者线程
     56     pthread_create(&tid_consume,NULL,consume,NULL);
     57     pthread_join(tid_produce,NULL);
     58     pthread_join(tid_consume,NULL);
     59     sem_unlink(SEM_MUTEX);
     60     sem_unlink(SEM_NEMPTY);
     61     sem_unlink(SEM_NSTORED);
     62     exit(0);
     63 }
     64 
     65 void *produce(void *arg)
     66 {
     67     int i;
     68     printf("produce is called.\n");
     69     for(i=0;i<nitems;i++)
     70     {
     71      //判断是否有空槽,有的将其减少1
     72          sem_wait(shared.nempty);
     73          //锁住槽位,对于多个生产者的时候有必要,单个生产者没有必要
     74       sem_wait(shared.mutex); 
     75       printf("produced a new item.\n");
     76       shared.buff[i%NBUFF] = i;
     77       sem_post(shared.mutex);  //释放锁
     78       sem_post(shared.nstored);  //缓冲区中条目数加1
     79     }
     80     return NULL;
     81 }
     82 
     83 void *consume(void *arg)
     84 {
     85     int   i;
     86     printf("consumer is called.\n");
     87     for(i=0;i<nitems;i++)
     88     {
     89         //判断缓冲区中是否有条目,有的话将条目数减少1
     90                 sem_wait(shared.nstored); 
     91                  //锁住缓冲区,对多个消费者有必要,对单个消费者没必要
     92         sem_wait(shared.mutex);
     93         if(shared.buff[i % NBUFF] != i)
     94             printf("buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
     95         printf("removed a item.\n");
     96         sem_post(shared.mutex);  //释放锁
     97         sem_post(shared.nempty); //将缓冲区中的空槽数目加1
     98     }
     99     return NULL;
    100 }

    程序执行结果如下:

    result文件内容如下:生产者和消费者公用缓冲区。

    4、Posix基于内存的信号量

      Posix有名信号量创建时候是用一个name参数标识,它通常指代文件系统中的某个文件。而基于内存的信号量是由应用程序分配信号量的内存空间,即分配一个sem_t数据类型的内存空间,然后由系统初始化它们的值。操作函数如下:

    #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);   //初始化内存信号量
    int sem_destroy(sem_t *sem);   //摧毁信号量

    如果shared=0,那么待初始化的信号量是在同一进程的各个线程间共享的,否则该信号量是在进程间共享的,此时该信号量必须存放在某种类型的共享内存区中,使得用它的进程能够访问该共享内存区。value是该信号量的初始值。

    现在采用基于内存的信号量实现生产者-消费者问题,单个生产者和单个消费者,程序如下所示:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 
     7 #define NBUFF             10
     8 
     9 int nitems;
    10 //缓冲区结构
    11 struct
    12 {
    13     int buff[NBUFF];
    14     sem_t mutex,nempty,nstored;
    15 }shared;
    16 
    17 void *produce(void *arg);
    18 void *consume(void *arg);
    19 
    20 int main(int argc,char *argv[])
    21 {
    22     pthread_t   tid_produce,tid_consume;
    23     if(argc != 2)
    24     {
    25         printf("usage: prodcons <#itmes>");
    26         exit(0);
    27     }
    28     nitems = atoi(argv[1]);
    29         //创建基于内存的信号量
    30     if(sem_init(&shared.mutex,0,1) == -1)
    31     {
    32         perror("sem_open() error");
    33         exit(-1);
    34     }
    35     if(sem_init(&shared.nempty,0,NBUFF) == -1)
    36     {
    37         perror("sem_open() error");
    38         exit(-1);
    39     }
    40     if(sem_init(&shared.nstored,0,0) == -1)
    41     {
    42         perror("sem_open() error");
    43         exit(-1);
    44     }
    45      pthread_setconcurrency(2);
    46     pthread_create(&tid_produce,NULL,produce,NULL);
    47     pthread_create(&tid_consume,NULL,consume,NULL);
    48     pthread_join(tid_produce,NULL);
    49     pthread_join(tid_consume,NULL);
    50        //摧毁信号量
    51     sem_destroy(&shared.mutex);
    52     sem_destroy(&shared.nempty);
    53     sem_destroy(&shared.nstored);
    54     exit(0);
    55 }
    56 
    57 void *produce(void *arg)
    58 {
    59     int i;
    60     printf("produce is called.\n");
    61     for(i=0;i<nitems;i++)
    62     {
    63       sem_wait(&shared.nempty);
    64       sem_wait(&shared.mutex);
    65       printf("produced a new item.\n");
    66       shared.buff[i%NBUFF] = i;
    67       sem_post(&shared.mutex);
    68       sem_post(&shared.nstored);
    69     }
    70     return NULL;
    71 }
    72 
    73 void *consume(void *arg)
    74 {
    75     int   i;
    76     printf("consumer is called.\n");
    77     for(i=0;i<nitems;i++)
    78     {
    79         sem_wait(&shared.nstored);
    80         sem_wait(&shared.mutex);
    81         if(shared.buff[i % NBUFF] != i)
    82             printf("buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
    83         printf("removed a item.\n");
    84         sem_post(&shared.mutex);
    85         sem_post(&shared.nempty);
    86     }
    87     return NULL;
    88 }

    程序执行结果与上面一致。

    5、多个生产者、单个消费者

      针对这种情况,不仅要考虑生产者与消费者之间的同步,而且还要考虑多个生产者之间的互斥。生产者中同时获取nempty信号量可以有多个,但是每个时刻只能有一个生产者能获取mutex信号量。修改缓冲区结构如下:

    struct
    {
        int         buff[NBUFF];  //缓冲区
        int         nput;      //待存入缓冲区下标
        int         nputval;    // 待存入的值
        sem_t     mutex,nempy,nstored;  //基于内存的信号量
    }shared;

    添加nput和nputval用于同步多个生产者线程。实现程序如下:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <semaphore.h>
     5 #include <errno.h>
     6 
     7 #define NBUFF             10
     8 #define MAXNTHREADS            100
     9 int nitems,nproducers;             //条目数和生产者线程数目
    10 struct
    11 {
    12     int buff[NBUFF];
    13     int nput;             
    14     int nputval;
    15     sem_t mutex,nempty,nstored;
    16 }shared;
    17 
    18 void *produce(void *arg);
    19 void *consume(void *arg);
    20 
    21 int main(int argc,char *argv[])
    22 {
    23     int i,count[MAXNTHREADS];
    24     pthread_t   tid_produce[MAXNTHREADS],tid_consume;
    25     if(argc != 3)
    26     {
    27         printf("usage: prodcons <#itmes> <#producers>");
    28         exit(0);
    29     }
    30     nitems = atoi(argv[1]);
    31     nproducers = atoi(argv[2]);
    32     if(sem_init(&shared.mutex,0,1) == -1)
    33     {
    34         perror("sem_open() error");
    35         exit(-1);
    36     }
    37     if(sem_init(&shared.nempty,0,NBUFF) == -1)
    38     {
    39         perror("sem_open() error");
    40         exit(-1);
    41     }
    42     if(sem_init(&shared.nstored,0,0) == -1)
    43     {
    44         perror("sem_open() error");
    45         exit(-1);
    46     }
    47      pthread_setconcurrency(nproducers+1);
    48         //创建多个生产者线程
    49     for(i=0;i<nproducers;i++)
    50     {
    51         count[i] = 0;
    52         pthread_create(&tid_produce[i],NULL,produce,&count[i]);
    53     }
    54     pthread_create(&tid_consume,NULL,consume,NULL);
    55     for(i=0;i<nproducers;i++)
    56     {
    57         pthread_join(tid_produce[i],NULL);
    58         printf("count[%d] = %d\n",i,count[i]);
    59     }
    60     pthread_join(tid_produce,NULL);
    61     pthread_join(tid_consume,NULL);
    62     sem_destroy(&shared.mutex);
    63     sem_destroy(&shared.nempty);
    64     sem_destroy(&shared.nstored);
    65     exit(0);
    66 }
    67 
    68 void *produce(void *arg)
    69 {
    70     int i;
    71     printf("produce is called.\n");
    72     for(;;)
    73     {
    74       sem_wait(&shared.nempty);
    75       sem_wait(&shared.mutex);
    76       if(shared.nput >= nitems)  //判断下标是否超出
    77       {
    78          sem_post(&shared.nempty);  //恢复empty的值
    79          sem_post(&shared.mutex);
    80          return NULL;
    81       }
    82       shared.buff[shared.nput%NBUFF] = shared.nputval;
    83       shared.nput++;
    84       shared.nputval++;
    85       sem_post(&shared.mutex);
    86       sem_post(&shared.nstored);
    87       *((int *)arg) += 1;
    88     }
    89     return NULL;
    90 }
    91 
    92 void *consume(void *arg)
    93 {

    程序测试结果如下:

    6、多个生产者、多个消费者

      这种情况需要考虑多个生产者之间的同步和多个消费者之间的同步,修改缓冲区结构如下所示:

    struct
    {
        int buff[NBUFF];
        int nput;     //生产者产生新条目的下标
        int nputval;
        int nget;      //消费者移除条目的下标
        int ngetval;
        sem_t mutex,nempty,nstored;
    }shared;

    实现程序如下:

    View Code
      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <unistd.h>
      4 #include <semaphore.h>
      5 #include <errno.h>
      6 
      7 #define NBUFF             10
      8 #define MAXNTHREADS           100
      9 int nitems,nproducers,nconsumers;
     10 struct
     11 {
     12     int buff[NBUFF];
     13     int nput;
     14     int nputval;
     15     int nget;
     16     int ngetval;
     17     sem_t mutex,nempty,nstored;
     18 }shared;
     19 
     20 void *produce(void *arg);
     21 void *consume(void *arg);
     22 
     23 int main(int argc,char *argv[])
     24 {
     25     int i,prodcount[MAXNTHREADS],conscount[MAXNTHREADS];
     26     pthread_t   tid_produce[MAXNTHREADS],tid_consume[MAXNTHREADS];
     27     if(argc != 4)
     28     {
     29         printf("usage: prodcons <#itmes> <#producers> <#consumers>");
     30         exit(0);
     31     }
     32     nitems = atoi(argv[1]);
     33     nproducers = atoi(argv[2]);
     34     nconsumers = atoi(argv[3]);
     35     if(sem_init(&shared.mutex,0,1) == -1)
     36     {
     37         perror("sem_open() error");
     38         exit(-1);
     39     }
     40     if(sem_init(&shared.nempty,0,NBUFF) == -1)
     41     {
     42         perror("sem_open() error");
     43         exit(-1);
     44     }
     45     if(sem_init(&shared.nstored,0,0) == -1)
     46     {
     47         perror("sem_open() error");
     48         exit(-1);
     49     }
     50      pthread_setconcurrency(nproducers+nconsumers);
     51     for(i=0;i<nproducers;i++)
     52     {
     53         prodcount[i] = 0;
     54         pthread_create(&tid_produce[i],NULL,produce,&prodcount[i]);
     55     }
     56     for(i=0;i<nconsumers;i++)
     57     {
     58          conscount[i] = 0;
     59         pthread_create(&tid_consume[i],NULL,consume,&conscount[i]);
     60     }
     61     for(i=0;i<nproducers;i++)
     62     {
     63         pthread_join(tid_produce[i],NULL);
     64         printf("producer count[%d] = %d\n",i,prodcount[i]);
     65     }
     66     for(i=0;i<nconsumers;i++)
     67     {
     68         pthread_join(tid_consume,NULL);
     69         printf("consumer count[%d] = %d\n",i,conscount[i]);
     70     }
     71     sem_destroy(&shared.mutex);
     72     sem_destroy(&shared.nempty);
     73     sem_destroy(&shared.nstored);
     74     exit(0);
     75 }
     76 
     77 void *produce(void *arg)
     78 {
     79     int i;
     80     printf("produce is called.\n");
     81     for(;;)
     82     {
     83       sem_wait(&shared.nempty);
     84       sem_wait(&shared.mutex);
     85       if(shared.nput >= nitems)
     86       {
     87          sem_post(&shared.nempty);
     88          sem_post(&shared.mutex);
     89          return NULL;
     90       }
     91       shared.buff[shared.nput%NBUFF] = shared.nputval;
     92       shared.nput++;
     93       shared.nputval++;
     94       sem_post(&shared.mutex);
     95       sem_post(&shared.nstored);
     96       *((int *)arg) += 1;
     97     }
     98     return NULL;
     99 }
    100 
    101 void *consume(void *arg)
    102 {
    103     int   i;
    104     printf("consumer is called.\n");
    105     for(;;)
    106     {
    107         sem_wait(&shared.nstored);
    108         sem_wait(&shared.mutex);
    109         if(shared.nget >= nitems)
    110         {
    111             sem_post(&shared.nstored);
    112             sem_post(&shared.mutex);
    113             return NULL;
    114         }
    115         i = shared.nget % NBUFF;
    116         if(shared.buff[i] != shared.ngetval)
    117             printf("error: buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
    118         shared.nget++;
    119         shared.ngetval++;
    120         sem_post(&shared.mutex);
    121         sem_post(&shared.nempty);
    122         *((int*)arg) += 1;
    123     }
    124     return NULL;
    125 }

    测试结果如下:

  • 相关阅读:
    用Sqoop进行Hive和MySQL之间的数据互导
    Spark读HBase写MySQL
    Kafka如何彻底删除topic及数据
    LDAP-HA安装与配置(Keepalived方式实现)
    配置两个Hadoop集群Kerberos认证跨域互信
    MYSQL HA 部署手册
    ELK简单安装测试
    Elasticsearch CURL命令
    大数据常见错误解决方案(转载)
    生成 RSA 公钥和私钥的方法
  • 原文地址:https://www.cnblogs.com/Anker/p/2858765.html
Copyright © 2011-2022 走看看