zoukankan      html  css  js  c++  java
  • 实现生产者消费者模式的方法

    1、使用 BlockingQueue

    ArrayBlockingQueue 已经替我们完成了很多工作:队列满了就阻塞生产者线程,队列空了就阻塞消费者线程;队列出现空位时唤醒生产者线程,队列中有元素时唤醒消费者线程。

    import java.util.concurrent.*;
    
    /**
     * Producer/consumer demo that delegates all blocking and wake-up logic to an
     * {@link ArrayBlockingQueue} with a capacity of 10.
     *
     * <p>Two producer threads and two consumer threads share one queue. Each
     * thread keeps its own counter, so the printed numbers are per-thread and
     * start at 0.
     */
    public class MyProdCons {
        /**
         * Starts two producer threads and two consumer threads that run until
         * they are interrupted.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);

            Runnable producer = () -> {
                int num1 = 0;
                while (true) {
                    try {
                        queue.put(new Object());
                        System.out.println("生产个数:  " + num1++);
                    } catch (InterruptedException e) {
                        // An interrupt is a request to stop: restore the flag and
                        // leave the loop instead of swallowing it and spinning on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            };
            new Thread(producer).start();
            new Thread(producer).start();

            Runnable consumer = () -> {
                int num2 = 0;
                while (true) {
                    try {
                        queue.take();
                        System.out.println("消费个数:  " + num2++);
                    } catch (InterruptedException e) {
                        // Same policy as the producer: honour the interrupt and exit.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            };
            new Thread(consumer).start();
            new Thread(consumer).start();
        }
    }
    View Code

    2、使用 Condition

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.locks.*;
    
    /**
     * A minimal bounded FIFO blocking queue built on one {@link ReentrantLock}
     * and two {@link Condition}s.
     *
     * <p>{@code notFull} is awaited by producers while the queue is at capacity;
     * {@code notEmpty} is awaited by consumers while the queue has no elements.
     * All access to the backing queue happens while holding {@code lock}.
     */
    public class MyBlockingQueueForCondition {
        private final Queue<Object> queue = new LinkedList<>();
        private final int max;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

        /**
         * Creates a queue that holds at most {@code size} elements.
         *
         * @param size maximum number of queued elements; must be positive
         * @throws IllegalArgumentException if {@code size} is zero or negative
         */
        public MyBlockingQueueForCondition(int size) {
            // A capacity of 0 would make put() wait forever, and a negative one
            // would never match queue.size(), silently making the queue unbounded.
            if (size <= 0) {
                throw new IllegalArgumentException("size must be positive: " + size);
            }
            this.max = size;
        }

        /**
         * Appends {@code o} to the tail of the queue, waiting while the queue is full.
         *
         * @param o the element to enqueue
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void put(Object o) throws InterruptedException {
            lock.lock();
            try {
                // Loop rather than "if": the condition is re-checked after every wake-up.
                while (queue.size() == max) {
                    notFull.await();
                }
                queue.add(o);
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the head of the queue, waiting while the queue is empty.
         *
         * @return the element that was at the head of the queue
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                Object item = queue.remove();
                notFull.signalAll();
                return item;
            } finally {
                lock.unlock();
            }
        }
    }
    View Code

    3、使用 wait/notify

    /**
     * Entry point of the wait/notify demo: one producer and one consumer
     * share a single {@code MyBlockingQueue} of capacity 5.
     */
    public class WaitStyle {

        /**
         * Builds the shared queue, wraps the producer and consumer tasks in
         * threads, and starts the producer thread followed by the consumer thread.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            MyBlockingQueue sharedQueue = new MyBlockingQueue(5);

            Thread producerThread = new Thread(new Producer(sharedQueue));
            Thread consumerThread = new Thread(new Consumer(sharedQueue));

            producerThread.start();
            consumerThread.start();
        }
    }
    
    /**
     * Task that calls {@code put()} on the shared queue ten times.
     */
    class Producer implements Runnable {
        private final MyBlockingQueue storage;

        /**
         * @param storage the queue this producer puts into
         */
        public Producer(MyBlockingQueue storage) {
            this.storage = storage;
        }

        /**
         * Performs up to ten {@code put()} calls; stops early if the thread is
         * interrupted.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    storage.put();
                } catch (InterruptedException e) {
                    // Treat the interrupt as a stop request: restore the flag for
                    // whoever owns this thread and abandon the remaining puts.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    /**
     * Task that calls {@code take()} on the shared queue ten times.
     */
    class Consumer implements Runnable {
        private final MyBlockingQueue storage;

        /**
         * @param storage the queue this consumer takes from
         */
        public Consumer(MyBlockingQueue storage) {
            this.storage = storage;
        }

        /**
         * Performs up to ten {@code take()} calls; stops early if the thread is
         * interrupted.
         */
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    storage.take();
                } catch (InterruptedException e) {
                    // Treat the interrupt as a stop request: restore the flag for
                    // whoever owns this thread and abandon the remaining takes.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    
    
    import java.util.*;
    
    /**
     * A minimal bounded queue built on the intrinsic lock of the instance with
     * {@code wait()} / {@code notifyAll()}.
     *
     * <p>Both operations are {@code synchronized} on {@code this}, so producers
     * and consumers share a single wait set; {@code notifyAll()} wakes every
     * waiter and each one re-checks its own condition in a loop.
     */
    public class MyBlockingQueue {
        private final int maxSize;
        private final Queue<Object> storage;

        /**
         * Creates a queue that holds at most {@code size} elements.
         *
         * @param size maximum number of stored elements; must be positive
         * @throws IllegalArgumentException if {@code size} is zero or negative
         */
        public MyBlockingQueue(int size) {
            // A capacity of 0 would make put() wait forever, and a negative one
            // would never match storage.size(), silently making the queue unbounded.
            if (size <= 0) {
                throw new IllegalArgumentException("size must be positive: " + size);
            }
            this.maxSize = size;
            this.storage = new ArrayDeque<>();
        }

        /**
         * Adds a fresh {@code Object} to the queue, waiting while the queue is
         * full, then prints the new element count.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public synchronized void put() throws InterruptedException {
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (storage.size() == maxSize) {
                wait();
            }
            storage.add(new Object());
            System.out.println("放入后个数 " + storage.size());
            notifyAll();
        }

        /**
         * Removes the head element, waiting while the queue is empty, then
         * prints the remaining element count.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public synchronized void take() throws InterruptedException {
            while (storage.isEmpty()) {
                wait();
            }
            storage.remove();
            System.out.println("取出后个数 " + storage.size());
            notifyAll();
        }
    }
    View Code

    以上是三种实现生产者消费者模式的方法,其中,第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。

    ref:

    拉勾课堂 徐隆曦  《Java 并发编程 78 讲》

  • 相关阅读:
    centos下修改hosts
    metasploit rpc
    使用Suricata和ELK进行网络入侵检测
    查询存储设备的UUID
    CentOS基础命令大全
    两个有序数组合并到一个新数组
    dubbo
    redis基本数据类型【3】-List类型
    redis基本数据类型【2】-Hash类型
    redis基本数据类型【1】-String类型
  • 原文地址:https://www.cnblogs.com/zgq25302111/p/13198401.html
Copyright © 2011-2022 走看看