zoukankan      html  css  js  c++  java
  • 生产者/消费者模式实现

      wait/notify最经典的案例就是"生产者/消费者"模式。但是此模式有一些需要注意的地方。

      生产者-消费者也有多种实现方式。

        (1)常见的就是synchronized结合wait+notify实现

        (2)用Lock类实现

        (3)使用BlockingQueue阻塞队列实现

     一、 synchronized结合wait()、notify()实现生产者消费者模型

    1. 一个简单的生产者消费者(相当于一个生产者,两个消费者)

      一个线程向集合中添加元素,两个线程从集合中删除元素,与之前等待/通知博客的最后一个案例类似。

    package cn.qlq.thread.seven;
    
    import java.util.ArrayList;
    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 利用之前的等待/通知实现一个简单的生产者消费者
     * 
     * @author Administrator
     *
     */
    public class Demo1 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class);
    
        public static void main(String[] args) throws InterruptedException {
            final List<String> list = new ArrayList<String>();
    
            // 删除元素线程1
            Thread sub1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    list.wait();
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub1");
            sub1.start();
    
            // 删除元素线程2
            Thread sub2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    list.wait();
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub2");
            sub2.start();
    
            // 增加元素线程
            Thread.sleep(1 * 1000);
            Thread addThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 5; i++) {
                            synchronized (list) {
                                list.add(i + "");
                                LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                                list.notifyAll();
                                list.wait();
                            }
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("InterruptedException error", e);
                    }
                }
            }, "B");
            addThread.start();
        }
    }

    结果:

    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->0,threadName->B
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->0, threadName->sub2
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->1,threadName->B
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->1, threadName->sub1
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->2,threadName->B
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->2, threadName->sub2
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->3,threadName->B
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->3, threadName->sub1
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->4,threadName->B
    18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->4, threadName->sub2

    2. 多生产与多消费:操作值-假死 

      假死的现象其实就是进入waiting状态。如果全部线程都进入waiting状态,则程序就不再执行任何业务功能了,整个项目呈停止状态。

      例如两个生产者两个消费者最后处于假死状态的例子:

    package cn.qlq.thread.seven;
    
    import java.util.ArrayList;
    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;/**
     * 多生产与多消费:操作值-假死( 多生产与多消费保证只有一个元素生产与消费)
     * 
     * @author Administrator
     *
     */
    public class Demo2 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);
    
        public static void main(String[] args) throws InterruptedException {
            final List<String> list = new ArrayList<String>();
    
            // 删除元素线程1
            Thread sub1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notify();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub1");
            sub1.start();
    
            // 删除元素线程2
            Thread sub2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notify();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub2");
            sub2.start();
    
            // 增加元素线程
            Thread.sleep(1 * 1000);
            Thread addThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 50000; i++) {
                            synchronized (list) {
                                while (list.size() != 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
                                list.add(i + "");
                                LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                                list.notify();
                            }
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("InterruptedException error", e);
                    }
                }
            }, "add1");
            addThread.start();
    
            // 增加元素线程
            Thread.sleep(1 * 1000);
            Thread addThread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 50000; i++) {
                            synchronized (list) {
                                while (list.size() != 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
                                list.add(i + "");
                                LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                                list.notify();
                            }
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("InterruptedException error", e);
                    }
                }
            }, "add2");
            addThread2.start();
    
            Thread.sleep(10 * 1000);
            LOGGER.info("sub1 state->{}", sub1.getState());
            LOGGER.info("sub2 state->{}", sub2.getState());
            LOGGER.info("add1 state->{}", addThread.getState());
            LOGGER.info("add2 state->{}", addThread2.getState());
        }
    }

    结果:

    18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub1
    18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub2
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->0,threadName->add1
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add1
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub1
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] list.remove ->0, threadName->sub1
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub1
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub2
    18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub2
    18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->add2,threadName->{}
    18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->add1
    18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add1
    18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add2
    18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub1 state->WAITING
    18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub2 state->WAITING
    18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add1 state->WAITING
    18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add2 state->WAITING

      

    解释一下上面的线程假死的原因:

      由于唤醒线程调用的是notify()唤醒单个线程,所以有可能唤醒的是同类的线程,也就是生产者唤醒的是生产者,消费者唤醒的是消费者。导致最后四个线程都处于waiting状态。

    解决办法:

      唤醒的时候采用notifyAll()唤醒所有的线程唤醒所有的线程,避免只唤醒同类线程。

    3. 多生产与多消费:操作值

      为了避免上面的假死线下,唤醒的时候采用notifyAll()唤醒所有的线程唤醒所有的线程,避免只唤醒同类线程。

      唤醒的时候也唤醒异类,这样就不会出现假死的状态了,程序会一直运行下去。

    package cn.qlq.thread.seven;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 多生产与多消费:操作值
     * 
     * @author Administrator
     *
     */
    public class Demo3 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);
    
        public static void main(String[] args) throws InterruptedException {
            final List<String> list = new ArrayList<String>();
    
            // 删除元素线程1
            Thread sub1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub1");
            sub1.start();
    
            // 删除元素线程2
            Thread sub2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        synchronized (list) {
                            while (true) {
                                while (list.size() == 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
    
                                LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                        Thread.currentThread().getName());
                                list.remove(0);
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "sub2");
            sub2.start();
    
            // 增加元素线程
            Thread.sleep(1 * 1000);
            Thread addThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 50000; i++) {
                            synchronized (list) {
                                while (list.size() != 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
                                list.add(i + "");
                                LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("InterruptedException error", e);
                    }
                }
            }, "add1");
            addThread.start();
    
            // 增加元素线程
            Thread.sleep(1 * 1000);
            Thread addThread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 50000; i++) {
                            synchronized (list) {
                                while (list.size() != 0) {
                                    LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                    list.wait();
                                    LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                                }
                                list.add(i + "");
                                LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                                list.notifyAll();
                            }
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("InterruptedException error", e);
                    }
                }
            }, "add2");
            addThread2.start();
    
            Thread.sleep(10 * 1000);
            LOGGER.info("sub1 state->{}", sub1.getState());
            LOGGER.info("sub2 state->{}", sub2.getState());
            LOGGER.info("add1 state->{}", addThread.getState());
            LOGGER.info("add2 state->{}", addThread2.getState());
        }
    }

    4.  多个生产者一个消费者

      附上一个多生产与一个消费者的例子。大体的思路是对资源加锁,在线程中通过while(true)不停的调用资源的add()方法和remove()方法,两个方法都是同步方法,需要同步是对当前对象加锁。每个对象都有一个阻塞队列,一个就绪队列。

      在学习完wait/notify之后再次重写下面代码就清晰多了。

    package cn.qlq.thread.seven;
    
    public class Demo4 {
        public static void main(String[] args) {
            MyResource myResource = new MyResource();
            // 多个生产者一个消费者
            MyConsumerThread myConsumerThread = new MyConsumerThread(myResource);
            // MyConsumerThread myConsumerThread1 = new
            // MyConsumerThread(myResource);
            // MyConsumerThread myConsumerThread2 = new
            // MyConsumerThread(myResource);
            MyProducerThread myProducerThread = new MyProducerThread(myResource);
            MyProducerThread myProducerThread1 = new MyProducerThread(myResource);
            MyProducerThread myProducerThread2 = new MyProducerThread(myResource);
            myProducerThread.start();
            myProducerThread1.start();
            myProducerThread2.start();
            myConsumerThread.start();
            // myConsumerThread1.start();
            // myConsumerThread2.start();
        }
    }
    
    /**
     * 资源类 一个加一个减(都有同步锁)
     * 
     * @author: qlq
     * @date : 2018年6月15日上午11:38:37
     */
    class MyResource {
        private int num;// 资源数量
        private int capacity = 10;// 资源容量
    
        /**
         * 同步方法增加资源 如果数量大于容量,线程进入阻塞状态 否则通知消费者进行消费
         */
        public synchronized void add() {
            if (num >= capacity) {// 大于等于的话进入阻塞状态
                try {
                    wait();
                    System.out.println(Thread.currentThread().getName() + "进入线程等待。。。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                num++;// 生产一件资源
                System.out.println(Thread.currentThread().getName() + "生产一件资源,目前剩余资源" + num + "件");
                notifyAll();// 通知消费者进行消费
            }
        }
    
        /**
         * 同步方法移除资源 如果num>0,消费资源,通知生产者进行生产 否则的话进入阻塞队列
         */
        public synchronized void remove() {
            if (num > 0) {
                num--;
                System.out.println(Thread.currentThread().getName() + "消费一件资源,目前剩余" + num + "件");
                notifyAll();// 唤醒生产者进行生产
            } else {
                try {
                    wait();
                    System.out.println(Thread.currentThread().getName() + "进入线程等待。。。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 生产者类
     * 
     * @author: qlq
     * @date : 2018年6月16日上午11:08:05
     */
    class MyProducerThread extends Thread {
        private MyResource resource;
    
        protected MyProducerThread(MyResource resource) {
            super();
            this.resource = resource;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(2 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.add();
            }
        }
    }
    
    /**
     * 消费者线程
     * 
     * @author: qlq
     * @date : 2018年6月16日上午11:09:06
     */
    class MyConsumerThread extends Thread {
        private MyResource resource;
    
        protected MyConsumerThread(MyResource resource) {
            super();
            this.resource = resource;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(2 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }

      

    总结:   

      每个对象都有一个阻塞队列,一个就绪队列。同步方法中才可以使用wait()/notify(),而且获得哪个锁才能调用对应锁的对应方法,否则会报非法监视器异常。

     

    二、 ReentrantLock结合Condition的await()、sianalAll()实现生产者消费者模式

    1.单个生产线程,多个消费线程的例子

      生产者和消费者分别阻塞在两个条件下面的例子

    package cn.qlq.thread.eleven;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * ReentrantLock結合Condition实现生产者-消费者模型(单生产-多消费)
     * 
     * @author Administrator
     *
     */
    public class Demo4 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);
        private Lock lock = new ReentrantLock();
        private Condition producerCon = lock.newCondition();
        private Condition consumerCon = lock.newCondition();
        private volatile List<String> list = new ArrayList<String>();// 模拟是一个容器
        private volatile int capacity = 3;// 最多3个
    
        public void removeEle() {
            try {
                lock.lock();
                for (int i = 0; i < 5; i++) {
                    while (list.size() == 0) {
                        consumerCon.await();// 阻塞消费者
                    }
                    LOGGER.info("threadName - > {} 消费元素 {}", Thread.currentThread().getName(), list.get(0));
                    list.remove(0);
                    producerCon.signal();// 唤醒生产者,由于是单个生产者,所以可以用signal()
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void addEle() {
            try {
                lock.lock();
                for (int i = 0; i < 10; i++) {
                    // 超过容量阻塞生产者
                    while (list.size() >= capacity) {
                        producerCon.await();
                    }
                    LOGGER.info("threadName - > {} 生产元素 {}", Thread.currentThread().getName(), i);
                    list.add(i + "");
                    consumerCon.signalAll();// 唤醒消费者,由于是多个消费者,所以可以用signalAll()
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            final Demo4 demo4 = new Demo4();
    
            // 两个消费者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.removeEle();
                }
            }, "con1").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.removeEle();
                }
            }, "con2").start();
    
            // 单个生产者
            Thread.sleep(1 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.addEle();
                }
            }, "pro1").start();
        }
    }

    结果:

    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 0
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 1
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 2
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 0
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 1
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 2
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 3
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 4
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 5
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 3
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 4
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 5
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 6
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 7
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 8
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 6
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 7
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 8
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 9
    13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 9

    2.多个生产线程,多个消费线程的例子

    package cn.qlq.thread.eleven;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * ReentrantLock結合Condition实现生产者-消费者模型(单生产-多消费)
     * 
     * @author Administrator
     *
     */
    public class Demo5 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);
        private Lock lock = new ReentrantLock();
        private Condition producerCon = lock.newCondition();
        private Condition consumerCon = lock.newCondition();
        private volatile List<String> list = new ArrayList<String>();// 模拟是一个容器
        private volatile int capacity = 3;// 最多十个
    
        public void removeEle() {
            try {
                lock.lock();
                for (int i = 0; i < 5; i++) {
                    while (list.size() == 0) {
                        consumerCon.await();// 阻塞消费者
                    }
                    LOGGER.info("threadName - > {} 消费元素 {}", Thread.currentThread().getName(), list.get(0));
                    list.remove(0);
                    producerCon.signalAll();// 唤醒生产者,由于是单个生产者,所以可以用signal()
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void addEle() {
            try {
                lock.lock();
                for (int i = 0; i < 5; i++) {
                    // 超过容量阻塞生产者
                    while (list.size() >= capacity) {
                        producerCon.await();
                    }
                    LOGGER.info("threadName - > {} 生产元素 {}", Thread.currentThread().getName(), i);
                    list.add(i + "");
                    consumerCon.signalAll();// 唤醒消费者,由于是多个消费者,所以可以用signalAll()
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            final Demo5 demo4 = new Demo5();
    
            // 两个消费者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.removeEle();
                }
            }, "con1").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.removeEle();
                }
            }, "con2").start();
    
            // 单个生产者
            Thread.sleep(1 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.addEle();
                }
            }, "pro1").start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo4.addEle();
                }
            }, "pro2").start();
        }
    }

    结果:

    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 0
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 1
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 2
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 0
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 1
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 2
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 3
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 4
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 0
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 3
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 4
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 0
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 1
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 2
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 3
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 1
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 2
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 3
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 4
    13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 4

    三、 使用BlockingQueue阻塞队列实现生产者消费者

      参考我的另一篇博客:https://www.cnblogs.com/qlqwjy/p/10175201.html 

  • 相关阅读:
    CachedRowSet使用
    mybatis There is no getter for property named 'xx' in 'class java.lang.String
    基于tcpdump的Android智能移动终端数据包捕获完整解决方案
    analytics详解
    android开发图片分辨率
    缩放图片,解决bitmap 内存溢出out of memory的问题
    使用windowAnimations定义Activity及Dialog的进入退出效果
    读取本地已有的.db数据库
    MyBatis 问题列表
    cxf 相关问题
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/10115756.html
Copyright © 2011-2022 走看看