生产者消费者
生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
转自 http://www.cnblogs.com/svitter/p/4006129.html
1 /*============================================================================= 2 # 3 # Author: svtter - svtter@qq.com 4 # 5 # QQ : 57180160 6 # 7 # Last modified: 2014-10-03 20:35 8 # 9 # Filename: producer_consumer.cc 10 # 11 # Description: 12 # 13 =============================================================================*/ 14 #include <cstdio> 15 #include <unistd.h> 16 #include <semaphore.h> 17 #include <pthread.h> 18 #include <sys/types.h> 19 #include <stdlib.h> 20 #include <iostream> 21 22 using namespace std; 23 24 #define N 5 25 #define item int 26 27 // P/V操作 28 void P(sem_t* sem) 29 { 30 if(sem_wait(sem)) 31 perror("P error!"); 32 33 } 34 35 void V(sem_t* sem) 36 { 37 if(sem_post(sem)) 38 perror("V error!"); 39 } 40 41 sem_t mutex; 42 sem_t full; 43 sem_t empty; 44 item buffer[N]; 45 int i = 0, j = -1; 46 47 void init_sem() 48 { 49 sem_init(&mutex, 0, 1); 50 sem_init(&full, 0, 0); 51 sem_init(&empty, 0, N); 52 } 53 54 void* producer(void *arg) 55 { 56 int product; 57 while(1) 58 { 59 //生成随机数字 60 product = rand()%100; 61 // cout << "producer running..." << endl; 62 P(&empty); 63 P(&mutex); 64 buffer[i] = product; 65 printf("producer produced %d @ %d pos ", 66 product, i); 67 i=(i+1)%N; 68 V(&mutex); 69 V(&full); 70 sleep(1); 71 } 72 } 73 74 void* consumer(void *arg) 75 { 76 int product, temp; 77 while(1) 78 { 79 // cout << "consumer running..." << endl; 80 P(&full); 81 P(&mutex); 82 j = (j+1)%N; 83 product = buffer[j]; 84 V(&mutex); 85 V(&empty); 86 printf("Consumer consumed %d @ %d pos ", 87 product, j); 88 sleep(3); 89 } 90 } 91 92 int main() 93 { 94 //random num 95 srand(time(NULL)); 96 init_sem(); 97 98 int error; 99 pthread_t producer_t, consumer_t; 100 101 error = pthread_create(&producer_t, NULL, producer, NULL); 102 if(error != 0) 103 printf("error in create producer. "); 104 else 105 printf("create producer success! "); 106 107 pthread_create(&consumer_t, NULL, consumer, NULL); 108 if(error != 0) 109 printf("error in create consumer. "); 110 else 111 printf("create consumer success! "); 112 113 pthread_join(producer_t, NULL); 114 pthread_join(consumer_t, NULL); 115 116 return 0; 117 }
Java:
class Producer implements Runnable {
private String producerName = null;
private StoreHouse storeHouse = null;
public Producer(String producerName, StoreHouse storeHouse) {
this.producerName = producerName;
this.storeHouse = storeHouse;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
public String getProducerName() {
return producerName;
}
public void produceProduct() {
int i = 0;
while (true) {
i++;
Product pro = new Product(i);
storeHouse.push(pro);
System.out.println(getProducerName() + " 生产了 " + pro);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
return;
}
}
}
public void run() {
produceProduct();
}
}
class Consumer implements Runnable {
private String consumerName = null;
private StoreHouse storeHouse = null;
public Consumer(String consumerName, StoreHouse storeHouse) {
this.consumerName = consumerName;
this.storeHouse = storeHouse;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public String getConsumerName() {
return consumerName;
}
public void consumerProduct() {
while (true) {
System.out.println(getConsumerName() + " 消费了 " + storeHouse.pop());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
return;
}
}
}
public void run() {
consumerProduct();
}
}
class Product {
private int productId = 0;
public Product(int productId) {
this.productId = productId;
}
public int getProductId() {
return productId;
}
public String toString() {
return Integer.toString(productId);
}
}
class StoreHouse {
private int base = 0;
private int top = 0;
private Product[] products = new Product[10];
public synchronized void push(Product product) {
while (top == products.length) {
notify();
try {
System.out.println("仓库已满,正等待消费...");
wait();
} catch (InterruptedException e) {
System.out.println("stop push product because other reasons");
}
}
products[top] = product;
top++;
}
public synchronized Product pop() {
Product pro = null;
while (top == base) {
notify();
try {
System.out.println("仓库已空,正等待生产...");
wait();
} catch (InterruptedException e) {
System.out.println("stop push product because other reasons");
}
}
top--;
pro = products[top];
products[top] = null;
return pro;
}
}
public class TestPC {
public static void main(String[] args) {
StoreHouse storeHouse = new StoreHouse();
Producer producer = new Producer("生产者", storeHouse);
Consumer comsumer = new Consumer("消费者", storeHouse);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(comsumer);
t1.start();
t2.start();
}
}
private String producerName = null;
private StoreHouse storeHouse = null;
public Producer(String producerName, StoreHouse storeHouse) {
this.producerName = producerName;
this.storeHouse = storeHouse;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
public String getProducerName() {
return producerName;
}
public void produceProduct() {
int i = 0;
while (true) {
i++;
Product pro = new Product(i);
storeHouse.push(pro);
System.out.println(getProducerName() + " 生产了 " + pro);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
return;
}
}
}
public void run() {
produceProduct();
}
}
class Consumer implements Runnable {
private String consumerName = null;
private StoreHouse storeHouse = null;
public Consumer(String consumerName, StoreHouse storeHouse) {
this.consumerName = consumerName;
this.storeHouse = storeHouse;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public String getConsumerName() {
return consumerName;
}
public void consumerProduct() {
while (true) {
System.out.println(getConsumerName() + " 消费了 " + storeHouse.pop());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
return;
}
}
}
public void run() {
consumerProduct();
}
}
class Product {
private int productId = 0;
public Product(int productId) {
this.productId = productId;
}
public int getProductId() {
return productId;
}
public String toString() {
return Integer.toString(productId);
}
}
class StoreHouse {
private int base = 0;
private int top = 0;
private Product[] products = new Product[10];
public synchronized void push(Product product) {
while (top == products.length) {
notify();
try {
System.out.println("仓库已满,正等待消费...");
wait();
} catch (InterruptedException e) {
System.out.println("stop push product because other reasons");
}
}
products[top] = product;
top++;
}
public synchronized Product pop() {
Product pro = null;
while (top == base) {
notify();
try {
System.out.println("仓库已空,正等待生产...");
wait();
} catch (InterruptedException e) {
System.out.println("stop push product because other reasons");
}
}
top--;
pro = products[top];
products[top] = null;
return pro;
}
}
public class TestPC {
public static void main(String[] args) {
StoreHouse storeHouse = new StoreHouse();
Producer producer = new Producer("生产者", storeHouse);
Consumer comsumer = new Consumer("消费者", storeHouse);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(comsumer);
t1.start();
t2.start();
}
}