http://wenku.baidu.com/view/b602a1beb9d528ea81c77986.html?re=view
#include<stdio.h>
#include<malloc.h>
#include<pthread.h>
#include<semaphore.h>
#define BUFFER_SIZE 30
#define OVER (-1)
struct Product
{
int tid;
int data;
};
struct producers
{
//定义生产者条件变量结构
struct Product buffer[BUFFER_SIZE];//缓冲区
sem_t sem_read; // 读信号量
sem_t sem_write; // 写信号量
pthread_mutex_t wlock; // 缓冲区写锁
pthread_mutex_t rlock; // 缓冲区读锁
pthread_mutex_t lock; // thread_count的读写锁
int readpos , writepos;//读写位置
};
struct producers buffer;
int thread_count = 30; //存活生产者计数
//用于在线程内部标识线程ID
int ids[30] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30};
int count = 0; // 计数消费产品数量
int pcount = 0; // 计数生产产品数量
void init()
{
//初始化相关锁和变量以及信号量
buffer.readpos = 0;
buffer.writepos = 0;
//写信号量比缓冲区小1,防止缓冲区满和缓冲区空分不清
sem_init(&buffer.sem_write, 0, BUFFER_SIZE-1);
sem_init(&buffer.sem_read, 0, 0);
pthread_mutex_init(&buffer.wlock, NULL);
pthread_mutex_init(&buffer.rlock, NULL);
pthread_mutex_init(&buffer.lock, NULL);
}
void put(int tid, int data)
{
//缓冲区中放入一个数据
sem_wait(&buffer.sem_write);
//生产前先加锁,已防止其他线程同时生产
pthread_mutex_lock(&buffer.wlock);
buffer.buffer[buffer.writepos].tid = tid;
buffer.buffer[buffer.writepos].data = data;
buffer.writepos ++;
++ pcount;
if( buffer.writepos >= BUFFER_SIZE )
buffer.writepos = 0;
pthread_mutex_unlock(&buffer.wlock);
sem_post(&buffer.sem_read);
}
//读数据并移走
struct Product * get(int tid)
{
struct Product * produce = NULL;
//消费前先上锁,以防止其他线程同时消费
pthread_mutex_lock(&buffer.rlock);
// 如果生产者线程没有全部退出,或者缓冲区内仍有产品,则说明可以尝试去获取产品
if(thread_count > 0 || buffer.readpos != buffer.writepos)
{
//从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法
sem_wait( &buffer.sem_read );
int pos = buffer.readpos;
// 在已有产品中迭代,查找适合的产品
while(pos != buffer.writepos)
{
int id = buffer.buffer[pos].tid;
if(id >10 && id <=20 && tid == id)
{ //如果产品是的生产者线程号 10<id<=20 则可以供和它的线程号相同的消费者线程消费
produce = (struct Product *)malloc(sizeof(struct Product));
produce->tid = id;
produce->data = buffer.buffer[pos].data;
break;
}
else if(id <= 10 && (id%2 == tid%2))
{ //如果产品是的生产者线程号 <=10 则可以供和它的线程号奇偶性相同的消费者消费
produce = (struct Product *)malloc(sizeof(struct Product));
produce->tid = id;
produce->data = buffer.buffer[pos].data;
break;
}
else if(id > 20)
{ //如果产品是的生产者线程号>20则可以供任何消费者消费
produce = (struct Product *)malloc(sizeof(struct Product));
produce->tid = id;
produce->data = buffer.buffer[pos].data;
break;
}
pos = (pos+1)%BUFFER_SIZE;
}
if( produce )
{ //如果取得了产品,消费计数+1,将在队头的元素覆盖到被取走的位置
++ count;
buffer.buffer[pos].tid = buffer.buffer[buffer.readpos].tid;
buffer.buffer[pos].data = buffer.buffer[buffer.readpos].data;
++ buffer.readpos;
}
if( buffer.readpos >= BUFFER_SIZE )
buffer.readpos = 0;
//如果取得了产品则释放一个缓冲区可写的信号量,否则释放一个可读的信号量
if( produce )
sem_post(&buffer.sem_write);
else
sem_post(&buffer.sem_read);
pthread_mutex_unlock(&buffer.rlock);
}else
pthread_mutex_unlock(&buffer.rlock);
return produce;
}
void *producer(void *data)
{ //每个线程循环生产30个产品
int tid = *((int *)data);
int n;
for(n = 1; n <= 30; n++)
{
printf("producer %d product %d
", tid, n);
put(tid, n);
}
// 每退出一个生产者线程后,thread_count 减1
pthread_mutex_lock(&buffer.lock);
-- thread_count;
pthread_mutex_unlock(&buffer.lock);
return NULL;
}
void *consumer(void * data)
{
int tid = *((int *)data);
struct Product *d = NULL;
while(1)
{
d = get(tid);
if( d ) {
printf("consumer %d consum %d from Producer %d
", tid, d->data,d->tid);
free(d);
}
pthread_mutex_lock(&buffer.lock);
// 当所有生产者线程已退出,且缓冲区内已没有该线程可消费的产品时,退出该线程
if(d == NULL && thread_count == 0 )
{
pthread_mutex_unlock(&buffer.lock);
break;
}else
pthread_mutex_unlock(&buffer.lock);
}
return NULL;
}
int main(void)
{
pthread_t th_a[30], th_b[30];
void *retval;
init(&buffer);
int i;
for(i = 0; i<30; ++i)
{ // 创建生产者和消费者
pthread_create(&th_a[i], NULL, producer, &ids[i]);
pthread_create(&th_b[i], NULL, consumer, &ids[i]);
}
for(i = 0; i<30; ++i)
{ // 将线程加入到主线程组
pthread_join(th_a[i], &retval);
pthread_join(th_b[i], &retval);
}
printf("product %d products
", pcount);
printf("consume %d products
", count);
return 0;
}