zoukankan      html  css  js  c++  java
  • 生产者消费者模式(转)

    原文地址:http://www.cnblogs.com/luxh/p/3300956.html

    第一种实现方式:

    package com.rainy.pools.threads;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Demo {
    
        final Lock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();
    
        public static void main(String[] args) {
            Demo test = new Demo();
            Producer producer = test.new Producer();
            Consumer consumer = test.new Consumer();
    
    
            consumer.start();
            producer.start();
        }
    
        class Consumer extends Thread{
    
            @Override
            public void run() {
                consume();
            }
    
            private void consume() {
    
                try {
                    lock.lock();
                    System.out.println("我在等一个新信号"+this.currentThread().getName());
                    condition.await();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    System.out.println("拿到一个信号"+this.currentThread().getName());
                    lock.unlock();
                }
    
            }
        }
    
        class Producer extends Thread{
    
            @Override
            public void run() {
                produce();
            }
    
            private void produce() {
                try {
                    lock.lock();
                    System.out.println("我拿到锁"+this.currentThread().getName());
                    condition.signalAll();
                    System.out.println("我发出了一个信号:"+this.currentThread().getName());
                } finally{
                    lock.unlock();
                }
            }
        }
    
    }

    第二种:

    package com.rainy.pools.threads;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class BlockingDemo {
    
        Executor pool = Executors.newFixedThreadPool(10);
    
        //仓库
        private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5);
    
        //仓库容量
        private int MAX_SIZE = 3;
    
        //仓库为空
        private int ZERO = 0;
    
        //生产者线程
        private class Producer implements Runnable{
    
            //生产方法,需同步
            private void produce(){
                try {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                    if(storageQueue.size()==MAX_SIZE) {
                        System.out.println("仓库已满!等待消费者消费");
                        Thread.sleep(1000);
                    }
                    if(storageQueue.size()<=MAX_SIZE) {
                        String product = "产品"+new Random().nextInt();
                        storageQueue.put(product);
                        System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                    }
                    Thread.sleep(1000);
                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }
            }
    
            public void run() {
                while(true) {
                    produce();
                }
            }
        }
    
        //消费者线程
        private class Customer implements Runnable{
    
            //消费方法,需同步
            private void consume() {
                try {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                    if(storageQueue.size()==ZERO) {
                        System.out.println("仓库已空!等待生产者生产");
                        Thread.sleep(1000);
                    }
                    if(storageQueue.size()!=ZERO) {
                        System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageQueue.take());
                    }
                    Thread.sleep(1000);
                }catch(InterruptedException ie) {
                    System.out.println("中断异常");
                    ie.printStackTrace();
                }
            }
    
            public void run() {
                while(true) {
                    consume();
                }
            }
    
        }
    
        //启动生产者和消费者线程
        public void start() {
            for(int i=1;i<5;i++) {
                //new Thread(new Producer()).start();
                ///new Thread(new Customer()).start();
                pool.execute(new Producer());
                pool.execute(new Customer());
            }
    
        }
    
        public static void main(String[] args) {
            BlockingDemo pc = new BlockingDemo();
            pc.start();
        }
    
    }

    第三种:

    package com.rainy.pools.threads;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConcurrentDemo {
    
        Executor pool = Executors.newFixedThreadPool(10);
    
        //仓库
        private List<String> storageList = new LinkedList<String>();
    
        //仓库容量
        private int MAX_SIZE = 3;
    
        //仓库为空
        private int ZERO = 0;
    
        //获取锁对象
        private Lock lock = new ReentrantLock();
    
        //仓库满了,绑定生产者线程
        private Condition full = lock.newCondition();
    
        //仓库为空,绑定消费者线程
        private Condition empty = lock.newCondition();
    
        //生产者线程
        private class Producer implements Runnable{
    
            //生产方法,需同步
            private void produce(){
                if(lock.tryLock()) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备生产!");
                    try {
                        if(storageList.size()==MAX_SIZE) {
                            System.out.println("仓库已满!等待消费者消费");
                            Thread.sleep(1000);
                            full.await();//生产者线程加入线程等待池
                        }
                        if(storageList.size()<MAX_SIZE){
                            String name = "产品"+new Random().nextInt();
                            storageList.add(name);
                            System.out.println(Thread.currentThread().getName()+"往仓库中生产了一个产品!");
                        }
                        Thread.sleep(1000);
                        System.out.println("发送唤醒消费者信号");
                        empty.signalAll();//唤醒消费者线程
    
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
    
            public void run() {
                while(true) {
                    produce();
                }
            }
        }
    
        //消费者线程
        private class Customer implements Runnable{
    
            //消费方法,需同步
            private void consume() {
                if(lock.tryLock()) {
                    System.out.println(Thread.currentThread().getName()+"进入仓库,准备消费!");
                    try {
                        if(storageList.size()==ZERO) {
                            System.out.println("仓库已空!等待生产者生产");
                            Thread.sleep(1000);
                            empty.await();//消费者线程加入线程等待池
                        }
                        if(storageList.size()!=ZERO) {
                            System.out.println(Thread.currentThread().getName()+"从仓库取得产品:"+storageList.remove(0));
                        }
    
                        Thread.sleep(1000);
                        System.out.println("发送唤醒生产者信号");
                        full.signalAll();//唤醒生产者线程
                    }catch(InterruptedException ie) {
                        System.out.println("中断异常");
                        ie.printStackTrace();
                    }finally{
                        lock.unlock();
                    }
                }
            }
    
            public void run() {
                while(true) {
                    consume();
                }
            }
    
        }
    
        //启动生产者和消费者线程
        public void start() {
            for(int i=1;i<5;i++) {
                //new Thread(new Producer()).start();
                //new Thread(new Customer()).start();
                pool.execute(new Producer());
                pool.execute(new Customer());
            }
    
        }
    
        public static void main(String[] args) {
            ConcurrentDemo pc = new ConcurrentDemo();
            pc.start();
        }
    
    }

    自测Demo:

    package com.rainy.pools.threads;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Pools {
    
        protected Condition notEmpty;
        protected Condition empty;
        protected ReentrantLock lock;
        private int poolingCount = 0;
    
        public Pools () {
            lock = new ReentrantLock(false);
            notEmpty = lock.newCondition();
            empty = lock.newCondition();
        }
    
        public void get() {
            GetThread get = new GetThread();
            get.start();
        }
    
        public void set() {
            SetThread set = new SetThread();
            set.start();
        }
    
        public class SetThread extends Thread {
            public void run() {
                while(true){
                    set();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            public void set() {
                if(lock.tryLock()) {
                    try {
                        System.out.println("set poolingCount : " + poolingCount);
                        if (poolingCount > 10) {
                            System.out.println("仓库满");
                            notEmpty.await();
                        }
                        poolingCount++;
                        System.out.println("set poolingCount : " + poolingCount);
                    } catch (InterruptedException e) {
                        System.out.println("set InterruptedException ");
                        e.printStackTrace();
                    } finally {
                        empty.signal();
                    }
                    System.out.println("set poolingCount : " + poolingCount);
                    System.out.println();
                    System.out.println();
                    lock.unlock();
                }
            }
        }
    
        public class GetThread extends Thread {
            public void run() {
                while(true){
                    get();
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            public void get() {
                // tryLock()
                // lock()
                // lockInterruptibly()
                // 方法的区别:
                /**
                    (*) lock()方法获取锁。
                    如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
                    如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。
                    如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。
    
                    (*) lockInterruptibly()方法获取锁。
                    1) 如果当前线程未被中断,则获取锁。
                    2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
                    3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
                    4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
                        1)锁由当前线程获得;或者
                        2)其他某个线程中断当前线程。
                    5)如果当前线程获得该锁,则将锁保持计数设置为 1。
                    如果当前线程:
                        1)在进入此方法时已经设置了该线程的中断状态;或者
                        2)在等待获取锁的同时被中断。
                        则抛出 InterruptedException,并且清除当前线程的已中断状态。
                    6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。
                        指定者: 接口 Lock 中的 lockInterruptibly
                        抛出:   InterruptedException   如果当前线程已中断。
    
                    (*) tryLock()方法获取锁。
                    仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
                    1)如果该锁没有被另一个线程保持,并且立即返回 true 值,则将锁的保持计数设置为 1。
                        即使已将此锁设置为使用公平排序策略,但是调用 tryLock() 仍将 立即获取锁(如果有可用的),
                        而不管其他线程当前是否正在等待该锁。在某些情况下,此“闯入”行为可能很有用,即使它会打破公
                        平性也如此。如果希望遵守此锁的公平设置,则使用 tryLock(0, TimeUnit.SECONDS)
                        ,它几乎是等效的(也检测中断)。
                    2)如果当前线程已经保持此锁,则将保持计数加 1,该方法将返回 true。
                    3)如果锁被另一个线程保持,则此方法将立即返回 false 值。
                    指定者:
                        接口 Lock 中的  tryLock
                        返回:
                        如果锁是自由的并且被当前线程获取,或者当前线程已经保持该锁,则返回true;否则返回false
    
                    总结:
                         1)lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
                         2)tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。
                         3)lockInterruptibly()lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。
                            并且如果线程已经被interrupt,再使用lockInterruptibly的时候,此线程也会被要求处理interruptedException
                 */
                if(lock.tryLock()) {
                    try {
                        System.out.println("get poolingCount : " + poolingCount);
                        if (0 == poolingCount) {
                            System.out.println("仓库空");
                            empty.await();
                        }
                    } catch (InterruptedException e) {
                        System.out.println("get InterruptedException ");
                        e.printStackTrace();
                    } finally {
                        notEmpty.signal();
                    }
    
                    System.out.println("get poolingCount : " + poolingCount);
                    poolingCount--;
                    System.out.println("get poolingCount : " + poolingCount);
                    System.out.println();
                    System.out.println();
    
                    lock.unlock();
                }
            }
        }
    
        public static void main(String[] args) {
            Pools p = new Pools();
            p.set();
            p.get();
        }
    
    }
  • 相关阅读:
    Easy Climb UVA
    POJ 2823 滑动窗口 单调队列模板
    Feel Good
    Problem J. Joseph’s Problem 约瑟夫问题--余数之和
    hdu 1029 Ignatius and the Princess IV
    poj 1027 Ignatius and the Princess II全排列
    Problem C Updating a Dictionary
    hdu 1412 {A}+{B}
    hdu 4006 The kth great number
    实现:职工管理系统
  • 原文地址:https://www.cnblogs.com/rainy-shurun/p/5764861.html
Copyright © 2011-2022 走看看