zoukankan      html  css  js  c++  java
  • Unix IPC之Posix信号量实现生产者消费者

    采用多生产者,多消费者模型。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
     * 生产者
     */
    P(nempty);
    P(mutex);
    // 写入一个空闲位置
    V(mutex);
    V(nstored);
     
    /**
     * 消费者
     */
    P(nstored);
    P(mutex);
    // 清空一个非空闲位置
    V(mutex);
    V(nempty);

    全局性说明:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #include    "unpipc.h"
     
    /* Number of slots in the shared buffer. */
    #define NBUFF        10
    /* Upper bound on the number of producer threads and on the number of consumer threads. */
    #define MAXNTHREADS 100

    /* Run parameters: assigned in main() from the command line, only read by the threads. */
    int     nitems;
    int     nproducers;
    int     nconsumers;

    /* State shared by every producer and every consumer thread. */
    struct
    {
        int   buff[NBUFF];    /* the buffer, indexed modulo NBUFF */

        /* producer-side cursor */
        int   nput;           /* number of the next item to store: 0, 1, 2, ... */
        int   nputval;        /* value the next store writes into buff[] */

        /* consumer-side cursor */
        int   nget;           /* number of the next item to fetch: 0, 1, 2, ... */
        int   ngetval;        /* value the next fetch expects to find in buff[] */

        /* semaphore objects embedded in the struct (not pointers) */
        sem_t mutex;
        sem_t nempty;
        sem_t nstored;
    } shared;

    /* Thread start routines. */
    void    *produce(void *);
    void    *consume(void *);
    /* end globals */

    主函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    /* include main */
    int
    main(int argc, char **argv)
    {
        int     i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
        pthread_t   tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS]; 
     
        if (argc != 4)
            err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
        nitems = atoi(argv[1]);
        nproducers = min(atoi(argv[2]), MAXNTHREADS);
        nconsumers = min(atoi(argv[3]), MAXNTHREADS);
     
        /* 4initialize three semaphores */
        Sem_init(&shared.mutex, 0, 1);
        Sem_init(&shared.nempty, 0, NBUFF);
        Sem_init(&shared.nstored, 0, 0);
     
        /* 4create all producers and all consumers */
        Set_concurrency(nproducers + nconsumers);
        for (i = 0; i < nproducers; i++)
        {
            prodcount[i] = 0;
            Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
        }
        for (i = 0; i < nconsumers; i++)
        {
            conscount[i] = 0;
            Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
        }
     
        /* 4wait for all producers and all consumers */
        for (i = 0; i < nproducers; i++)
        {
            Pthread_join(tid_produce[i], NULL);
            printf("producer count[%d] = %d ", i, prodcount[i]);
        }
        for (i = 0; i < nconsumers; i++)
        {
            Pthread_join(tid_consume[i], NULL);
            printf("consumer count[%d] = %d ", i, conscount[i]);
        }
     
        Sem_destroy(&shared.mutex);
        Sem_destroy(&shared.nempty);
        Sem_destroy(&shared.nstored);
        exit(0);
    }
    /* end main */

    生产者线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /* include produce */
    void *
    produce(void *arg)
    {
        for ( ; ; )
        {
            Sem_wait(&shared.nempty);   /* wait for at least 1 empty slot */
            Sem_wait(&shared.mutex);
     
            if (shared.nput >= nitems)
            {
                Sem_post(&shared.nstored);  /* let consumers terminate */
                Sem_post(&shared.nempty);
                Sem_post(&shared.mutex);
                return(NULL);           /* all done */
            }
     
            shared.buff[shared.nput % NBUFF] = shared.nputval;
            shared.nput++;
            shared.nputval++;
     
            Sem_post(&shared.mutex);
            Sem_post(&shared.nstored);  /* 1 more stored item */
            *((int *) arg) += 1;
        }
    }
    /* end produce */

    消费者线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /* include consume */
    void *
    consume(void *arg)
    {
        int     i;
     
        for ( ; ; )
        {
            Sem_wait(&shared.nstored);  /* wait for at least 1 stored item */
            Sem_wait(&shared.mutex);
     
            if (shared.nget >= nitems)
            {
                Sem_post(&shared.nstored);
                Sem_post(&shared.mutex);
                return(NULL);           /* all done */
            }
     
            i = shared.nget % NBUFF;
            if (shared.buff[i] != shared.ngetval)
                printf("error: buff[%d] = %d ", i, shared.buff[i]);
            shared.nget++;
            shared.ngetval++;
     
            Sem_post(&shared.mutex);
            Sem_post(&shared.nempty);   /* 1 more empty slot */
            *((int *) arg) += 1;
        }
    }
    /* end consume */
     





  • 相关阅读:
    安装Python及工具
    Python能做什么
    学习Python前序
    [摘]selenium-ide命令
    [摘]selenium-ide编辑命令
    selenium-ide学习
    敏捷个人课后练习:管理情绪
    敏捷个人课后练习:释放情绪
    敏捷个人课后练习:接纳情绪
    敏捷个人课后练习:承诺
  • 原文地址:https://www.cnblogs.com/fengkang1008/p/4737021.html
Copyright © 2011-2022 走看看