zoukankan      html  css  js  c++  java
  • 生产者消费者模型实现方案

    生产者消费者模型实现方案

    版本1:无通信的生产消费模型

    /**
     * Producer/consumer model with no communication between the two sides.
     *
     * <p>Both operations merely take the same monitor before touching the counter; neither
     * one waits for the other. Whichever thread grabs the lock runs next, so the counter can
     * move in the same direction several times in a row. Sample output:
     *
     * <pre>
     * prodcuer: 2
     * consumer: 1
     * consumer: 0
     * prodcuer: 1
     * prodcuer: 2
     * consumer: 1
     * </pre>
     */
    public class PCVersion1 {

        private static final Random random = new Random(System.currentTimeMillis());
        private int i = 1;
        private final Object object = new Object();

        /** Increments the counter under the lock and prints the new value. */
        public void produce() {
            synchronized (object) {
                System.out.println("prodcuer: " + (++i));
            }
        }

        /** Decrements the counter under the lock and prints the new value. */
        public void consumer() {
            synchronized (object) {
                System.out.println("consumer: " + (--i));
            }
        }

        /**
         * Runs {@code action} repeatedly, sleeping a random time below one second after each
         * call, until the current thread is interrupted.
         *
         * @param action the step to repeat
         */
        private static void runUntilInterrupted(Runnable action) {
            while (!Thread.currentThread().isInterrupted()) {
                action.run();
                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of silently looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Starts one producer thread and one consumer thread sharing a single instance.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            PCVersion1 pc = new PCVersion1();
            new Thread(() -> runUntilInterrupted(pc::produce)).start();
            new Thread(() -> runUntilInterrupted(pc::consumer)).start();
        }
    }
    
    

    版本2:使用if判断导致临界变量出现问题的生产消费模型

    /**
     * Producer/consumer model in which the two sides communicate through wait/notifyAll.
     *
     * <p>This version is intentionally flawed and misbehaves once there are several
     * consumers: the guard is tested with {@code if} instead of {@code while}, so a thread
     * that wakes up from {@code wait()} does not look at the flag again and may consume
     * repeatedly, for example
     *
     * <pre>
     * c1: -3
     * c2: -4
     * </pre>
     *
     * i.e. the output is no longer "produce one, consume one".
     */
    public class PCVersion2 {

        private static final Random random = new Random(System.currentTimeMillis());
        private int i = 0;
        private final Object object = new Object();
        private boolean isProduce = false;

        /**
         * Waits once if an item is already pending, then increments the counter, prints it
         * prefixed with the current thread name, marks an item as pending and wakes all
         * waiters.
         */
        public void produce() {
            synchronized (object) {
                // An if is used on purpose here; this is the flaw being demonstrated.
                if (isProduce) {
                    waitOnce();
                }
                String line = Thread.currentThread().getName() + ": " + (++i);
                System.out.println(line);
                isProduce = true;
                object.notifyAll();
            }
        }

        /**
         * Waits once if nothing is pending, then decrements the counter, prints it prefixed
         * with the current thread name, clears the pending mark and wakes all waiters.
         */
        public void consumer() {
            synchronized (object) {
                // Suppose c1 and c2 are both waiting. p1 produces and notifies; c1 takes the
                // lock, consumes without re-checking, and notifies again. If that wakes c2,
                // c2 resumes after its wait() and never re-evaluates the guard, so it
                // consumes a second time.
                if (!isProduce) {
                    waitOnce();
                }
                String line = Thread.currentThread().getName() + ": " + (--i);
                System.out.println(line);
                isProduce = false;
                object.notifyAll();
            }
        }

        /** Performs a single wait on the monitor; must be called while holding it. */
        private void waitOnce() {
            try {
                object.wait();
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }

        /** Repeats {@code step} forever with a random pause below one second after each call. */
        private static void loopWithRandomPause(Runnable step) {
            for (;;) {
                step.run();
                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                } catch (InterruptedException ignored) {
                }
            }
        }

        /**
         * Starts one producer (p1) and two consumers (c1, c2) on a shared instance.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            PCVersion2 pc = new PCVersion2();
            new Thread(() -> loopWithRandomPause(pc::produce), "p1").start();
            new Thread(() -> loopWithRandomPause(pc::consumer), "c1").start();
            new Thread(() -> loopWithRandomPause(pc::consumer), "c2").start();
        }
    }
    

    版本3:使用notify可能只唤醒同类线程、最终导致死锁的生产消费模型

    /**
     * Producer/consumer model in which the two sides communicate through wait/notify.
     *
     * <p>This version is intentionally flawed and can stall with several producers and
     * consumers, because it calls {@code notify()} rather than {@code notifyAll()}.
     * Take producers A and B and consumers C and D: A produces and then ends up waiting,
     * and so does B. C consumes, sets {@code isProduce} to false and notifies; if D is the
     * thread that runs next, it checks the guard and finds it has to wait as well. At that
     * point every thread is parked in {@code wait()} and nobody is left to wake them.
     */
    public class PCVersion3 {

        private static final Random random = new Random(System.currentTimeMillis());
        private int i = 0;
        private final Object object = new Object();
        private boolean isProduce = false;

        /**
         * Waits while an item is pending, then increments the counter, prints it prefixed
         * with the current thread name, marks an item as pending and wakes a single waiter.
         */
        public void produce() {
            synchronized (object) {
                while (isProduce) {
                    waitOnce();
                }
                String line = Thread.currentThread().getName() + ": " + (++i);
                System.out.println(line);
                isProduce = true;
                // A single notify() on purpose; this is the flaw being demonstrated.
                object.notify();
            }
        }

        /**
         * Waits while nothing is pending, then decrements the counter, prints it prefixed
         * with the current thread name, clears the pending mark and wakes a single waiter.
         */
        public void consumer() {
            synchronized (object) {
                while (!isProduce) {
                    waitOnce();
                }
                String line = Thread.currentThread().getName() + ": " + (--i);
                System.out.println(line);
                isProduce = false;
                // A single notify() on purpose; this is the flaw being demonstrated.
                object.notify();
            }
        }

        /** Performs a single wait on the monitor; must be called while holding it. */
        private void waitOnce() {
            try {
                object.wait();
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }

        /** Repeats {@code step} forever without any pause. */
        private static void loopForever(Runnable step) {
            for (;;) {
                step.run();
            }
        }

        /**
         * Starts two producers (p1, p2) and two consumers (c1, c2) on a shared instance.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            PCVersion3 pc = new PCVersion3();
            new Thread(() -> loopForever(pc::produce), "p1").start();
            new Thread(() -> loopForever(pc::produce), "p2").start();
            new Thread(() -> loopForever(pc::consumer), "c1").start();
            new Thread(() -> loopForever(pc::consumer), "c2").start();
        }
    }
    

    版本4:使用wait,notifyAll生产消费模型

    /**
     * Producer/consumer model using wait/notifyAll that works with several producers and
     * several consumers.
     *
     * <p>Two rules are applied: the guard is tested with {@code while} rather than
     * {@code if}, and waiters are woken with {@code notifyAll()} rather than
     * {@code notify()}.
     *
     * <p>The one-second pacing of the demo is done by the driver loop, outside the
     * critical section, so a sleeping thread never holds the monitor.
     */
    public class PCVersion4 {
        // Both fields are only accessed while holding the monitor of "object".
        private int i = 0;
        private final Object object = new Object();

        private boolean isProduce = false;

        /**
         * Waits until no item is pending, then increments the counter, prints it prefixed
         * with the current thread name, marks an item as pending and wakes all waiters.
         *
         * <p>If the thread is interrupted while waiting, the interrupt flag is restored and
         * the method returns without producing.
         */
        public void produce() {
            synchronized (object) {
                // Re-check the guard after every wake-up.
                while (isProduce) {
                    try {
                        object.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println(Thread.currentThread().getName() + ": " + (++i));
                isProduce = true;
                // Wake everybody so that at least one consumer gets to run.
                object.notifyAll();
            }
        }

        /**
         * Waits until an item is pending, then decrements the counter, prints it prefixed
         * with the current thread name, clears the pending mark and wakes all waiters.
         *
         * <p>If the thread is interrupted while waiting, the interrupt flag is restored and
         * the method returns without consuming.
         */
        public void consumer() {
            synchronized (object) {
                while (!isProduce) {
                    try {
                        object.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                System.out.println(Thread.currentThread().getName() + ": " + (--i));
                isProduce = false;
                object.notifyAll();
            }
        }

        /**
         * Starts a named thread that repeats {@code action}, pausing one second after each
         * call, until the thread is interrupted.
         *
         * @param name   thread name, shown as the prefix of each output line
         * @param action the step to repeat
         */
        private static void startPaced(String name, Runnable action) {
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    action.run();
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, name).start();
        }

        /**
         * Starts two producers (P1, P2) and three consumers (C1, C2, C3) on a shared instance.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            PCVersion4 pc = new PCVersion4();
            startPaced("P1", pc::produce);
            startPaced("P2", pc::produce);
            startPaced("C1", pc::consumer);
            startPaced("C2", pc::consumer);
            startPaced("C3", pc::consumer);
        }
    }
    

    版本5:使用Lock,Condition生产消费模型

    /**
     * Producer/consumer model built on one {@link Lock} with two {@link Condition}s:
     * producers wait on one condition while the list is full, consumers wait on the other
     * while it is empty. The shared list holds at most {@code MAX_CAPACITY} timestamps.
     */
    public class Condition2 {

        private static final Lock lock = new ReentrantLock();

        private static final Condition producerCondition = lock.newCondition();

        private static final Condition consumerCondition = lock.newCondition();

        private static final LinkedList<Long> list = new LinkedList<>();

        private static final int MAX_CAPACITY = 100;

        /**
         * Waits until the list has room, then appends the current time in milliseconds,
         * prints it and signals the waiting consumers.
         *
         * <p>If interrupted while waiting, restores the interrupt flag and returns without
         * producing.
         */
        private static void producer() {
            // Acquire before the try so that finally only unlocks a lock we actually hold.
            lock.lock();
            try {
                while (list.size() >= MAX_CAPACITY) {
                    producerCondition.await();
                }
                long time = System.currentTimeMillis();
                System.out.println(Thread.currentThread().getName() + " 生产=> " + time);
                list.addLast(time);
                consumerCondition.signalAll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Waits until the list is non-empty, then removes the oldest value, prints it and
         * signals the waiting producers.
         *
         * <p>If interrupted while waiting, restores the interrupt flag and returns without
         * consuming.
         */
        private static void consumer() {
            lock.lock();
            try {
                while (list.isEmpty()) {
                    consumerCondition.await();
                }
                long time = list.removeFirst();
                System.out.println(Thread.currentThread().getName() + " 消费 => " + time);
                producerCondition.signalAll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Sleeps for the given number of milliseconds; on interruption the interrupt flag is
         * restored so the calling loop can stop.
         *
         * @param time pause length in milliseconds
         */
        private static void sleep(long time) {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Starts a named producer thread that produces and then pauses 20 ms, until
         * interrupted.
         *
         * @param name thread name
         */
        private static void startProducer(String name) {
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    producer();
                    sleep(20);
                }
            }, name).start();
        }

        /**
         * Starts three producers (P1, P2, P3) and one consumer (C2).
         *
         * @param args unused
         * @throws InterruptedException never thrown by the current body; kept for
         *                              signature compatibility
         */
        public static void main(String[] args) throws InterruptedException {
            startProducer("P1");
            startProducer("P2");
            startProducer("P3");
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    consumer();
                }
            }, "C2").start();
        }
    }
    

    版本6:使用自定义队列生产消费模型

    /**
     * Bounded blocking FIFO queue of {@code Message} objects built on wait/notifyAll.
     *
     * <p>All access to the underlying list is guarded by one per-instance monitor, so
     * separate queues do not contend with or wake up each other.
     */
    public class MessageQueue {

        private static final int DEFAULT_MAX_LIMIT = 100;

        private final LinkedList<Message> queue;

        // Per-instance monitor guarding "queue"; every wait/notifyAll goes through it.
        private final Object lock = new Object();

        private final int limit;

        /** Creates a queue holding at most {@code DEFAULT_MAX_LIMIT} messages. */
        public MessageQueue() {
            this(DEFAULT_MAX_LIMIT);
        }

        /**
         * Creates a queue holding at most {@code limit} messages.
         *
         * @param limit maximum number of queued messages; must be positive
         * @throws IllegalArgumentException if {@code limit} is not positive, since such a
         *                                  queue could never accept a message
         */
        public MessageQueue(int limit) {
            if (limit <= 0) {
                throw new IllegalArgumentException("limit must be positive: " + limit);
            }
            this.limit = limit;
            this.queue = new LinkedList<>();
        }

        /**
         * Appends a message, blocking while the queue already holds {@code limit} messages.
         *
         * @param message the message to enqueue
         * @throws InterruptedException if interrupted while waiting for free space
         */
        public void put(Message message) throws InterruptedException {
            synchronized (lock) {
                // ">=" so that the queue never grows beyond "limit" elements.
                while (queue.size() >= limit) {
                    lock.wait();
                }
                queue.addLast(message);
                lock.notifyAll();
            }
        }

        /**
         * Removes and returns the oldest message, blocking while the queue is empty.
         *
         * @return the message at the head of the queue
         * @throws InterruptedException if interrupted while waiting for a message
         */
        public Message take() throws InterruptedException {
            synchronized (lock) {
                while (queue.isEmpty()) {
                    // Must wait on the monitor we hold; waiting on any other object
                    // throws IllegalMonitorStateException.
                    lock.wait();
                }
                Message message = queue.removeFirst();
                lock.notifyAll();
                return message;
            }
        }

        /**
         * Returns the capacity of this queue.
         *
         * @return the maximum number of messages the queue may hold
         */
        public int getMaxLimit() {
            return this.limit;
        }

        /**
         * Returns the number of messages currently queued.
         *
         * @return the current queue size, read under the same monitor as put/take
         */
        public int getMessageSize() {
            synchronized (lock) {
                return this.queue.size();
            }
        }
    }
    
  • 相关阅读:
    泛型
    Webx示例-PetStore分析1
    Spring容器简介
    PostgreSQL配置文件--复制
    PostgreSQL配置文件--WAL
    PostgreSQL配置文件--资源使用(除WAL外)
    PostgreSQL配置文件--连接和认证
    postgres访问认证配置文件pg_hba.conf
    lykops运维自动化
    DBA不可不知的操作系统内核参数
  • 原文地址:https://www.cnblogs.com/liuligang/p/10588484.html
Copyright © 2011-2022 走看看