zoukankan      html  css  js  c++  java
  • 生产者/消费者问题的多种Java实现方式

    (1)wait() / notify()方法

    (2)await() / signal()方法

    (3)BlockingQueue阻塞队列方法

    (4)PipedInputStream / PipedOutputStream

    本文只介绍最常用的前三种,第四种暂不做讨论

    第一种:BlockingQueue阻塞队列方法

      1 class Task {
      2 
      3     private int id;
      4     private int value;
      5 
      6     public int getId() {
      7         return id;
      8     }
      9 
     10     public void setId(int id) {
     11         this.id = id;
     12     }
     13 
     14     public int getValue() {
     15         return value;
     16     }
     17 
     18     public void setValue(int value) {
     19         this.value = value;
     20     }
     21 }
     22 
     23 /**
     24  * 生产者
     25  */
     26 class Provider implements Runnable{
     27 
     28     private BlockingQueue<Task> queue;
     29 
     30     private static AtomicInteger newId = new AtomicInteger();
     31 
     32     private static Random random = new Random();
     33 
     34     private volatile boolean isRunning = true;
     35 
     36     public Provider(BlockingQueue<Task> queue) {
     37         this.queue = queue;
     38     }
     39 
     40     @Override
     41     public void run() {
     42         while (isRunning){
     43             try {
     44                 Thread.sleep(random.nextInt(1000));
     45                 int id = newId.incrementAndGet();
     46                 Task task = new Task();
     47                 task.setId(id);
     48                 task.setValue(random.nextInt(1000));
     49                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
     50                 this.queue.put(task);
     51             } catch (Exception e){
     52                 e.printStackTrace();
     53             }
     54         }
     55     }
     56 
     57     public void stop() {
     58         this.isRunning = false;
     59     }
     60 }
     61 
     62 /**
     63  * 消费者
     64  */
     65 class Consumer implements Runnable{
     66 
     67     private BlockingQueue<Task> queue;
     68 
     69     public Consumer(BlockingQueue queue) {
     70         this.queue = queue;
     71     }
     72 
     73     @Override
     74     public void run() {
     75         while (true){
     76             try {
     77                 Task task = queue.take();
     78                 Thread.sleep(new Random().nextInt(1000));
     79                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + task.getId());
     80             } catch (Exception e) {
     81                  e.printStackTrace();
     82             }
     83         }
     84 
     85     }
     86 }
     87 
     88 public static void main(String[] args) {
     89 
     90         ArrayBlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
     91         Provider provider = new Provider(queue);
     92         Provider provider1 = new Provider(queue);
     93         Provider provider2 = new Provider(queue);
     94 
     95         Consumer consumer = new Consumer(queue);
     96         Consumer consumer1 = new Consumer(queue);
     97         Consumer consumer2 = new Consumer(queue);
     98 
     99         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, Integer.MAX_VALUE,
    100                 60L, TimeUnit.SECONDS,
    101                 new ArrayBlockingQueue<Runnable>(12));
    102         executor.execute(provider);
    103         executor.execute(provider1);
    104         executor.execute(provider2);
    105         executor.execute(consumer);
    106         executor.execute(consumer1);
    107         executor.execute(consumer2);
    108 
    109 
    110         try {
    111             Thread.sleep(3000);
    112         } catch (Exception e) {
    113             e.printStackTrace();
    114         }
    115         provider.stop();
    116         provider1.stop();
    117         provider2.stop();
    118 
    119         try {
    120             Thread.sleep(2000);
    121         } catch (InterruptedException e) {
    122             e.printStackTrace();
    123         }
    124     }

    运行结果:

    当前线程:pool-1-thread-2, 获取了数据,id为:1, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-1, 获取了数据,id为:2, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-3, 获取了数据,id为:3, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-3, 获取了数据,id为:4, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-3, 获取了数据,id为:5, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 1
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 2
    当前线程:pool-1-thread-2, 获取了数据,id为:6, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-1, 获取了数据,id为:7, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-2, 获取了数据,id为:8, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 4
    当前线程:pool-1-thread-1, 获取了数据,id为:9, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 3
    当前线程:pool-1-thread-3, 获取了数据,id为:10, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-2, 获取了数据,id为:11, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 6
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 5
    当前线程:pool-1-thread-3, 获取了数据,id为:12, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-1, 获取了数据,id为:13, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-2, 获取了数据,id为:14, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-2, 获取了数据,id为:15, 进行装载到公共缓冲区中...
    当前线程:pool-1-thread-3, 获取了数据,id为:16, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 8
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 10
    当前线程:pool-1-thread-1, 获取了数据,id为:17, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 7
    当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 9
    当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 13
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 11
    当前线程:pool-1-thread-2, 获取了数据,id为:18, 进行装载到公共缓冲区中...
    当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 14
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 12
    当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 16
    当前消费线程:pool-1-thread-3, 消费成功,消费数据为id: 18
    当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 15
    当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 17

     第二种:wait() / notify()方法

      1 /**
      2  * 仓库
      3  */
      4 class Storage {
      5 
      6     private LinkedList list = new LinkedList(); // 存放产品的
      7 
      8     private final int MAX_NUM = 100;
      9 
     10 
     11     public void product(int num) {
     12 
     13         synchronized (list) {
     14 
     15             while (list.size() + num > MAX_NUM) {
     16                 try {
     17                     list.wait();
     18                 } catch (Exception e) {
     19                     e.printStackTrace();
     20                 }
     21                 System.out.println("库存达到最大,不能生产");
     22 
     23             }
     24 
     25             // 说明库存是够得,那么生产
     26             for (int i = 0; i < num; i++) {
     27                 list.add(new Object());
     28             }
     29 
     30             try {
     31                 Thread.sleep(2000);
     32             } catch (Exception e) {
     33                 e.printStackTrace();
     34             }
     35 
     36             System.out.println(Thread.currentThread().getName() + "生产完成,现在库存" + list.size());
     37 
     38             list.notifyAll();
     39 
     40         }
     41     }
     42 
     43     public void consume(int num) {
     44 
     45         synchronized (list) {
     46 
     47             while (list.size() < num) {
     48                 System.out.println("库存不足");
     49                 try {
     50                     list.wait();
     51                 } catch (Exception e) {
     52                     e.printStackTrace();
     53                 }
     54             }
     55 
     56             for (int i = 0; i < num; i++) {
     57                 list.remove();
     58             }
     59             try {
     60                 Thread.sleep(1000);
     61             } catch (Exception e) {
     62                 e.printStackTrace();
     63             }
     64 
     65             System.out.println(Thread.currentThread().getName() + "消费完成"+ num +",现在库存" + list.size());
     66             list.notifyAll();
     67         }
     68     }
     69 
     70     public LinkedList getList() {
     71         return list;
     72     }
     73 
     74     public void setList(LinkedList list) {
     75         this.list = list;
     76     }
     77 
     78 }
     79 
     80 class Producter1 implements Runnable {
     81 
     82     private int num;
     83     private Storage storage;
     84 
     85     public Producter1(int num, Storage storage) {
     86         this.num = num;
     87         this.storage = storage;
     88     }
     89 
     90     @Override
     91     public void run() {
     92         storage.product(num);
     93     }
     94 
     95     public int getNum() {
     96         return num;
     97     }
     98 
     99     public void setNum(int num) {
    100         this.num = num;
    101     }
    102 
    103     public Storage getStorage() {
    104         return storage;
    105     }
    106 
    107     public void setStorage(Storage storage) {
    108         this.storage = storage;
    109     }
    110 }
    111 
    112 class Consumer1 implements Runnable {
    113 
    114     private int num;
    115 
    116     private Storage storage;
    117 
    118     public Consumer1(int num, Storage storage) {
    119         this.num = num;
    120         this.storage = storage;
    121     }
    122 
    123     @Override
    124     public void run() {
    125         storage.consume(num);
    126     }
    127 
    128     public int getNum() {
    129         return num;
    130     }
    131 
    132     public void setNum(int num) {
    133         this.num = num;
    134     }
    135 
    136     public Storage getStorage() {
    137         return storage;
    138     }
    139 
    140     public void setStorage(Storage storage) {
    141         this.storage = storage;
    142     }
    143 }
    144 
    145 public static void main(String[] args) {
    146 
    147         Storage storage = new Storage();
    148 
    149         Producter1 producter = new Producter1(10,storage);
    150         Producter1 producter1 = new Producter1(10,storage);
    151         Producter1 producter2 = new Producter1(10,storage);
    152         Producter1 producter3 = new Producter1(10,storage);
    153         Producter1 producter4 = new Producter1(10,storage);
    154         Producter1 producter5 = new Producter1(10,storage);
    155         Producter1 producter6 = new Producter1(10,storage);
    156 
    157         Consumer1 consumer1 = new Consumer1(20,storage);
    158         Consumer1 consumer2 = new Consumer1(30,storage);
    159         Consumer1 consumer3 = new Consumer1(20,storage);
    160 
    161         ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
    162                 60L, TimeUnit.SECONDS,
    163                 new ArrayBlockingQueue<Runnable>(12));
    164 
    165         executor.execute(producter);
    166         executor.execute(producter1);
    167         executor.execute(producter2);
    168         executor.execute(producter3);
    169         executor.execute(producter4);
    170         executor.execute(producter5);
    171         executor.execute(producter6);
    172         executor.execute(consumer1);
    173         executor.execute(consumer2);
    174         executor.execute(consumer3);
    175     }

    运行结果:

    pool-1-thread-1生产完成,现在库存10
    pool-1-thread-3生产完成,现在库存20
    pool-1-thread-7生产完成,现在库存30
    pool-1-thread-9消费完成30,现在库存0
    pool-1-thread-5生产完成,现在库存10
    库存不足
    pool-1-thread-6生产完成,现在库存20
    pool-1-thread-4生产完成,现在库存30
    pool-1-thread-10消费完成20,现在库存10
    pool-1-thread-2生产完成,现在库存20
    pool-1-thread-8消费完成20,现在库存0

     第三种:await() / signal()方法

    class Storage1 {
    
        private final int MAX_NUM = 100;
    
        private LinkedList list = new LinkedList();
    
        private final Lock lock = new ReentrantLock();
    
        private final Condition notEmpty = lock.newCondition();
    
        private final Condition notFull = lock.newCondition();
    
        public void product(int num) {
    
            lock.lock();
    
            try {
                while (list.size() + num > MAX_NUM) {
                    notFull.await();
                    System.out.println("库存达到最大,不能生产");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            // 说明库存是够得,那么生产
            for (int i = 0; i < num; i++) {
                list.add(new Object());
            }
    
            try {
               // Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + "生产完成,现在库存" + list.size());
            notFull.signalAll();
            notEmpty.signalAll();
    
            lock.unlock();
    
        }
    
        public void consume(int num) {
    
            lock.lock();
    
            try {
                while (list.size() < num) {
                    notEmpty.await();
                    System.out.println("库存不足");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            for (int i = 0; i < num; i++) {
                list.remove();
            }
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + "消费完成"+ num +",现在库存" + list.size());
    
            notFull.signalAll();
            notEmpty.signalAll();
    
            lock.unlock();
        }
    
    }
    class Producter2 implements Runnable {
    
        private int num;
        private Storage1 storage;
    
        public Producter2(int num, Storage1 storage) {
            this.num = num;
            this.storage = storage;
        }
    
        @Override
        public void run() {
            storage.product(num);
        }
    
        public int getNum() {
            return num;
        }
    
        public void setNum(int num) {
            this.num = num;
        }
    
        public Storage1 getStorage() {
            return storage;
        }
    
        public void setStorage(Storage1 storage) {
            this.storage = storage;
        }
    }
    
    class Consumer2 implements Runnable {
    
        private int num;
    
        private Storage1 storage;
    
        public Consumer2(int num, Storage1 storage) {
            this.num = num;
            this.storage = storage;
        }
    
        @Override
        public void run() {
            storage.consume(num);
        }
    
        public int getNum() {
            return num;
        }
    
        public void setNum(int num) {
            this.num = num;
        }
    
        public Storage1 getStorage() {
            return storage;
        }
    
        public void setStorage(Storage1 storage) {
            this.storage = storage;
        }
    }
    
    public static void main(String[] args) {
            Storage1 storage = new Storage1();
    
            Producter2 producter = new Producter2(10,storage);
            Producter2 producter1 = new Producter2(10,storage);
            Producter2 producter2 = new Producter2(10,storage);
            Producter2 producter3 = new Producter2(10,storage);
            Producter2 producter4 = new Producter2(10,storage);
            Producter2 producter5 = new Producter2(10,storage);
            Producter2 producter6 = new Producter2(10,storage);
    
            Consumer2 consumer1 = new Consumer2(20,storage);
            Consumer2 consumer2 = new Consumer2(30,storage);
            Consumer2 consumer3 = new Consumer2(20,storage);
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE,
                    60L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(12));
    
            executor.execute(producter);
            executor.execute(producter1);
            executor.execute(producter2);
            executor.execute(producter3);
            executor.execute(producter4);
            executor.execute(producter5);
            executor.execute(producter6);
            executor.execute(consumer1);
            executor.execute(consumer2);
            executor.execute(consumer3);
        }

    运行结果:

    pool-1-thread-1生产完成,现在库存10
    pool-1-thread-3生产完成,现在库存20
    pool-1-thread-4生产完成,现在库存30
    pool-1-thread-5生产完成,现在库存40
    pool-1-thread-9消费完成30,现在库存10
    pool-1-thread-7生产完成,现在库存20
    pool-1-thread-8消费完成20,现在库存0
    pool-1-thread-2生产完成,现在库存10
    pool-1-thread-6生产完成,现在库存20
    pool-1-thread-10消费完成20,现在库存0

  • 相关阅读:
    Python类知识点
    安装psycopg2时出错:Error: pg_config executable not found.
    top命令
    Ubuntu18.10创建软件图标
    初始化Redis密码
    Ubuntu修改root密码,ssh 允许root用户登录
    Flask_Migrate数据库迁移
    Ubuntu18.04 systemd开机自启
    dnspython
    记一次Celery的仇
  • 原文地址:https://www.cnblogs.com/huxipeng/p/9130300.html
Copyright © 2011-2022 走看看