zoukankan      html  css  js  c++  java
  • (转)OS: 生产者消费者问题(多进程+共享内存+信号量)

    转:http://blog.csdn.net/yaozhiyi/article/details/7561759

    一. 引子

    时隔一年再次用到 cout 的时候,哥潸然泪下,这是一种久别重逢的感动,虽然基本忘光了。趁着有大把时间,再把生产者消费者问题巩固一下,用纯C吧。珍惜能写代码的幸福时光。
     

    二. 分析

    生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:
    1.  对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。
    2.  一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。
    为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。
     

    三. 共享存储

    共享存储是进程间通信的一种手段,通常,使用信号量同步或互斥访问共享存储。共享存储的原理是将进程的地址空间映射到一个共享存储段。在LINUX下,通过使用 shmget 函数创建或者获取共享内存。

    1. 创建

    1)不指定 KEY

    // IPC_PRIVATE指出需要创建内存; 
    //SHM_SIZE 指出字节大小; 
    //SHM_MODE 指出访问权限字如 0600表示,用户可以读写该内存
    int shmget(key_t IPC_PRIVATE,size_t SHM_SIZE,int SHM_MODE);

    2)指定KEY

    //如果SHM_KEY指向的共享存储已经存在,则返回共享存储的ID; 
    //否则,创建共享存储并返回其ID
    int  shmget(key_t SHM_KEY,size_t SHM_SIZE,int SHM_MODE);
     

    2. 访问

    方法一

    只需要共享存储的 ID 就可以通过  shmat  函数获得共享存储所占用的实际地址。因此,可以在父进程的栈中用变量存放指向共享存储的指针,那么 fork 之后,子进程就可以很方便地通过这个指针访问共享存储了。

    方法二

    如果进程之间并没有父子关系,但是协商好了共享存储的 KEY , 那么在每个进程中,就可以通过 KEY 以及 shmget 函数获得共享存储的 I D , 进而通过 shmat 函数获得共享存储的实际地址,最后访问。
     
    在我的实现中,我把生产者实现为父进程,消费者实现为子进程,并通过方法一实现进程之间共享内存。

    四. 信号量集

    信号量有两种原语 P 和 V ,P 锁定资源,V 释放资源。LINUX 下的使用信号量集合的接口特别复杂。我所用到的函数如下:

    1. 创建或者获取信号量集合

    // IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究
    semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
    // SEM_KEY 是 key_t 类型
    //如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID
    //如果不存在,则创建信号量集并返回ID
    semget(SEM_KEY, NUM_OF_SEM,FLAGS);
     

    2. 初始化信号量

    创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。
    semctl(int semSetId , int semIdx , int cmd, union semun su);
     
    其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。
    我在 UBUNTU 下使用 union semun 编译时总报错:
    invalid use of undefined type ‘union semun’
     
    据说是 Linux 下删除了 semun 的定义。可以通过自定义 semun 解决:
     
    #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
    /*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
    #else 
    /*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
    union semun{
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    };
    #endif

    五. 代码分解

    1. 头文件

    #include "stdio.h"   //支持 printf
    #include <sys/shm.h> //支持 shmget shmat 等
    #include <sys/sem.h> //支持 semget 
    #include <stdlib.h>  //支持 exit

    2. 信号量

    共需要三个信号量:
    第一个信号量用于限制生产者必须在缓冲区不满时才能生产,是同步信号量
    第二个信号量用于限制消费者必须在缓冲区有产品时才消费,是同步信号量
    第三个信号量用于限制生产者和消费者在访问缓冲区时必须互斥,是互斥信号量
     
    创建信号量集合,semget
    if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
    {
        perror("create semaphore failed");
        exit(1);
    }


    初始化三个信号量,semctl,需要用到 union semun 

    union semun su;
    
    //信号量初始化,其中 su 表示 union semun 
    su.val = N_BUFFER;//当前库房还可以接收多少产品
    if(semctl(semSetId,0,SETVAL, su) < 0){
        perror("semctl failed");
        exit(1);
    }
    su.val = 0;//当前没有产品
    if(semctl(semSetId,1,SETVAL,su) < 0){
        perror("semctl failed");
        exit(1);
    }
    su.val = 1;//为1时可以进入缓冲区
    if(semctl(semSetId,2,SETVAL,su) < 0){
        perror("semctl failed");
        exit(1);
    }
    封装对信号量集中的某个信号量的值的+1或者-1操作
    //semSetId 表示信号量集合的 id
    //semNum 表示要处理的信号量在信号量集合中的索引
    void waitSem(int semSetId,int semNum)
    {
        struct sembuf sb;
        sb.sem_num = semNum;
        sb.sem_op = -1;//表示要把信号量减一
        sb.sem_flg = SEM_UNDO;//
        //第二个参数是 sembuf [] 类型的,表示数组
        //第三个参数表示 第二个参数代表的数组的大小
        if(semop(semSetId,&sb,1) < 0){
            perror("waitSem failed");
            exit(1);
        }
    }
    void sigSem(int semSetId,int semNum)
    {
        struct sembuf sb;
        sb.sem_num = semNum;
        sb.sem_op = 1;
        sb.sem_flg = SEM_UNDO;
        //第二个参数是 sembuf [] 类型的,表示数组
        //第三个参数表示 第二个参数代表的数组的大小
        if(semop(semSetId,&sb,1) < 0){
            perror("waitSem failed");
            exit(1);
        }
    }

    3. 使用共享内存

    //把共享存储区域中的内容用结构体封装起来
    struct ShM{
        int start;
        int end;
    }* pSM;
    
    
    //缓冲区分配以及初始化
    if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
    {
        perror("create shared memory failed");
        exit(1);
    }
    //shmat返回void*指针需要强制转化类型
    pSM = (struct ShM *)shmat(shmId,0,0);
    //初始化工作
    pSM->start = 0;
    pSM->end = 0;

    4. 生产过程

    while(1)
    {
        waitSem(semSetId,0);//获取一个空间用于存放产品
        waitSem(semSetId,2);//占有产品缓冲区
        produce();
        sigSem(semSetId,2);//释放产品缓冲区
        sleep(1);//每两秒生产一个
        sigSem(semSetId,1);//告知消费者有产品了
    }

    5. 消费过程

    while(1)
    {
        waitSem(semSetId,1);//必须有产品才能消费
        waitSem(semSetId,2);//锁定缓冲区
        consume();//获得产品,需要修改缓冲区
        sigSem(semSetId,2);//释放缓冲区
        sigSem(semSetId,0);//告知生产者,有空间了
        sleep(2);//消费频率
    }

    六. 代码全文

    #include "stdio.h"
    #include <sys/shm.h>
    #include <sys/sem.h>
    #include <stdlib.h>
    #define SHM_SIZE (1024*1024)
    #define SHM_MODE 0600
    #define SEM_MODE 0600
    
    #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
    /*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
    #else 
    /*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
    union semun{
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    };
    #endif
    
    struct ShM{
        int start;
        int end;
    }* pSM;
    
    const int N_CONSUMER = 3;//消费者数量
    const int N_BUFFER = 5;//缓冲区容量
    int shmId = -1,semSetId=-1;
    union semun su;//sem union,用于初始化信号量
    
    //semSetId 表示信号量集合的 id
    //semNum 表示要处理的信号量在信号量集合中的索引
    void waitSem(int semSetId,int semNum)
    {
        struct sembuf sb;
        sb.sem_num = semNum;
        sb.sem_op = -1;//表示要把信号量减一
        sb.sem_flg = SEM_UNDO;//
        //第二个参数是 sembuf [] 类型的,表示数组
        //第三个参数表示 第二个参数代表的数组的大小
        if(semop(semSetId,&sb,1) < 0){
            perror("waitSem failed");
            exit(1);
        }
    }
    void sigSem(int semSetId,int semNum)
    {
        struct sembuf sb;
        sb.sem_num = semNum;
        sb.sem_op = 1;
        sb.sem_flg = SEM_UNDO;
        //第二个参数是 sembuf [] 类型的,表示数组
        //第三个参数表示 第二个参数代表的数组的大小
        if(semop(semSetId,&sb,1) < 0){
            perror("waitSem failed");
            exit(1);
        }
    }
    //必须在保证互斥以及缓冲区不满的情况下调用
    void produce()
    {
        int last = pSM->end;
        pSM->end = (pSM->end+1) % N_BUFFER;
        printf("生产 %d
    ",last);
    }
    //必须在保证互斥以及缓冲区不空的情况下调用
    void consume()
    {
        int last = pSM->start;
        pSM->start = (pSM->start + 1)%N_BUFFER;
        printf("消耗 %d
    ",last);
    }
    
    void init()
    {
        //缓冲区分配以及初始化
        if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
        {
            perror("create shared memory failed");
            exit(1);
        }
        pSM = (struct ShM *)shmat(shmId,0,0);
        pSM->start = 0;
        pSM->end = 0;
        
        //信号量创建
        //第一个:同步信号量,表示先后顺序,必须有空间才能生产
        //第二个:同步信号量,表示先后顺序,必须有产品才能消费
        //第三个:互斥信号量,生产者和每个消费者不能同时进入缓冲区
    
        if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
        {
            perror("create semaphore failed");
            exit(1);
        }
        //信号量初始化,其中 su 表示 union semun 
        su.val = N_BUFFER;//当前库房还可以接收多少产品
        if(semctl(semSetId,0,SETVAL, su) < 0){
            perror("semctl failed");
            exit(1);
        }
        su.val = 0;//当前没有产品
        if(semctl(semSetId,1,SETVAL,su) < 0){
            perror("semctl failed");
            exit(1);
        }
        su.val = 1;//为1时可以进入缓冲区
        if(semctl(semSetId,2,SETVAL,su) < 0){
            perror("semctl failed");
            exit(1);
        }
    }
    int main()
    {
        int i = 0,child = -1;
        init();
        //创建 多个(N_CONSUMER)消费者子进程
        for(i = 0; i < N_CONSUMER; i++)
        {
            if((child = fork()) < 0)//调用fork失败
            {
                perror("the fork failed");
                exit(1);
            }
            else if(child == 0)//子进程
            {
                printf("我是第 %d 个消费者子进程,PID = %d
    ",i,getpid());
                while(1)
                {
                    waitSem(semSetId,1);//必须有产品才能消费
                    waitSem(semSetId,2);//锁定缓冲区
                    consume();//获得产品,需要修改缓冲区
                    sigSem(semSetId,2);//释放缓冲区
                    sigSem(semSetId,0);//告知生产者,有空间了
                    sleep(2);//消费频率
                }
                break;//务必有
            }
        }
        
        
        //父进程开始生产
        if(child > 0)
        {
            while(1)
            {
                waitSem(semSetId,0);//获取一个空间用于存放产品
                waitSem(semSetId,2);//占有产品缓冲区
                produce();
                sigSem(semSetId,2);//释放产品缓冲区
                sleep(1);//每两秒生产一个
                sigSem(semSetId,1);//告知消费者有产品了
            }
        }
        return 0;
    }


     

  • 相关阅读:
    服务器性能调优(netstat监控大量ESTABLISHED连接与Time_Wait连接问题)
    maven/gredle配置阿里云仓库镜像,加速下载maven依赖
    Mac文件上传下载到服务器指定命令
    mysql-管理命令【创建用户、授权、修改密码、删除用户和授权、忘记root密码】
    springmvc返回不带引号的字符串
    maven过滤配置文件
    git常用命令/git 部分高级命令备忘录
    SpringBoot中使用Fastjson/Jackson对JSON序列化格式化输出的若干问题
    修改mysql配置中my.conf中max_allowed_packet变量
    ZK安装、ZK配置、ZK集群部署踩过的大坑
  • 原文地址:https://www.cnblogs.com/wangle1001986/p/3234668.html
Copyright © 2011-2022 走看看