zoukankan      html  css  js  c++  java
  • 生产者消费者JAVA实现

    三种实现方式:

    1. Object对象的wait(),notify(),加synchronize.

    2. Lock的await(),signal().

    3. BlockingQueue阻塞队列.

    Object对象的wait(),notify(),加synchronize --> StorageObject();

    Lock的await(),signal() --> StorageLock();

    BlockingQueue阻塞队列 --> StorageBlockingQueue();

    package concurent;

    import java.util.LinkedList;
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    /**
    * Created by jenkin K on 17/6/8.
    */
    public class PCModelTread {

    public static void main(String args[]){

    // Storage storage = new StorageObject();
    Storage storage = new StorageLock();
    // Storage storage = new StorageBlockingQueue();

    Producer p1 = new Producer(storage);
    Producer p2 = new Producer(storage);
    Producer p3 = new Producer(storage);
    Producer p4 = new Producer(storage);

    Consumer c1 = new Consumer(storage);
    Consumer c2 = new Consumer(storage);

    p1.setNum(6);
    p2.setNum(1);
    p3.setNum(1);
    p4.setNum(8);

    c1.setNum(1);
    c2.setNum(5);

    new Thread(p1, "producer1").start();
    new Thread(p2, "producer2").start();
    new Thread(p3, "producer3").start();
    new Thread(p4, "producer4").start();

    new Thread(c1, "consumer1").start();
    new Thread(c2, "consumer2").start();
    }
    }

    interface Storage {
    public void consume(int num);
    public void produce(int num);
    }

    class StorageObject implements Storage {
    final int MAX_STORAGE = 10;
    private LinkedList<Object> list = new LinkedList<Object>();

    public void produce(int num){
    synchronized (list){
    while(num + list.size() > MAX_STORAGE){
    System.out.println("库存:["+ list.size() + "]/["+ MAX_STORAGE +"], 待生产数量:[" + num + "], 生产阻塞:" + Thread.currentThread().getName() + "阻塞.");
    try{
    list.wait();
    }catch (InterruptedException e){
    e.printStackTrace();
    }
    }
    for(int i = 0; i < num; i++){
    list.add(new Object());
    System.out.println("库存:["+ list.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在生产:[" + (i+1) + "].");
    }
    System.out.println(Thread.currentThread().getName() + "生产完成, 共生产:" + num + ", 库存:["+ list.size() + "]/["+ MAX_STORAGE +"].");
    list.notifyAll();
    }
    }

    public void consume(int num){
    synchronized (list){
    while(list.size() < num){
    System.out.println("库存:["+ list.size() + "]/[" + MAX_STORAGE + "], 待消费数量:[" + num + "], 消费阻塞:" + Thread.currentThread().getName() + "阻塞.");
    try{
    list.wait();
    }catch (InterruptedException e){
    e.printStackTrace();
    }
    }
    for(int i = 0; i < num; i++){
    System.out.println("库存:["+ list.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在消费:[" + (i+1) + "].");
    list.remove();
    }
    System.out.println(Thread.currentThread().getName() + "消费完成, 共消费:" + num + ", 库存:["+ list.size() + "]/["+ MAX_STORAGE + "].");
    list.notifyAll();
    }
    }
    }

    class StorageLock implements Storage {
    final int MAX_STORAGE = 10;
    LinkedList<Object> list = new LinkedList<Object>();
    Lock lock = new ReentrantLock(false);
    Condition produce = lock.newCondition();
    Condition consume = lock.newCondition();

    @Override
    public void produce(int num) {
    lock.lock();
    try {
    while (num + list.size() > MAX_STORAGE) {
    System.out.println("库存:[" + list.size() + "]/[" + MAX_STORAGE + "], 待生产数量:[" + num + "], 生产阻塞:" + Thread.currentThread().getName() + "阻塞.");
    produce.await();
    }
    for (int i = 0; i < num; i++) {
    list.add(new Object());
    System.out.println("库存:[" + list.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在生产:[" + (i + 1) + "].");
    }
    System.out.println(Thread.currentThread().getName() + "生产完成, 共生产:" + num + ", 库存:[" + list.size() + "]/[" + MAX_STORAGE + "].");
    }catch (InterruptedException e){
    e.printStackTrace();
    }finally {
    produce.signalAll();
    consume.signalAll();
    lock.unlock();
    }
    }

    @Override
    public void consume(int num) {
    lock.lock();
    try {
    while (list.size() < num) {
    System.out.println("库存:[" + list.size() + "]/[" + MAX_STORAGE + "], 待消费数量:[" + num + "], 消费阻塞:" + Thread.currentThread().getName() + "阻塞.");
    consume.await();
    }
    for (int i = 0; i < num; i++) {
    System.out.println("库存:[" + list.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在消费:[" + (i + 1) + "].");
    list.remove();
    }
    System.out.println(Thread.currentThread().getName() + "消费完成, 共消费:" + num + ", 库存:[" + list.size() + "]/[" + MAX_STORAGE + "].");
    }catch (InterruptedException e){
    e.printStackTrace();
    }finally {
    consume.signalAll();
    produce.signalAll();
    lock.unlock();
    }
    }
    }

    class StorageBlockingQueue implements Storage{
    int MAX_STORAGE = 10;
    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(MAX_STORAGE);

    @Override
    public void produce(int num) {
    if(queue.size() == MAX_STORAGE){
    System.out.println("库存:[" + queue.size() + "]/[" + MAX_STORAGE + "], 待生产数量:[" + num + "], 生产阻塞:" + Thread.currentThread().getName() + "阻塞.");
    }
    for(int i = 0; i < num; i++){
    try {
    queue.put(new Object());
    System.out.println("库存:[" + queue.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在生产:[" + (i + 1) + "].");
    }catch (InterruptedException e){
    e.printStackTrace();
    }
    }
    System.out.println(Thread.currentThread().getName() + "生产完成, 共生产:" + num + ", 库存:[" + queue.size() + "]/[" + MAX_STORAGE + "].");
    }

    @Override
    public void consume(int num) {
    if(queue.size() == 0){
    System.out.println("库存:[" + queue.size() + "]/[" + MAX_STORAGE + "], 待消费数量:[" + num + "], 消费阻塞:" + Thread.currentThread().getName() + "阻塞.");
    }
    for (int i = 1; i < num; i++){
    try{
    queue.take();
    System.out.println("库存:[" + queue.size() + "]/[" + MAX_STORAGE + "], " + Thread.currentThread().getName() + "正在消费:[" + (i + 1) + "].");
    }catch (InterruptedException e){
    e.printStackTrace();
    }
    }
    System.out.println(Thread.currentThread().getName() + "消费完成, 共消费:" + num + ", 库存:[" + queue.size() + "]/[" + MAX_STORAGE + "].");
    }
    }

    class Producer implements Runnable {

    Storage storage;
    int num;

    public Producer(Storage storage){
    this.storage = storage;
    }

    @Override
    public void run() {
    produce(num);
    }

    public void produce(int num){
    storage.produce(num);
    }

    public void setNum(int num) {
    this.num = num;
    }
    }

    class Consumer implements Runnable {

    Storage storage;
    int num;

    public Consumer(Storage storage){
    this.storage = storage;
    }
    @Override
    public void run() {
    consumer(num);
    }

    public void consumer(int num){
    storage.consume(num);
    }

    public void setNum(int num) {
    this.num = num;
    }
    }


     输出结果:

    StorageObject()输出结果:
    库存:[1]/[10], producer1正在生产:[1].
    库存:[2]/[10], producer1正在生产:[2].
    库存:[3]/[10], producer1正在生产:[3].
    库存:[4]/[10], producer1正在生产:[4].
    库存:[5]/[10], producer1正在生产:[5].
    库存:[6]/[10], producer1正在生产:[6].
    producer1生产完成, 共生产:6, 库存:[6]/[10].
    库存:[6]/[10], consumer2正在消费:[1].
    库存:[5]/[10], consumer2正在消费:[2].
    库存:[4]/[10], consumer2正在消费:[3].
    库存:[3]/[10], consumer2正在消费:[4].
    库存:[2]/[10], consumer2正在消费:[5].
    consumer2消费完成, 共消费:5, 库存:[1]/[10].
    库存:[1]/[10], consumer1正在消费:[1].
    consumer1消费完成, 共消费:1, 库存:[0]/[10].
    库存:[1]/[10], producer4正在生产:[1].
    库存:[2]/[10], producer4正在生产:[2].
    库存:[3]/[10], producer4正在生产:[3].
    库存:[4]/[10], producer4正在生产:[4].
    库存:[5]/[10], producer4正在生产:[5].
    库存:[6]/[10], producer4正在生产:[6].
    库存:[7]/[10], producer4正在生产:[7].
    库存:[8]/[10], producer4正在生产:[8].
    producer4生产完成, 共生产:8, 库存:[8]/[10].
    库存:[9]/[10], producer3正在生产:[1].
    producer3生产完成, 共生产:1, 库存:[9]/[10].
    库存:[10]/[10], producer2正在生产:[1].
    producer2生产完成, 共生产:1, 库存:[10]/[10].
    
    StorageLock()输出结果:
    库存:[1]/[10], producer1正在生产:[1].
    库存:[2]/[10], producer1正在生产:[2].
    库存:[3]/[10], producer1正在生产:[3].
    库存:[4]/[10], producer1正在生产:[4].
    库存:[5]/[10], producer1正在生产:[5].
    库存:[6]/[10], producer1正在生产:[6].
    producer1生产完成, 共生产:6, 库存:[6]/[10].
    库存:[7]/[10], producer2正在生产:[1].
    producer2生产完成, 共生产:1, 库存:[7]/[10].
    库存:[8]/[10], producer3正在生产:[1].
    producer3生产完成, 共生产:1, 库存:[8]/[10].
    库存:[8]/[10], 待生产数量:[8], 生产阻塞:producer4阻塞.
    库存:[8]/[10], consumer1正在消费:[1].
    consumer1消费完成, 共消费:1, 库存:[7]/[10].
    库存:[7]/[10], consumer2正在消费:[1].
    库存:[6]/[10], consumer2正在消费:[2].
    库存:[5]/[10], consumer2正在消费:[3].
    库存:[4]/[10], consumer2正在消费:[4].
    库存:[3]/[10], consumer2正在消费:[5].
    consumer2消费完成, 共消费:5, 库存:[2]/[10].
    库存:[3]/[10], producer4正在生产:[1].
    库存:[4]/[10], producer4正在生产:[2].
    库存:[5]/[10], producer4正在生产:[3].
    库存:[6]/[10], producer4正在生产:[4].
    库存:[7]/[10], producer4正在生产:[5].
    库存:[8]/[10], producer4正在生产:[6].
    库存:[9]/[10], producer4正在生产:[7].
    库存:[10]/[10], producer4正在生产:[8].
    producer4生产完成, 共生产:8, 库存:[10]/[10].
    
    StorageBlockingQueue()输出结果:
    库存:[3]/[10], producer2正在生产:[1].
    库存:[3]/[10], consumer2正在消费:[2].
    库存:[4]/[10], producer4正在生产:[1].
    consumer1消费完成, 共消费:1, 库存:[4]/[10].
    库存:[3]/[10], producer3正在生产:[1].
    producer3生产完成, 共生产:1, 库存:[3]/[10].
    库存:[2]/[10], producer1正在生产:[1].
    库存:[3]/[10], producer4正在生产:[2].
    库存:[2]/[10], consumer2正在消费:[3].
    producer2生产完成, 共生产:1, 库存:[3]/[10].
    库存:[4]/[10], consumer2正在消费:[4].
    库存:[5]/[10], producer4正在生产:[3].
    库存:[4]/[10], producer1正在生产:[2].
    库存:[5]/[10], producer1正在生产:[3].
    库存:[4]/[10], producer4正在生产:[4].
    库存:[3]/[10], consumer2正在消费:[5].
    库存:[7]/[10], producer4正在生产:[5].
    库存:[6]/[10], producer1正在生产:[4].
    库存:[8]/[10], producer4正在生产:[6].
    consumer2消费完成, 共消费:5, 库存:[7]/[10].
    库存:[10]/[10], producer4正在生产:[7].
    库存:[9]/[10], producer1正在生产:[5].
    

      

    从输出结果看,Synchronized和Lock方式,生产和消费是有序的,也就是某一生产线程全生产完,才会释放锁,不会被其他线程强占。BlockingQueue方式,生产和消费是无序的,即,生产线程没有生产完,或是消费线程没有消费完,都会被其他线程强占。原因是,前者的循环生产操作是在锁同步块里边,后者的生产循环生产操作是在锁的外面。

  • 相关阅读:
    【web性能优化】DNS解析与ip
    【web性能优化】雅虎军规
    【web性能优化】优化提纲及图片优化(慕课网笔记)
    【web性能优化】常用缓存方式(慕课网学习笔记)
    【前端】企业微信客户端调试
    【es6】es6使用集锦
    【前端】遇到的各种报错
    【前端】安装wampserver提示丢失MSVCR100.dll的解决方法
    【es6】将2个数组合并为一个数组
    【web】使用ionic搭建移动端项目 icon-radio 标签在ios下全部选中的问题
  • 原文地址:https://www.cnblogs.com/kisf/p/6971097.html
Copyright © 2011-2022 走看看