zoukankan      html  css  js  c++  java
  • 生产者消费者

    Java实现生产者-消费者模型

    基本概念

    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

    阻塞队列代码

    直接使用 LinkedBlockingQueue,其中的 put() 和 take() 方法都是阻塞的:队列满时 put() 阻塞,队列空时 take() 阻塞。

    /**
     * Producer/consumer demo built directly on a bounded {@link LinkedBlockingQueue}.
     *
     * <p>All coordination is delegated to the queue's blocking {@code put()} and
     * {@code take()} calls: the producer blocks while the queue already holds
     * {@code NUM} items, and the consumer blocks while the queue is empty.
     */
    public class ProdConsum2 {

        /** Capacity of the shared queue. */
        private static final int NUM = 10;

        /** Bounded buffer shared by the producer and the consumer. */
        private static final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

        /**
         * Starts one producer and one consumer, lets them run for 100 seconds, then
         * interrupts both so that the JVM can exit.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            Thread putThread = new Thread(new PutClass());
            Thread getThread = new Thread(new GetClass());
            putThread.start();
            getThread.start();

            Thread.sleep(100000);

            // The workers are non-daemon threads; without an explicit stop request the
            // sleep above would bound nothing and the process would run forever.
            putThread.interrupt();
            getThread.interrupt();
        }

        /** Consumer: takes one element per second and logs it, until interrupted. */
        public static class GetClass implements Runnable {

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // Blocks while the queue is empty.
                        long curTime = queue.take();
                        System.out.println("取出:" + curTime + " 待取元素:" + queue.size());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the flag for any caller that inspects it, then stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        /** Producer: enqueues the current timestamp every 500 ms, until interrupted. */
        public static class PutClass implements Runnable {

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        long curTime = System.currentTimeMillis();
                        // Blocks while the queue is full.
                        queue.put(curTime);
                        // Log the value that was actually enqueued, so producer and consumer
                        // output can be matched line for line.
                        System.out.println("插入:" + curTime + " 剩余空间:" + (NUM - queue.size()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // Restore the flag for any caller that inspects it, then stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    1个Lock 2个Condition

    相当于用一个 Lock 同时控制生产、消费两个流程:生产和消费互斥,同一时刻只有一个线程能操作队列。

    package A_newStart.listNode.test;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Producer/consumer demo using one {@link ReentrantLock} with two conditions.
     *
     * <p>Every access to the queue happens while holding {@code lock}, so producing and
     * consuming are mutually exclusive. Producers wait on {@code notFull} while the
     * queue holds {@code NUM} items; consumers wait on {@code notEmpty} while it is
     * empty.
     */
    public class ProdConsum3 {

        /** Capacity of the shared queue. */
        private static final int NUM = 10;

        /** Shared buffer; only touched while {@code lock} is held. */
        private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

        /** Single lock guarding both production and consumption. */
        private final ReentrantLock lock = new ReentrantLock();

        /** Signalled by producers after an insert. */
        private final Condition notEmpty = lock.newCondition();

        /** Signalled by consumers after a removal. */
        private final Condition notFull = lock.newCondition();

        /**
         * Starts two producers and two consumers, lets them run for 100 seconds, then
         * interrupts all of them so that the JVM can exit.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            ProdConsum3 aa = new ProdConsum3();
            Thread putThread1 = new Thread(aa.new Producer("produce111"));
            Thread putThread2 = new Thread(aa.new Producer("produce222"));
            Thread getThread1 = new Thread(aa.new Consumer("consumer111"));
            Thread getThread2 = new Thread(aa.new Consumer("consumer222"));
            putThread1.start();
            putThread2.start();
            getThread1.start();
            getThread2.start();

            Thread.sleep(100000);

            // The workers are non-daemon threads; without an explicit stop request the
            // sleep above would bound nothing and the process would run forever.
            putThread1.interrupt();
            putThread2.interrupt();
            getThread1.interrupt();
            getThread2.interrupt();
        }

        /** Producer: repeatedly enqueues the current timestamp until interrupted. */
        public class Producer implements Runnable {

            private final String name;

            /**
             * @param name label used as the prefix of this producer's log lines
             */
            public Producer(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    // Acquire the lock.
                    lock.lock();
                    try {
                        // Loop rather than "if": the condition must be re-checked after
                        // every wake-up because another producer may have filled the slot.
                        while (queue.size() >= NUM) {
                            notFull.await();
                        }
                        long curTime = System.currentTimeMillis();
                        queue.put(curTime);
                        System.out.println(name + " 插入:" + curTime + " size:" + queue.size());
                        notEmpty.signal();
                    } catch (InterruptedException e) {
                        // Restore the flag and stop; the finally block still releases the lock.
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        /** Consumer: repeatedly removes one element, then pauses 100 ms, until interrupted. */
        public class Consumer implements Runnable {

            private final String name;

            /**
             * @param name label used as the prefix of this consumer's log lines
             */
            public Consumer(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    // Acquire the lock.
                    lock.lock();
                    try {
                        // Loop rather than "if": another consumer may have taken the item
                        // between the signal and this thread re-acquiring the lock.
                        while (queue.size() <= 0) {
                            notEmpty.await();
                        }
                        long curTime = queue.take();
                        System.out.println(name + " 获取:" + curTime + " size:" + queue.size());
                        notFull.signal();
                    } catch (InterruptedException e) {
                        // Restore the flag and stop; the finally block still releases the lock.
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        lock.unlock();
                    }

                    // Simulate processing time AFTER releasing the lock. Sleeping while
                    // holding it would stall every producer and the other consumer.
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    2个Lock 2个Condition

    与上面 1 个 Lock 2 个 Condition 的方案相比,生产和消费各自使用独立的锁,因此生产、消费的过程可以并行。

    package A_newStart.listNode.test;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Producer/consumer demo using two locks, one per side, so that producing and
     * consuming can proceed in parallel.
     *
     * <p>Producers serialize on {@code putLock} and wait on {@code notFull}; consumers
     * serialize on {@code getLock} and wait on {@code notEmpty}. The shared
     * {@code count} is the only state both sides read, which is why it is atomic.
     * A thread never holds both locks at once: the opposite side is signalled only
     * after the thread's own lock has been released.
     */
    public class ProdConsum4 {

        /** Capacity of the shared queue. */
        private static final int NUM = 10;

        private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(NUM);

        /** Held by a producer while it inserts. */
        private final ReentrantLock putLock = new ReentrantLock();
        private final Condition notFull = putLock.newCondition();

        /** Held by a consumer while it removes. */
        private final ReentrantLock getLock = new ReentrantLock();
        private final Condition notEmpty = getLock.newCondition();

        /** Number of items currently accounted for in the queue. */
        private final AtomicInteger count = new AtomicInteger(0);

        /**
         * Starts two producers and two consumers, lets them run for 100 seconds, then
         * interrupts all of them so that the JVM can exit.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            ProdConsum4 aa = new ProdConsum4();
            Thread putThread1 = new Thread(aa.new Producer("produce111"));
            Thread putThread2 = new Thread(aa.new Producer("produce222"));
            Thread getThread1 = new Thread(aa.new Consumer("consumer111"));
            Thread getThread2 = new Thread(aa.new Consumer("consumer222"));
            putThread1.start();
            putThread2.start();
            getThread1.start();
            getThread2.start();

            Thread.sleep(100000);

            // The workers are non-daemon threads; without an explicit stop request the
            // sleep above would bound nothing and the process would run forever.
            putThread1.interrupt();
            putThread2.interrupt();
            getThread1.interrupt();
            getThread2.interrupt();
        }

        /**
         * Wakes all consumers waiting for an item. The caller must not hold
         * {@code putLock}, otherwise the two sides could deadlock on lock order.
         */
        private void signalNotEmpty() {
            getLock.lock();
            try {
                notEmpty.signalAll();
            } finally {
                getLock.unlock();
            }
        }

        /**
         * Wakes all producers waiting for free space. The caller must not hold
         * {@code getLock}, otherwise the two sides could deadlock on lock order.
         */
        private void signalNotFull() {
            putLock.lock();
            try {
                notFull.signalAll();
            } finally {
                putLock.unlock();
            }
        }

        /** Producer: repeatedly enqueues the current timestamp until interrupted. */
        public class Producer implements Runnable {

            private final String name;

            /**
             * @param name label used as the prefix of this producer's log lines
             */
            public Producer(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    int newCount;
                    // Acquire the producer-side lock.
                    putLock.lock();
                    try {
                        while (count.get() >= NUM) {
                            notFull.await();
                        }
                        long curTime = System.currentTimeMillis();
                        queue.put(curTime);
                        System.out.println(name + " 插入:" + curTime + " size:" + queue.size());
                        newCount = count.incrementAndGet();
                        // Still room left: pass the wake-up on to other waiting producers.
                        if (newCount < NUM) {
                            notFull.signalAll();
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop; the finally block still releases the lock.
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        putLock.unlock();
                    }

                    // Consumers only block while count <= 0, so they need a cross-lock
                    // wake-up only on the empty -> non-empty transition. Consumers that
                    // find more items wake each other under getLock.
                    if (newCount == 1) {
                        signalNotEmpty();
                    }
                }
            }
        }

        /** Consumer: repeatedly removes one element, then pauses 100 ms, until interrupted. */
        public class Consumer implements Runnable {

            private final String name;

            /**
             * @param name label used as the prefix of this consumer's log lines
             */
            public Consumer(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    int newCount;
                    // Acquire the consumer-side lock.
                    getLock.lock();
                    try {
                        while (count.get() <= 0) {
                            notEmpty.await();
                        }
                        long curTime = queue.take();
                        System.out.println(name + " 获取:" + curTime + " size:" + queue.size());
                        newCount = count.decrementAndGet();
                        // Items remain: pass the wake-up on to other waiting consumers.
                        if (newCount > 0) {
                            notEmpty.signalAll();
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop; the finally block still releases the lock.
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        getLock.unlock();
                    }

                    // Producers block while count >= NUM, so they must be woken as soon as
                    // the queue leaves the full state. The previous check (newCount <= 0)
                    // kept them parked until the queue had been drained completely.
                    if (newCount == NUM - 1) {
                        signalNotFull();
                    }

                    // Simulate processing time AFTER releasing the lock. Sleeping while
                    // holding getLock would stall the other consumer.
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
  • 相关阅读:
    快速创建MockAPI
    Eolinker SaaS 7.5 版本更新
    【翻译】几个优质的REST API工具
    建立RESTful API测试程序的基础
    Ubuntu下gcc安装及使用
    c++转化成delphi的代码
    VCL组件的属性和方法详解
    Delphi组件开发教程指南目录
    FASM 第一章 简介
    (一)SQL 基础知识
  • 原文地址:https://www.cnblogs.com/fanguangdexiaoyuer/p/12362311.html
Copyright © 2011-2022 走看看