zoukankan      html  css  js  c++  java
  • 生产者消费者模式

    生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。

    阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

    wait/notify方法

    首先,我们搞清楚Thread.sleep()方法和Object.wait()、Object.notify()方法的区别。

    1. sleep()是Thread类的方法;而wait()notify()notifyAll()是Object类中定义的方法;尽管这两个方法都会影响线程的执行行为,但是本质上是有区别的。

    2. Thread.sleep()不会导致锁行为的改变,如果当前线程是拥有锁的,那么Thread.sleep()不会让线程释放锁。如果能够帮助你记忆的话,可以简单认为和锁相关的方法都定义在Object类中,因此调用Thread.sleep()是不会影响锁的相关行为。

    3. Thread.sleepObject.wait都会暂停当前的线程,对于CPU资源来说,不管是哪种方式暂停的线程,都表示它暂时不再需要CPU的执行时间。OS会将执行时间分配给其它线程。区别是调用wait后,需要别的线程执行notify/notifyAll才能够重新获得CPU执行时间。

    线程状态图:

    • Thread.sleep()让线程从 【running】 -> 【阻塞态】 时间结束/interrupt -> 【runnable】
    • Object.wait()让线程从 【running】 -> 【等待队列】notify -> 【锁池】 -> 【runnable】

    实现生产者消费者模型

    生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

    (1)Object的wait() / notify()方法
    (2)Lock和Condition的await() / signal()方法
    (3)BlockingQueue阻塞队列方法
    (4)PipedInputStream / PipedOutputStream

    本文只介绍最常用的前三种,第四种暂不做讨论。

    1. 使用Object的wait() / notify()方法

    wait()nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。

      • wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
      • notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
    import java.util.Queue;
    import java.util.Random;
    
    /**
     * 生产者
     */
    public class Producer {
        private Queue<Integer> queue;
        int maxSize;
        int i = 0;
    
        public Producer( Queue<Integer> queue, int maxSize) {
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        public void callProduce() throws InterruptedException {
            synchronized (queue) {
                while (queue.size() == maxSize) {
                    System.out.println("Queue is full, [" + Thread.currentThread().getName() + "] thread waiting.");
                    queue.wait();
                }
                System.out.println("[" + Thread.currentThread().getName() + "] Producing value : " + i);
                queue.offer( i++);
                queue.notifyAll();
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    Producer
    import java.util.Queue;
    import java.util.Random;
    
    /**
     * 消费者
     */
    public class Consumer {
        private Queue<Integer> queue;
        int maxSize;
    
        public Consumer( Queue<Integer> queue, int maxSize) {
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        public void callConsumer() throws InterruptedException {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    System.out.println("Queue is empty, [" + Thread.currentThread().getName() + "] thread is waiting.");
                    queue.wait();
                }
                int x = queue.poll();
                System.out.println("[" + Thread.currentThread().getName() + "] Consuming value : " + x);
                queue.notifyAll();
    
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    Consumer
    import lombok.SneakyThrows;
    
    import java.util.LinkedList;
    import java.util.Queue;
    
    /**
     * 生产者消费者模式:使用Object.wait() / notify()方法实现
     */
    public class Test {
        private static final int CAPACITY = 5;
    
        public static void main(String args[]) {
            Queue<Integer> queue = new LinkedList<Integer>();
    
            Producer producer1 = new Producer( queue, CAPACITY);
            Consumer consumer1 = new Consumer( queue, CAPACITY);
    
            Thread p1=new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true){
                        producer1.callProduce();
                    }
                }
            });
            Thread p2=new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true){
                        producer1.callProduce();
                    }
                }
            });
            Thread c1=new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true){
                        consumer1.callConsumer();
                    }
                }
            });
            p1.setName("P1");
            p2.setName("P2");
            c1.setName("C1");
            p1.start();
            p2.start();
            c1.start();
        }
    }
    测试类
    D:jdk1.8.0_241injava "-javaagent:D:IDEAIntelliJ IDEA Community Edition 2017.3.5libidea_rt.jar=61728:D:IDEAIntelliJ IDEA Community Edition 2017.3.5in" -Dfile.encoding=UTF-8 -classpath D:jdk1.8.0_241jrelibcharsets.jar;D:jdk1.8.0_241jrelibdeploy.jar;D:jdk1.8.0_241jrelibextaccess-bridge-64.jar;D:jdk1.8.0_241jrelibextcldrdata.jar;D:jdk1.8.0_241jrelibextdnsns.jar;D:jdk1.8.0_241jrelibextjaccess.jar;D:jdk1.8.0_241jrelibextjfxrt.jar;D:jdk1.8.0_241jrelibextlocaledata.jar;D:jdk1.8.0_241jrelibext
    ashorn.jar;D:jdk1.8.0_241jrelibextsunec.jar;D:jdk1.8.0_241jrelibextsunjce_provider.jar;D:jdk1.8.0_241jrelibextsunmscapi.jar;D:jdk1.8.0_241jrelibextsunpkcs11.jar;D:jdk1.8.0_241jrelibextzipfs.jar;D:jdk1.8.0_241jrelibjavaws.jar;D:jdk1.8.0_241jrelibjce.jar;D:jdk1.8.0_241jrelibjfr.jar;D:jdk1.8.0_241jrelibjfxswt.jar;D:jdk1.8.0_241jrelibjsse.jar;D:jdk1.8.0_241jrelibmanagement-agent.jar;D:jdk1.8.0_241jrelibplugin.jar;D:jdk1.8.0_241jrelib
    esources.jar;D:jdk1.8.0_241jrelib
    t.jar;E:study	argetclasses;F:maven
    epocomsquareupokhttp3okhttp3.11.0okhttp-3.11.0.jar;F:maven
    epocomsquareupokiookio1.14.0okio-1.14.0.jar;F:maven
    epoorgspringframeworkspring-beans4.3.12.RELEASEspring-beans-4.3.12.RELEASE.jar;F:maven
    epoorgspringframeworkspring-context4.3.12.RELEASEspring-context-4.3.12.RELEASE.jar;F:maven
    epoorgspringframeworkspring-expression4.3.12.RELEASEspring-expression-4.3.12.RELEASE.jar;F:maven
    epoorgspringframeworkspring-core4.3.12.RELEASEspring-core-4.3.12.RELEASE.jar;F:maven
    epocommons-loggingcommons-logging1.2commons-logging-1.2.jar;F:maven
    epoorgspringframeworkspring-aop4.3.12.RELEASEspring-aop-4.3.12.RELEASE.jar;F:maven
    epoorgaspectjaspectjweaver1.8.13aspectjweaver-1.8.13.jar;F:maven
    epoorgaspectjaspectjrt1.8.13aspectjrt-1.8.13.jar;F:maven
    epoorgspringframeworkspring-jdbc4.3.12.RELEASEspring-jdbc-4.3.12.RELEASE.jar;F:maven
    epoorgspringframeworkspring-tx4.3.12.RELEASEspring-tx-4.3.12.RELEASE.jar;F:maven
    epoorgprojectlomboklombok1.16.20lombok-1.16.20.jar;F:maven
    epomysqlmysql-connector-java5.1.38mysql-connector-java-5.1.38.jar;F:maven
    epoorgmybatismybatis3.2.8mybatis-3.2.8.jar;F:maven
    epoorgmybatismybatis-spring1.3.2mybatis-spring-1.3.2.jar;F:maven
    epocommchangec3p0.9.5.2c3p0-0.9.5.2.jar;F:maven
    epocommchangemchange-commons-java.2.11mchange-commons-java-0.2.11.jar;F:maven
    epocomalibabafastjson1.2.58fastjson-1.2.58.jar;F:maven
    epoorgslf4jslf4j-log4j121.7.5slf4j-log4j12-1.7.5.jar;F:maven
    epoorgslf4jslf4j-api1.7.5slf4j-api-1.7.5.jar;F:maven
    epolog4jlog4j1.2.17log4j-1.2.17.jar;F:maven
    epo
    edisclientsjedis2.9.0jedis-2.9.0.jar;F:maven
    epoorgapachecommonscommons-pool22.4.2commons-pool2-2.4.2.jar com.design_pattern.produce_consumer.Test
    [P2] Producing value : 0
    [P1] Producing value : 1
    [P1] Producing value : 2
    [P1] Producing value : 3
    [P1] Producing value : 4
    Queue is full, [P1] thread waiting.
    [C1] Consuming value : 0
    [C1] Consuming value : 1
    [C1] Consuming value : 2
    [C1] Consuming value : 3
    [C1] Consuming value : 4
    [P1] Producing value : 5
    [P2] Producing value : 6
    [P2] Producing value : 7
    [P2] Producing value : 8
    [P2] Producing value : 9
    Queue is full, [P2] thread waiting.
    Queue is full, [P1] thread waiting.
    [C1] Consuming value : 5
    [P1] Producing value : 10
    Queue is full, [P2] thread waiting.
    Queue is full, [P1] thread waiting.
    [C1] Consuming value : 6
    [C1] Consuming value : 7
    [C1] Consuming value : 8
    [C1] Consuming value : 9
    [C1] Consuming value : 10
    Queue is empty, [C1] thread is waiting.
    [P1] Producing value : 11
    [P1] Producing value : 12
    [P1] Producing value : 13
    [P1] Producing value : 14
    [P1] Producing value : 15
    Queue is full, [P1] thread waiting.
    Queue is full, [P2] thread waiting.
    [C1] Consuming value : 11
    [C1] Consuming value : 12
    [C1] Consuming value : 13
    [P2] Producing value : 16
    [P2] Producing value : 17
    [P2] Producing value : 18
    Queue is full, [P2] thread waiting.
    Queue is full, [P1] thread waiting.
    [C1] Consuming value : 14
    [C1] Consuming value : 15
    [P1] Producing value : 19
    [P1] Producing value : 20
    Queue is full, [P1] thread waiting.
    Queue is full, [P2] thread waiting.
    [C1] Consuming value : 16
    [C1] Consuming value : 17
    [C1] Consuming value : 18
    [C1] Consuming value : 19
    [C1] Consuming value : 20
    Queue is empty, [C1] thread is waiting.
    [P2] Producing value : 21
    [P1] Producing value : 22
    [P1] Producing value : 23
    [P2] Producing value : 24
    [P2] Producing value : 25
    Queue is full, [P2] thread waiting.
    [C1] Consuming value : 21
    [P2] Producing value : 26
    Queue is full, [P2] thread waiting.
    Queue is full, [P1] thread waiting.
    [C1] Consuming value : 22
    [C1] Consuming value : 23
    [C1] Consuming value : 24
    [C1] Consuming value : 25
    [P1] Producing value : 27
    [P1] Producing value : 28
    [P1] Producing value : 29
    [P1] Producing value : 30
    Queue is full, [P1] thread waiting.
    Queue is full, [P2] thread waiting.
    [C1] Consuming value : 26
    [C1] Consuming value : 27
    [C1] Consuming value : 28
    [C1] Consuming value : 29
    [C1] Consuming value : 30
    Queue is empty, [C1] thread is waiting.
    [P2] Producing value : 31
    [P2] Producing value : 32
    [P1] Producing value : 33
    [P2] Producing value : 34
    [C1] Consuming value : 31
    [C1] Consuming value : 32
    [C1] Consuming value : 33
    [C1] Consuming value : 34
    [P2] Producing value : 35
    [P2] Producing value : 36
    [P1] Producing value : 37
    [P2] Producing value : 38
    [C1] Consuming value : 35
    [C1] Consuming value : 36
    [P2] Producing value : 39
    [P1] Producing value : 40
    [P2] Producing value : 41
    [C1] Consuming value : 37
    [P2] Producing value : 42
    Queue is full, [P2] thread waiting.
    Queue is full, [P1] thread waiting.
    运行结果
    注意要点

    判断Queue大小为0或者大于等于queueSize时须使用 while (condition) {},不能使用 if(condition) {}。其中 while(condition)循环,它又被叫做“自旋锁”。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。

    2. 使用Lock和Condition的await() / signal()方法

    在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition接口的await()signal()就是其中用来做同步的两种方法,它们的功能基本上和Object的wait()nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ProducerConsumer {
    
        private final Lock lock = new ReentrantLock();
    
        private final Condition addCondition = lock.newCondition();
    
        private final Condition subCondition = lock.newCondition();
    
    
        private int num = 0;
        private List<Integer> lists = new LinkedList<Integer>();
    
        public void add() {
            lock.lock();
            try {
                while (lists.size() == 10) {//当集合已满,则"添加"线程等待
                    System.out.println(Thread.currentThread().getName()+" list is full,await...");
                    addCondition.await();
                }
                num++;
                lists.add( num);
                System.out.println(Thread.currentThread().getName()+" produce num:"+num);
    
                this.subCondition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {//释放锁
                lock.unlock();
            }
        }
    
        public void sub() {
            lock.lock();
            try {
                while (lists.size() == 0) {//当集合为空时,"减少"线程等待
                    System.out.println(Thread.currentThread().getName()+" list is empty,await...");
                    subCondition.await();
                }
                Integer num = lists.remove(0);
                System.out.println(Thread.currentThread().getName()+" consume num:"+ num);
                addCondition.signal();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    }
    ProducerConsumer
    import lombok.SneakyThrows;
    
    import java.util.Random;
    
    public class Test {
    
        public static void main(String[] args) {
            ProducerConsumer task = new ProducerConsumer();
            Thread t1 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.add();
                    }
                }
            });
            t1.setName("P1");
            Thread t2 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.add();
                    }
                }
            });
            t2.setName("P2");
            Thread t3 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.sub();
                    }
                }
            });
            t3.setName("C1");
    
            t1.start();
            t2.start();
            t3.start();
    
        }
    
    }
    测试类
    P2 produce num:1
    C1 consume num:1
    P1 produce num:2
    C1 consume num:2
    P1 produce num:3
    P1 produce num:4
    P2 produce num:5
    C1 consume num:3
    P2 produce num:6
    C1 consume num:4
    P1 produce num:7
    P1 produce num:8
    P1 produce num:9
    P2 produce num:10
    C1 consume num:5
    P1 produce num:11
    C1 consume num:6
    C1 consume num:7
    P1 produce num:12
    P1 produce num:13
    P2 produce num:14
    P2 produce num:15
    C1 consume num:8
    P1 produce num:16
    P1 produce num:17
    P2 produce num:18
    C1 consume num:9
    P1 produce num:19
    P2 list is full,await...
    P1 list is full,await...
    C1 consume num:10
    P2 produce num:20
    C1 consume num:11
    P1 produce num:21
    P2 list is full,await...
    C1 consume num:12
    P2 produce num:22
    P1 list is full,await...
    P2 list is full,await...
    C1 consume num:13
    P1 produce num:23
    C1 consume num:14
    P2 produce num:24
    P1 list is full,await...
    测试结果

    3. 使用BlockingQueue阻塞队列方法

    JDK 1.5 以后新增的 java.util.concurrent包新增了 BlockingQueue 接口。并提供了如下几种阻塞队列实现:

    • java.util.concurrent.ArrayBlockingQueue
    • java.util.concurrent.LinkedBlockingQueue
    • java.util.concurrent.SynchronousQueue
    • java.util.concurrent.PriorityBlockingQueue

    实现生产者-消费者模型使用 ArrayBlockingQueue或者 LinkedBlockingQueue即可。

    我们这里使用LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

    • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
    • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

    我们可以跟进源码看一下LinkedBlockingQueue类的put()方法实现:

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();
    
    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();
    
    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
    
    
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    View Code

    看到这里证实了它的实现方式采用的是我们第2种await()signal()方法。下面我们就使用它实现吧。

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ProducerConsumerBQ {
        private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
         AtomicInteger num=new AtomicInteger(0);
        public void add() throws InterruptedException {
            System.out.println(Thread.currentThread().getName()+" produce num:"+num.get());
            blockingQueue.put(num.getAndIncrement());
    
        }
    
        public void sub() throws InterruptedException {
            Integer num2 = blockingQueue.take();
            System.out.println(Thread.currentThread().getName()+" consume num:"+ num2);
        }
    
    }
    ProducerConsumerBQ
    import lombok.SneakyThrows;
    
    import java.util.Random;
    
    public class Test {
    
        public static void main(String[] args) {
            ProducerConsumerBQ task = new ProducerConsumerBQ();
            Thread t1 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.add();
                    }
                }
            });
            t1.setName("P1");
            Thread t2 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.add();
                    }
                }
            });
            t2.setName("P2");
            Thread t3 = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    while (true) {
                        Thread.sleep(new Random().nextInt(1000));
                        task.sub();
                    }
                }
            });
            t3.setName("C1");
    
            t1.start();
            t2.start();
            t3.start();
    
        }
    
    }
    测试类
  • 相关阅读:
    如何找出一个数组中第二大的数
    如何把一个整型数组中重复的数字去掉
    假设数组a有n个元素,元素取值范围是1~n,如何判定数组是否存在重复元素
    如何重新排列数组使得数组左边为奇数,右边为偶数,并使得空间复杂度为O(1),时间复杂度为O(n)
    给一个由n-1个整数组成的未排序的序列,其元素都是1~n中的不同的整数。如何在线性时间复杂度内寻找序列中缺失的整数
    已知大小分别为m、n的两个无序数组A、B和一个常数c,求满足A[i]+B[j]=c的所有A[i]和B[j]
    如何找出数组中符合条件的数对
    如何找出数组中出现奇数次的元素
    云服务器ECS挖矿木马病毒处理和解决方案
    Java下载https文件上传到阿里云oss服务器
  • 原文地址:https://www.cnblogs.com/jvStarBlog/p/13697881.html
Copyright © 2011-2022 走看看