zoukankan      html  css  js  c++  java
  • 《图解Java多线程设计模式》之六:Producer-Consumer模式

    一,Producer-Consumer模式

    Producer:生产者的意思,指的是生成数据的线程。
    Consumer:消费者的意思,指的是使用数据的线程
    当生产者和消费者以不同的线程运行时,两者之间的处理速度差异就会引起问题。比如,消费者想获取数据,可是数据还没有生成。
    或者生产者想要交付数据,而消费者的状态还无法接收数据。
    Producer-Consumer模式在生产者消费者之间加入了一个桥梁角色。该桥梁角色用于消除线程间处理速度的差异。
    在该模式中,生产者和消费者都有多个,当消费者和生产者都只有一个时,称之为Pipe模式

    二,例子

    /**
     * Table类用于表示放置蛋糕的桌子.
     * 存放蛋糕的容器是数组。所以放置蛋糕是有顺序的,拿取蛋糕也是有顺序的。
     */
    public class Table {
        private final String[] buffer; //实际存放蛋糕的容器
        private int tail;//下次put的位置
        private int head;//下次take的位置
        private int count;//buffer中蛋糕的个数
    
        public Table(int count) {
            this.buffer = new String[count];
            this.tail = 0;
            this.head = 0;
            this.count = 0;
        }
        //放置蛋糕
        public synchronized void put(String cake)throws InterruptedException{
            System.out.println(Thread.currentThread().getName()+" put "+cake);
            while (count >= buffer.length){
                wait();
            }
            buffer[tail] = cake;
            tail = (tail+1)%buffer.length;
            count++;
            notifyAll();
        }
    
        //拿取蛋糕
        public synchronized String take()throws InterruptedException{
            while (count <= 0){
                wait();
            }
            String cake = buffer[head];
            head = (head+1)%buffer.length;
            count--;
            notifyAll();
            System.out.println(Thread.currentThread().getName()+" takes "+cake);
            return cake;
        }
    }
    public class MakerThread extends Thread {
        private final Random random;
        private final Table table;
        private static int id =0;//蛋糕的流水号
    
        public MakerThread(String name, Table table, long seed) {
            super(name);
            this.random = new Random(seed);
            this.table = table;
        }
    
        @Override
        public void run() {
            try {
                while (true){
                    Thread.sleep(random.nextInt(1000));
                    String cake = "[ Cake NO."+nextId() +" by "+getName()+"]";
                    table.put(cake);
                }
            }catch (InterruptedException e){
    
            }
        }
        private static synchronized int nextId(){
            return id++;
        }
    }
    public class EaterThread extends Thread {
        private final Random random;
        private final Table table;
    
        public EaterThread(String name, Table table, long seed) {
            super(name);
            this.random = new Random(seed);
            this.table = table;
        }
    
        @Override
        public void run() {
            try {
                while (true){
                    String cake = table.take();
                    Thread.sleep(random.nextInt(1000));
                }
            }catch (InterruptedException e){
            }
        }
    }
    public class Test {
        public static void main(String[] args) {
            Table table = new Table(3);
            new MakerThread("MakerThread-1",table,31415).start();
            new MakerThread("MakerThread-2",table,92653).start();
            new MakerThread("MakerThread-3",table,58979).start();
    
            new EaterThread("EaterThread-1",table,32384).start();
            new EaterThread("EaterThread-2",table,62643).start();
            new EaterThread("EaterThread-3",table,38327).start();
    
        }
    }

    三,InterruptedException异常

    1.加了InterruptedException 的方法
    java.lang.Object类的wait方法
    java.lang.Thread类的sleep方法
    java.lang.Thread类的的join方法
    2.加了 throws InterruptedException 的方法可能会花费时间,但是可以取消
    花费时间体现在:
    执行wait方法,需要等待notify/notifyAll方法唤醒,需要花费时间
    执行sleep方法,会暂停执行,这也花费时间
    执行join方法,会等待指定线程终止,需要时间
    可以取消体现在:
    假如Alice线程执行了Thread.sleep(100000);我们可以在别的线程中使用 alice.interrupt();当执行了interrupt后,正在sleep的线程会终止暂停状态,alice线程抛出
    InterruptedException异常。这样线程Alice的控制权就会转移到捕获该异常的catch语句块中。
    3.interrupt方法只是改变了中断状态
    上面调用interrupt后,线程抛出InterruptedException异常,只是因为alice线程调用了线程中断的方法(wait,sleep,join)。
    当线程执行普通的逻辑处理时,即使别的线程调用alice.interrupt(); 也不会抛出异常,而是继续执行。只有当线程继续执行到sleep,wait,join等方法的调用时,
    才会抛出InterruptedException异常
    4,其他线程执行alice.interrupt()时,并不需要获取alice线程实例的锁,无论何时,任何线程都可以调用其他线程的interrupt方法
    5.notify和interrupt的区别:
    ??????????

    四,isInterrupted方法:检查中断状态

    isInterrupted是Thread类的实例方法,用于检查指定线程的中断状态,
    若指定线程处于中断状态:返回true,
    未处于中断状态:返回false

    五,Thread.Interrupted:检查并清除中断状态

    该方法是Thread类的静态方法,用于检查并清除当前线程的中断状态(操作对象是线程本身)
    若当前线程处于中断状态,返回true。并且当前线程的中断状态会被清楚。
    若当前线程未处于中断状态,返回false。

    六,java.util.concurrent包中的队列

    1.BlockingQueue接口:阻塞队列
    该接口表示在达到合适的状态之前线程一直阻塞(wait)的队列。实现类:
    1).ArrayBlockingQueue类:基于数组的 BlockingQueue
    元素个数有限的BlockingQueue
    2).LinkedBlockingQueue类:基于链表的BlockingQueue
    元素个数没有最大限制的,只要有内存,就可以一直put数据
    3).PriorityBlockingQueue类:带有优先级的BlockingQueue
    数据的优先级是根据Comparable接口的自然排序,或构造函数的Comparator接口决定的顺序指定的
    4).SynchronousQueue类:直接传递的BlockingQueue
    该类用于执行由Producer角色到Consumer角色的 直接传递。如果Producer角色先put,在Consumer角色take之前,Producer角色的线程将一直阻塞。
    反之一样阻塞
    2.ConcurrentLinkedQueue类:元素个数没有最大限制的线程安全队列
    ConcurrentLinkedQueue中,内部数据结构是分开的,线程之间互不影响,所以就不需要进行互斥处理
    3.使用ArrayBlockingQueue替代上面实例程序中的Table类
    代码:

    /**
     
     * 使用阻塞队列完成table的功能
     */
    public class BlockQueueTable extends ArrayBlockingQueue<String>{
        public BlockQueueTable(int capacity) {
            super(capacity);
        }
        public void put(String cake) throws InterruptedException {
            System.out.println(Thread.currentThread().getName()+" puts "+cake);
            super.put(cake);
        }
    
        public String take() throws InterruptedException {
            String cake = super.take();
            System.out.println(Thread.currentThread().getName()+" takes "+cake);
            return cake;
        }
    }

     七,java.util.concurrent.Exchanger类

    该类用来交换两个线程的数据,只有当两个线程都准备好了,才会进行交换。不然其中一个线程会一直等待另一个线程

    public class ExchangerTest  {
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final Exchanger exchanger = new Exchanger();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String data1 = "aaa";
                        System.out.println("线程"+Thread.currentThread().getName()+
                        "正在准备把 "+data1+"换出去");
                        Thread.sleep(new Random().nextInt(3000));
                        String data2 = (String) exchanger.exchange(data1);
                        System.out.println("线程"+Thread.currentThread().getName()+
                        "换回的数据为"+data2);
                    }catch (InterruptedException e){
                    }
                }
            });
    
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String data1 = "bbb";
                        System.out.println("线程"+Thread.currentThread().getName()+
                                "正在准备把 "+data1+"换出去");
    
                        Thread.sleep(new Random().nextInt(3000));
                        String data2 = (String) exchanger.exchange(data1);
                        System.out.println("线程"+Thread.currentThread().getName()+
                                "换回的数据为"+data2);
                    }catch (InterruptedException e){
                    }
                }
            });
    
        }
    }

     运行结果:

    线程pool-1-thread-1正在准备把 aaa换出去
    线程pool-1-thread-2正在准备把 bbb换出去
    线程pool-1-thread-2换回的数据为aaa
    线程pool-1-thread-1换回的数据为bbb

  • 相关阅读:
    React Native 项目运行在 Web 浏览器上面
    iOS:CYLTabBarController【低耦合集成TabBarController】
    iOS原生项目中集成React Native
    iOS根据Url 获取图片尺寸
    iOS关于html缓存
    swift约束框架SnapKit使用
    swift
    Swift关于Any,AnyObject,AnyClass的区别与联系
    swift三方库
    React Native
  • 原文地址:https://www.cnblogs.com/inspred/p/9393903.html
Copyright © 2011-2022 走看看