zoukankan      html  css  js  c++  java
  • 生产者消费者问题--进阶

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

    一种实现如下:

    #include <stdio.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <string.h>
    
    #define MAX 5 //缓冲区的的大小
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    
    typedef struct{
        char buffer[MAX];
        int count;
    }Buffer;
    
    Buffer share = {"", 0};
    char ch = 'A';
    
    void *producer(void *arg)
    {
        printf("Producer : starting 
    ");
        while(ch != 'K')
        {
            pthread_mutex_lock(&mutex);
            if(share.count != MAX)
            {
                 share.buffer[share.count++] = ch++;
                 printf("Producer: put char[%c]
    ", ch-1);
                 if(share.count == MAX)
                 {
                     printf("Producer: signaling full
    ");
                     pthread_cond_signal(&cond);//若果缓存满了发送信号
                 }
            }
            pthread_mutex_unlock(&mutex);
        }
        sleep(1);
        printf("Produce: Exiting 
    ");
        pthread_exit(NULL);
    }
    
    void *consumer(void *junk)
    {
        int i;
        printf("Consumer : starting
    ");
        while (ch != 'K')
        {
            pthread_mutex_lock(&mutex);
            printf("	 Consumer : Waiting
    ");
            while(share.count != MAX){
                pthread_cond_wait(&cond, &mutex);  //条件不成立释放锁.
                printf("Consumer wating for FULL signal
    ");
            }
            printf("Consumer : getting buffer :: ");
            for(i = 0; share.buffer[i] && share.count;++i, share.count--)
                putchar(share.buffer[i]);
            putchar('
    ');
            pthread_mutex_unlock(&mutex);
        }
    }
    
    int main()
    {
        pthread_t read, write;
        pthread_create(&read, NULL, (void *) consumer, NULL);
        pthread_create(&write, NULL, (void *)producer, NULL);
    
        pthread_join(read, NULL);
        pthread_join(write, NULL);
        return 0;
    }
    View Code

    修改consumer的代码:

    void *consumer(void *junk)
    {
        int i;
        printf("Consumer : starting
    ");
        while (ch != 'K')
        {
            pthread_mutex_lock(&mutex);
            printf("	 Consumer : Waiting
    ");
            //while(share.count != MAX){
                pthread_cond_wait(&cond, &mutex);  //条件不成立释放锁.
                printf("Consumer wating for FULL signal
    ");
            //}
            printf("Consumer : getting buffer :: ");
            for(i = 0; share.buffer[i] && share.count;++i, share.count--)
                putchar(share.buffer[i]);
            putchar('
    ');
            pthread_mutex_unlock(&mutex);
        }
    }

    编译运行会有两种结果:

    一种是:

    Consumer : starting
             Consumer : Waiting
    Producer : starting 
    Producer: put char[A]
    Producer: put char[B]
    Producer: put char[C]
    Producer: put char[D]
    Producer: put char[E]
    Producer: signaling full
    Consumer wating for FULL signal
    Consumer : getting buffer :: ABCDE
             Consumer : Waiting
    Producer: put char[F]
    Producer: put char[G]
    Producer: put char[H]
    Producer: put char[I]
    Producer: put char[J]
    Producer: signaling full
    Consumer wating for FULL signal
    Consumer : getting buffer :: FGHIJ
    Produce: Exiting 

    另一种是:

    Producer : starting 
    Producer: put char[A]
    Producer: put char[B]
    Producer: put char[C]
    Producer: put char[D]
    Producer: put char[E]
    Producer: signaling full
    Consumer : starting
         Consumer : Waiting

    可以看出来第二种是先执行了生产者,生产者填充满buffer之后,发送条件消息,但是此时consumer还没有执行,也并没有等待条件

    然后生产者释放锁,接着消费者获取锁,然后等待条件,由于缓冲已经满,

    if(share.count == MAX)

    生产者不会进入发送消息的代码,所以消费者一直等待条件.

    而上面一种的结果是因为,消费这先执行了,进入等待条件,所以没有这个问题,但是问题在于我们不能保证pthread_cond_wait()一定是先与pthread_cond_signal()执行的.

    但是通过加while()代码,就不会出现等待条件变量的问题,可以直接执行下面的代码.

    这个生产者消费者的解决方案是,同时只有一个生产者和消费者,是1:1的关系,生产者需填满缓冲区,才让消费者来取数据.下面一种实现是只要缓冲区不为空,消费者就来取数据,为空,就等待,缓冲区只要不满,生产者就生产数据,否则等待,支持n:n的关系.

    修改上面的代码:

    #include <stdio.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <string.h>
    
    #define P_COUNT 5   //producer NO.
    #define C_COUNT 5   //NO.
    
    #define MAX 5 //缓冲区的的大小
    
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //锁住缓冲区
    /*队列满的时候,阻塞生产这线程,队列空时阻塞消费者线程*/
    pthread_cond_t notFull = PTHREAD_COND_INITIALIZER; //
    pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER;
    
    typedef struct{
        char buffer[MAX];
        int count;
    }Buffer;
    
    Buffer share = {"", 0};
    char ch = 'A';
    
    void *producer(void *arg)
    {
        int id = *(int *)arg;
        printf("[%d] Producer : starting 
    ", id);
        //while(ch != 'K')
        while(1)
        {
            pthread_mutex_lock(&mutex);
            while(share.count == MAX){
                pthread_cond_wait(&notFull, &mutex);
                printf("[%d] producer wating for not full signal
    ", id);
            }
            share.buffer[share.count++] = ch; //字符不能无限加超出范围了
            printf("[%d] Producer: put [%d] char[%c]
    ", id, share.count-1, ch );
            pthread_cond_signal(&notEmpty);
            pthread_mutex_unlock(&mutex);
            sleep(1);
        }
        sleep(1);
        printf("Produce: Exiting 
    ");
    }
    
    void *consumer(void *junk)
    {
        int id = *(int *)junk;
        printf("	[%d] Consumer : starting
    ", id);
        //while (ch != 'K')
        while (1)
        {
            pthread_mutex_lock(&mutex);
            printf("	 [%d] Consumer : Waiting
    ", id);
            while(share.count == 0){
                pthread_cond_wait(&notEmpty, &mutex);  //条件不成立释放锁.
                printf("	[%d] Consumer wating for not empty signal
    ", id);
            }
             //消费者取数据应该怎么取呢,这里是从数组的最右边开始取,也就是先取最新的数据
            printf("	[%d] Consumer : getting buffer[%d] :: [%c] 
    ", id, share.count, share.buffer[share.count-1]);
            share.count--;
            pthread_cond_signal(&notFull);
            pthread_mutex_unlock(&mutex);
            sleep(1);
        }
    }
    
    int main()
    {
        int i;
        pthread_t t_read[C_COUNT], t_write[P_COUNT];
        int *pId=(int *)malloc(sizeof(int)*P_COUNT);
        int *cId=(int *)malloc(sizeof(int)*C_COUNT);
        for(i = 0; i < P_COUNT; ++i){
            pId[i] = i;
            pthread_create(&t_write[i], NULL, (void *)producer, (void *)&pId[i]);
        }
        for(i = 0; i < C_COUNT; ++i){
            cId[i] = i;
            pthread_create(&t_read[i], NULL, (void *) consumer, (void *)&cId[i]);
        }
    
        for(i = 0; i < P_COUNT; ++i){
            pthread_join(t_read[i], NULL);
        }
        for(i = 0; i < C_COUNT; ++i){
            pthread_join(t_write[i], NULL);
        }
    
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&notFull);
        pthread_cond_destroy(&notEmpty);
        return 0;
    }
    View Code

    这里用到等待队列来实现同步,下面这种是采用信号量的方式来实现同步

    #include <unistd.h>
    #include <sys/types.h>
    #include <pthread.h>
    #include <semaphore.h>
    
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    #include <string.h>
    
    #define ERR_EXIT(m) 
            do 
            { 
                    perror(m); 
                    exit(EXIT_FAILURE); 
            } while(0)
    
    #define CONSUMERS_COUNT 1
    #define PRODUCERS_COUNT 1
    #define BUFFSIZE 10
    
    int g_buffer[BUFFSIZE];
    
    unsigned short in = 0;
    unsigned short out = 0;
    unsigned short produce_id = 0; //产品数不断累加
    unsigned short consume_id = 0;
    
    sem_t g_sem_full;
    sem_t g_sem_empty;
    pthread_mutex_t g_mutex;
    
    pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];
    
    void *consume(void *arg)
    {
        int i;
        int num = *(int*)arg;
        while (1)
        {
            printf("%d wait buffer not empty
    ", num);
            sem_wait(&g_sem_empty);
            pthread_mutex_lock(&g_mutex);
    
            for (i = 0; i < BUFFSIZE; i++)
            {
                printf("%02d ", i);
                if (g_buffer[i] == -1)
                    printf("%s", "null");
                else
                    printf("%d", g_buffer[i]);
    
                if (i == out)
                    printf("	<--consume");
    
                printf("
    ");
            }
            consume_id = g_buffer[out];
            printf("%d begin consume product %d
    ", num, consume_id);
            g_buffer[out] = -1;
            out = (out + 1) % BUFFSIZE;
            printf("out == %d 
    ", out);
            printf("%d end consume product %d
    ", num, consume_id);
            pthread_mutex_unlock(&g_mutex);
            sem_post(&g_sem_full);
            sleep(1);
        }
        return NULL;
    }
    
    void *produce(void *arg)
    {
        int num = *(int*)arg;
        int i;
        while (1)
        {
            printf("%d wait buffer not full
    ", num);
            sem_wait(&g_sem_full);
            pthread_mutex_lock(&g_mutex);
            for (i = 0; i < BUFFSIZE; i++)
            {
                printf("%02d ", i);
                if (g_buffer[i] == -1)
                    printf("%s", "null");
                else
                    printf("%d", g_buffer[i]);
    
                if (i == in)
                    printf("	<--produce");
    
                printf("
    ");
            }
    
            printf("%d begin produce product %d
    ", num, produce_id);
            g_buffer[in] = produce_id;
            in = (in + 1) % BUFFSIZE;
            printf("in == %d 
    ", in);
            printf("%d end produce product %d
    ", num, produce_id++);
            pthread_mutex_unlock(&g_mutex);
            sem_post(&g_sem_empty);
            sleep(5);
        }
        return NULL;
    }
    
    int main(void)
    {
        int i;
        for (i = 0; i < BUFFSIZE; i++)
            g_buffer[i] = -1;
    
        sem_init(&g_sem_full, 0, BUFFSIZE);
        sem_init(&g_sem_empty, 0, 0);
    
        pthread_mutex_init(&g_mutex, NULL);
    
    
        for (i = 0; i < CONSUMERS_COUNT; i++)
            pthread_create(&g_thread[i], NULL, consume, (void *)&i);
    
        for (i = 0; i < PRODUCERS_COUNT; i++)
            pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)&i);
    
        for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
            pthread_join(g_thread[i], NULL);
    
        sem_destroy(&g_sem_full);
        sem_destroy(&g_sem_empty);
        pthread_mutex_destroy(&g_mutex);
    
        return 0;
    }
    View Code

    consumer每次消费完会sleep(1), producer每次生产完会sleep(5),消费者等待的时间更长,故消费者会经常阻塞在sem_wait(&g_sem_empty) 上面,因为缓冲区经常为空,可以将PRODUCTORS_COUNT 改成5,即有5个生产者线程和1个消费者线程,而且生产者睡眠时间还是消费者的5倍,从动态输出可以看出,基本上就动态平衡了,即5个生产者一下子生产了5份东西,消费者1s消费1份,刚好在生产者继续生产前消费完.

    修改代码:

    #define CONSUMERS_COUNT 1
    #define PRODUCERS_COUNT 5

    使用Posix信号量可模拟互斥量和条件变量,而且通常更有优势。

    当函数sem_wait()和sem_post()用于线程内时,两个调用间的区域就是所要保护的临界区代码;当用于线程间时,则与条件变量等效。

    此外,信号量还可用作资源计数器,即初始化信号量的值作为某个资源当前可用的数量,使用时递减释放时递增。这样,原先一些保存队列状态的变量都不再需要。

    最后,内核会记录信号的存在,不会将信号丢失;而唤醒条件变量时若没有线程在等待该条件变量,信号将被丢失。

    有时候缓冲区也可以是没有限制大小的,修改第一份程序为缓冲区不限制大小的代码:

    #include <stdio.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <string.h>
    
    #define P_COUNT 1   //producer NO.
    #define C_COUNT 2   //NO.
    
    #define MAX 5 //缓冲区的的大小
    
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    
    typedef struct{
        char buffer[MAX];
        int count;
    }Buffer;
    
    Buffer share = {"", 0};
    char ch = 'A';
    
    void *producer(void *arg)
    {
        printf("Producer : starting 
    ");
        while(1)
        {
            pthread_mutex_lock(&mutex);
            share.count++;
            printf("Producer: put charc count[%d]
    ", share.count);
            pthread_cond_signal(&cond);//若果缓存满了发送信号
            pthread_mutex_unlock(&mutex);
            sleep(1);
        }
        sleep(1);
        printf("Produce: Exiting 
    ");
        pthread_exit(NULL);
    }
    
    void *consumer(void *junk)
    {
        printf("Consumer : starting
    ");
        while (1)
        {
            pthread_mutex_lock(&mutex);
            printf("	 Consumer : Waiting
    ");
            while(share.count == 0){
                pthread_cond_wait(&cond, &mutex);
            }
            share.count--;
            printf("Consumer : getting buffer count[%d] 
    ", share.count);
            pthread_mutex_unlock(&mutex);
            sleep(1);
        }
    }
    
    int main()
    {
        int i;
        pthread_t t_read[C_COUNT], t_write[P_COUNT];
        for(i = 0; i < P_COUNT; ++i){
            pthread_create(&t_write[i], NULL, (void *)producer, NULL);
        }
        for(i = 0; i < C_COUNT; ++i){
            pthread_create(&t_read[i], NULL, (void *) consumer, NULL);
        }
    
        for(i = 0; i < P_COUNT; ++i){
            pthread_join(t_read[i], NULL);
        }
        for(i = 0; i < C_COUNT; ++i){
            pthread_join(t_write[i], NULL);
        }
    
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
        return 0;
    }
    View Code
  • 相关阅读:
    玩耍redis遇到的问题之记录
    哈勃望远镜--星柱图
    用js将从后台得到的时间戳(毫秒数)转换为想要的日期格式
    div水平居中
    hibernate和spring下载网址
    intellj idea 如何设置类头注释和方法注释(转载)
    转载:IT人高效的休息方式
    什么是REST?以及RESTful的实现
    easyui datagrid 获取记录数 页数 当前页
    font字体文件跨域
  • 原文地址:https://www.cnblogs.com/biglucky/p/4643465.html
Copyright © 2011-2022 走看看