zoukankan      html  css  js  c++  java
  • Consumer-Producer Problem

      This is the 4th assignment of EI328, in which we are required to simulate the Consumer-Producer Problem with a multi-threaded program. A main thread puts the integers from 1 to 100 into a bounded buffer, and meanwhile 5 worker threads fetch integers from the buffer and print them on the terminal.

      Here is the source code that I submitted at the time:

      1 import java.util.concurrent.*;
      2 
      /**
       * Fixed-capacity FIFO ring buffer of ints.
       *
       * <p>One array slot is always left unused so that {@code rear == front}
       * means "empty" and {@code (rear + 1) % size == front} means "full".
       *
       * <p>This class does no locking of its own; callers that share an instance
       * between threads must serialize every call themselves.
       */
      class BoundedQueue {
          /** Number of array slots, i.e. the requested capacity plus the spare slot. */
          private final int size;
          private final int[] arr;
          /** Index of the slot just before the oldest element. */
          private int front;
          /** Index of the newest element. */
          private int rear;

          /**
           * Creates an empty queue.
           *
           * @param size maximum number of elements the queue can hold; must be at least 1
           * @throws IllegalArgumentException if {@code size} is smaller than 1
           */
          public BoundedQueue(int size) {
              if (size < 1) {
                  throw new IllegalArgumentException("capacity must be at least 1: " + size);
              }
              this.size = size + 1;
              arr = new int[this.size];
          }

          /** Returns {@code true} when no further element can be added. */
          public boolean isFull() {
              return (rear + 1) % size == front;
          }

          /** Returns {@code true} when there is nothing to poll. */
          public boolean isEmpty() {
              return rear == front;
          }

          /**
           * Appends {@code k} at the tail of the queue.
           *
           * @throws IllegalStateException if the queue is full; without this guard
           *         the write would make {@code rear == front} and the whole
           *         content would silently appear to be empty
           */
          public void add(int k) {
              if (isFull()) {
                  throw new IllegalStateException("queue is full");
              }
              rear = (rear + 1) % size;
              arr[rear] = k;
          }

          /**
           * Removes and returns the oldest element.
           *
           * @throws IllegalStateException if the queue is empty; without this guard
           *         a stale array slot would be returned as if it were data
           */
          public int poll() {
              if (isEmpty()) {
                  throw new IllegalStateException("queue is empty");
              }
              front = (front + 1) % size;
              return arr[front];
          }
      }
     27 
     28 
     29 class Producer extends Thread {
     30     public static BoundedQueue buffer;
     31     
     32     public Producer() {
     33         buffer = new BoundedQueue(10);
     34         start();
     35     }
     36     public void run() {
     37         try {
     38             int elem = 0;
     39             while (elem<100) {
     40                 Main.bufferMutex.acquire();
     41                 if (!buffer.isFull()) {
     42                     buffer.add(++elem);
     43                 }
     44                 Main.bufferMutex.release();
     45             }
     46             Main.flag = true;
     47         } catch (Exception e) {
     48             System.err.println("Error: "+e);
     49         }
     50     }
     51 }
     52 
     53 class Consumer extends Thread{
     54     private int idx;
     55     
     56     public Consumer(int idx) {
     57         this.idx = idx;
     58         start();
     59     }
     60     public void run() {
     61         int data = 0;
     62         try {
     63             while (true) {
     64                 // Extract a number from the buffer
     65                 Main.bufferMutex.acquire();
     66                 boolean empty = Producer.buffer.isEmpty();
     67                 if (!empty) {
     68                     data = Producer.buffer.poll();
     69                 } 
     70                 Main.bufferMutex.release();
     71                 if (!empty) {
     72                     // Print the number on the console
     73                     Main.consoleMutex.acquire();
     74                     System.out.print(this+":	");
     75                     System.out.println(data);
     76                     Main.consoleMutex.release();
     77                     // Work outside the critical section
     78                     TimeUnit.MILLISECONDS.sleep(100);
     79                 }
     80                 // Judge whether to quit
     81                 if (Main.flag && empty) {
     82                     break;
     83                 } 
     84             }
     85         } catch (Exception e) {
     86             System.err.println("Error: "+e);
     87         }
     88     }
     89     public String toString() {
     90         return "WorkThread_"+idx;
     91     }
     92 }
     93 
     94 public class Main {
     95     public static Semaphore bufferMutex;
     96     public static Semaphore consoleMutex;
     97     public static boolean flag;
     98     
     99     public static void main(String[] args) {
    100         bufferMutex = new Semaphore(1);
    101         consoleMutex = new Semaphore(1);
    102         Producer p = new Producer();
    103         Consumer[] c = new Consumer[5];
    104         for (int i=0;i<5;i++) {
    105             c[i] = new Consumer(i+1);
    106         }
    107         try {
    108             for (int i=0;i<5;i++) {
    109                 c[i].join();
    110             }
    111             p.join();
    112         } catch (Exception e) {
    113             System.err.println("Error: "+e);
    114         }
    115     }
    116 }

      Nevertheless, my program did not win the TA's approval, because whenever the buffer is full or empty there are busy-waiting threads that waste CPU time. A good solution requires at least two more semaphores, which indicate that the buffer is full and empty respectively.

      However, simply adding two more semaphores, full and empty, may result in a situation where the producer has finished its work while all the consumers are still waiting beside an empty buffer. To tackle this problem, I use a sentinel as a message passed from the producer to the consumers. When the producer has put all the products into the buffer, it puts one more special product into the buffer. This product, as a sentinel, can be recognized by any consumer that happens to fetch it, and such a consumer will put it back into the buffer (as a message for its fellows) and then terminate its loop.

      Here is my revised solution to this problem, where I made a simple blocking queue as the bounded buffer.

      1 import java.util.concurrent.*;
      2 
      /**
       * Blocking, fixed-capacity FIFO queue of ints built from three semaphores.
       *
       * <ul>
       *   <li>{@code freeSlots} counts the slots that can still be filled;
       *       {@link #add} waits on it, so a producer blocks while the queue is full.</li>
       *   <li>{@code filledSlots} counts the stored elements; {@link #poll} waits
       *       on it, so a consumer blocks while the queue is empty.</li>
       *   <li>{@code mutex} protects the ring indices and the array.</li>
       * </ul>
       */
      class BoundedQueue {
          private final int[] arr;
          /** Number of array slots: the capacity plus one spare slot used by isFull/isEmpty. */
          private final int size;
          private int front, rear;
          private final Semaphore mutex;
          private final Semaphore freeSlots;
          private final Semaphore filledSlots;

          /**
           * Creates an empty queue.
           *
           * @param size maximum number of elements the queue can hold; must be at least 1
           * @throws IllegalArgumentException if {@code size} is smaller than 1
           */
          public BoundedQueue(int size) {
              if (size < 1) {
                  throw new IllegalArgumentException("capacity must be at least 1: " + size);
              }
              this.size = size + 1;
              arr = new int[this.size];
              mutex = new Semaphore(1);
              // One permit per slot. Starting with a single permit would let only
              // one element be in the queue at a time, whatever the capacity.
              freeSlots = new Semaphore(size);
              filledSlots = new Semaphore(0);
          }

          /** Returns {@code true} if the queue currently holds {@code capacity} elements. */
          public boolean isFull() {
              mutex.acquireUninterruptibly();
              try {
                  return (rear + 1) % size == front;
              } finally {
                  mutex.release();
              }
          }

          /** Returns {@code true} if the queue currently holds no element. */
          public boolean isEmpty() {
              mutex.acquireUninterruptibly();
              try {
                  return rear == front;
              } finally {
                  mutex.release();
              }
          }

          /**
           * Appends {@code k}, blocking while the queue is full.
           *
           * <p>If the calling thread is interrupted while waiting, the element is
           * NOT added, an error line is written to {@code System.err}, and the
           * thread's interrupt status is restored.
           */
          public void add(int k) {
              try {
                  freeSlots.acquire();
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  System.err.println("Error: " + e);
                  return;
              }
              // The critical section is tiny, so wait for it uninterruptibly; that way
              // the permit taken above can never be leaked by a late interrupt.
              mutex.acquireUninterruptibly();
              try {
                  rear = (rear + 1) % size;
                  arr[rear] = k;
              } finally {
                  mutex.release();
              }
              filledSlots.release();
          }

          /**
           * Removes and returns the oldest element, blocking while the queue is empty.
           *
           * @return the element, or {@code -1} if the calling thread was interrupted
           *         while waiting (an error line is written to {@code System.err}
           *         and the interrupt status is restored)
           */
          public int poll() {
              try {
                  filledSlots.acquire();
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  System.err.println("Error: " + e);
                  return -1;
              }
              int val;
              mutex.acquireUninterruptibly();
              try {
                  front = (front + 1) % size;
                  val = arr[front];
              } finally {
                  mutex.release();
              }
              freeSlots.release();
              return val;
          }
      }
     51 
     52 class Producer extends Thread {
     53     private BoundedQueue buffer;
     54     
     55     public Producer() {
     56         buffer = Main.buffer;
     57         start();
     58     }
     59     public void run() {
     60         try {
     61             int elem = 0;
     62             while (elem<100) {
     63                 buffer.add(++elem);
     64             }
     65             Thread.sleep(10);
     66             buffer.add(-1);
     67         } catch (Exception e) {
     68             System.err.println("Error 1: "+e);
     69         }
     70     }
     71 }
     72 
     73 class Consumer extends Thread{
     74     private BoundedQueue buffer;
     75     private int idx;
     76     
     77     public Consumer(int idx) {
     78         buffer = Main.buffer;
     79         this.idx = idx;
     80         start();
     81     }
     82     public void run() {
     83         int data = 0;
     84         try {
     85             while (true) {
     86                 data = buffer.poll();
     87                 if (data<0) {
     88                     buffer.add(data);
     89                     break;
     90                 } else {
     91                     Main.console.acquire();
     92                     System.out.print(this+":	");
     93                     System.out.println(data);
     94                     Main.console.release();
     95                 }
     96             }
     97         } catch (Exception e) {
     98             System.err.println("Error 2: "+e);
     99         }
    100     }
    101     public String toString() {
    102         return "WorkThread_"+idx;
    103     }
    104 }
    105 
    106 public class Main {
    107     public static BoundedQueue buffer;
    108     public static Semaphore console;
    109     
    110     public static void main(String[] args) {
    111         console = new Semaphore(1);
    112         buffer = new BoundedQueue(10);
    113         Producer p = new Producer();
    114         Consumer[] c = new Consumer[5];
    115         for (int i=0;i<5;i++) {
    116             c[i] = new Consumer(i+1);
    117         }
    118         try {
    119             p.join();
    120             for (int i=0;i<5;i++) {
    121                 c[i].join();
    122             }
    123         } catch (Exception e) {
    124             System.err.println("Error 0: "+e);
    125         }
    126     }
    127 }
  • 相关阅读:
    宏大的目标
    java tcp ip网络编程(二) 套接字的基本使用
    java socket编程(一)简介
    是么是 API 和 SDK
    Mac 下显示隐藏文件
    iOS-事务相关
    iOS测试一段代码的运行时间
    sqlite3 语句总结
    iOS-scrollview及其子类适配iOS7
    OAuth2.0授权和SSO授权
  • 原文地址:https://www.cnblogs.com/DevinZ/p/4427663.html
Copyright © 2011-2022 走看看