zoukankan      html  css  js  c++  java
  • BlockingQueue

    BlockingQueue

    一、阻塞队列基本方法介绍

    谈到线程池,不得不谈到生产者-消费者模式,谈到生产者-消费者,就不得不谈到对应的数据结构,谈到对应的数据结构不得不言BlockingQueue。

    顾名思义,BlockingQueue翻译为阻塞队列。队列无非两种操作:入队和出队。而针对于入队出队的边界值的不同,分为几个方法:

     

    抛出异常

    特殊值

    阻塞

    超时

    插入

    add(e)

    offer(e)

    put(e)

    offer(e, time, unit)

    移除

    remove()

    poll()

    take()

    poll(time, unit)

    检查

    element()

    peek()

    不可用

    不可用

    测试代码:

    public class QueueTest {

        public static void main(String args[]) {

            final BlockingQueue queue = new ArrayBlockingQueue(5);

            init(queue);

            System.out.println("queue.size=" + queue.size() + ",    top element:" + queue.element());

            // queue.add("f");  //1. add方法:队满加入抛异常

            /*boolean bool = queue.offer("f");  //2. offer方法,队满加入会返回:false

            System.out.println("queue.size=" + queue.size() + ",   入队结果:" + bool);*/

     /*       try {

                queue.put("f");  //3.put方法,队满put会阻塞,下边的systemout方法不会执行,直至有消费者take出去

            } catch (InterruptedException e) {

                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

            }

            System.out.println("queue.size=" + queue.size() + ",   put结束:");*/

            Thread thread1 = new Thread(new Runnable() {

                public void run() {

                    boolean bool = false;

                    try {

                        bool = queue.offer("f", 5, TimeUnit.SECONDS); //5.offer带时间参数:当队满时,如果等待一定时间内还是满的就返回false,如果在这个期间队有空间了就可以放入一个元素,返回

                        System.out.println("queue.size=" + queue.size() + ",   入队结果:" + bool);

                    } catch (InterruptedException e) {

                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                    }

                }

            });

            thread1.start();

            //我们开一个消费者线程做出队操作,以便上述的offer可以正常加入

            Thread thread = new Thread(new Runnable() {

                public void run() {

                    while (queue.size() > 0) {

                        try {

                            String str = (String) queue.take();

                            System.out.println("queue.size=" + queue.size() + ",   出队结果:" + str);

                            Thread.sleep(500);

                        } catch (InterruptedException e) {

                            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                        }

                    }

                }

            });

            thread.start();

        }

        private static void init(BlockingQueue queue) {

            queue.add("a");

            queue.add("b");

            queue.add("c");

            queue.add("d");

            queue.add("e");

        }

    }

    add:入队,如果队列满会抛出异常。

    Exception in thread "main" java.lang.IllegalStateException: Queue full

    at java.util.AbstractQueue.add(AbstractQueue.java:71)

    at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)

    源码:

        public boolean add(E e) {

            if (offer(e)) 

                return true; //如果加入成功,返回true

            else

                throw new IllegalStateException("Queue full"); //如果添加失败,抛异常

        }

    Offer:入队,如果队满会返回false

    boolean bool = queue.offer("f");  //offer方法会返回:false

    源码:

        public boolean offer(E e) {

            if (e == null) throw new NullPointerException();

            final ReentrantLock lock = this.lock;

            lock.lock();

            try {

                if (count == items.length)

                    return false; //返回失败

                else {

                    insert(e);

                    return true; //返回成功

                }

            } finally {

                lock.unlock();

            }

        }

    Put:入队,如果队满会阻塞

            try {

                queue.put("f");

            } catch (InterruptedException e) {

                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

            }

             System.out.println("queue.size=" + queue.size() + ",   put结束:" );

    源码:

        public void put(E e) throws InterruptedException {

            if (e == null) throw new NullPointerException();

            final E[] items = this.items;

            final ReentrantLock lock = this.lock;

            lock.lockInterruptibly();

            try {

                try {

                    while (count == items.length)

                        notFull.await(); //阻塞在这儿

                } catch (InterruptedException ie) {

                    notFull.signal(); // propagate to non-interrupted thread

                    throw ie;

                }

                insert(e);

            } finally {

                lock.unlock();

            }

        }

    Offer:带时间的,会有一个缓冲时间,若超过此期间插入失败则失败。

    queue.size=5,    top element:a

    queue.size=4,   出队结果:a

    queue.size=5,   入队结果:true

    queue.size=4,   出队结果:b

    queue.size=3,   出队结果:c

    queue.size=2,   出队结果:d

    queue.size=1,   出队结果:e

    queue.size=0,   出队结果:f

    源码:

      public boolean offer(E e, long timeout, TimeUnit unit)

            throws InterruptedException {

            if (e == null) throw new NullPointerException();

    long nanos = unit.toNanos(timeout);

            final ReentrantLock lock = this.lock;

            lock.lockInterruptibly();

            try {

                for (;;) { //轮询

                    if (count != items.length) {

    //如果有其他线程消费元素,队列不满了,才可以插入元素

                        insert(e);

                        return true;

                    }

                    if (nanos <= 0)//如果到了约定时间没有插入成功,返回false

                        return false;

                    try {

                        nanos = notFull.awaitNanos(nanos); //轮询减时间

                    } catch (InterruptedException ie) {

                        notFull.signal(); // propagate to non-interrupted thread

                        throw ie;

                    }

                }

            } finally {

                lock.unlock();

            }

        }

    出队的几种形式与入队对应,实例略。

    从上述源码也可以看出,BlockingQueque还有几个特点:

    1. 不接受null值。

    2. 线程安全,方法都用了Lock

    3. 最最精华的部分:生产者-消费者模型。

    二、生产者-消费者模型

    实例:

    public class BlockingQueueTest {

        private static AtomicInteger count = new AtomicInteger(0);

        private static AtomicInteger countCreate = new AtomicInteger(0);

    public static void main(String args[]) {

        //定义一个阻塞队列,存放文件信息

            BlockingQueue fileQueue = new ArrayBlockingQueue(5);

            String path = "F:\Song";

            File root = new File(path);

            //生产者线程去遍历文件,放入队列

            FileCrawler fileCrawler = new FileCrawler(fileQueue, root);

            //消费者线程去遍历队列,取出文件

            Indexer indexer = new Indexer(fileQueue);

            //开启几个生产者线程开始遍历文件

            for (File file : root.listFiles()) {

                new Thread(new FileCrawler(fileQueue, file)).start();

            }

             //开启7个消费者者线程开始取出文件

            for (int i = 0; i < 7; i++) {

                new Thread(new Indexer(fileQueue)).start();

            }

            try {

                Thread.sleep(2000);

            } catch (InterruptedException e) {

                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

            }

            System.out.println("生产者生产:" + countCreate.get());

            System.out.println("消费者取到:" + count.get());

        }

        static class FileCrawler implements Runnable {

            private final BlockingQueue fileQueue;

            private final File root;

            FileCrawler(BlockingQueue fileQueue, File root) {

                this.fileQueue = fileQueue;

                this.root = root;

            }

            public void run() {

                try {

                    System.out.println("生产者开始生产:" + fileQueue.size());

                    crawl(root);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

            private void crawl(File root) throws InterruptedException {

                File[] files = root.listFiles();

                if (files != null) {

                    for (File file : files) {

                        if (file.isDirectory()) {

                            crawl(file);

                        } else {

                            fileQueue.put(file); //put

                            countCreate.incrementAndGet();

                        }

                    }

                }

            }

        }

        static class Indexer implements Runnable {

            private final BlockingQueue fileQueue;

            Indexer(BlockingQueue fileQueue) {

                this.fileQueue = fileQueue;

            }

            public void run() {

                while (true) {

                    try {

                        System.out.println("消费者开始消费:" + fileQueue.size());

                        File file = (File) fileQueue.take(); //take

                        count.incrementAndGet();

                        System.out.println(file.getName());

                    } catch (InterruptedException e) {

                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                    }

                }

            }

        }

    }

    三、阻塞队列的实现类

    基本的任务排队方式有三种:

    有界:ArrayBlockingQueue: 一个由数组支持的有界阻塞队列。

    无界:LinkedBlockingQueue: 一个基于已链接节点的、范围任意的blocking queue。

    同步移交:SynchronousQueue: 同步队列,put和take串行执行。生产者对其的插入操作必须等待消费者的移除操作,反之亦然。同步队列类似于信道,它非常适合传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

    synchronousQueue的思想:

    参考:http://ifeve.com/java-synchronousqueue/

    实例:

    public class SynchroNousQueueTest {

        public static void main(String args[]) {

    //        final SynchronousQueue synchronousQueue = new SynchronousQueue();

            SynchroNousQueueTest synchroNousQueueTest = new SynchroNousQueueTest();

            final MyShnchronouseQueue<String> synchronousQueue = synchroNousQueueTest.new MyShnchronouseQueue<String>();

            //1。开启一个生产者线程

            Thread threadPut = new Thread(new Runnable() {

                public void run() {

                    try {

                        for (int i = 0; i < 10; i++) {

                            synchronousQueue.put(i + "");

                            System.out.println("synchronousQueue,insert element:" + i);

                        }

                    } catch (InterruptedException e) {

                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                    }

                }

            });

            //2。开启一个消费者线程

            Thread threadTask = new Thread(new Runnable() {

                public void run() {

                    try {

                        for (int i = 0; i < 10; i++) {

                            synchronousQueue.take();

                            System.out.println("synchronousQueue,output element:" + i);

                        }

                    } catch (InterruptedException e) {

                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                    }

                }

            });

            threadPut.start();

            threadTask.start();

        }

        class MyShnchronouseQueue<E> {

            Lock lock = new ReentrantLock();

            Condition isFull = lock.newCondition();

            Condition isEmpty = lock.newCondition();

            boolean flag = false;   //同步开关

            E item = null;     //只有一个元素

            public void put(E e) throws InterruptedException {

                lock.lock();

                try {

                    while (flag) {    // 当开关为true时,put阻塞,一直await

                        isEmpty.await();

                    }

                    //当开关为false之后,改为true,item设值,唤醒消费者消费

                    flag = true;

                    item = e;

                    isFull.signalAll();

                } catch (Exception e1) {

                    e1.printStackTrace();

                } finally {

                    lock.unlock();

                }

            }

            public synchronized E take() throws InterruptedException {

                lock.lock();

                try {

                    while (!flag) {      // 当开关为false时,take阻塞,一直await

                        isFull.await();

                    }

                    //当开关为true之后,改为false,获取item的值,唤醒生产者生产

                    flag = false;

                    E e = item;

                    item = null;

                    isEmpty.signalAll();

                    return e;

                } catch (Exception e1) {

                    e1.printStackTrace();

                } finally {

                    lock.unlock();

                }

                return null;

            }

        }

    }

    结果:

    synchronousQueue,insert element:0

    synchronousQueue,output element:0

    synchronousQueue,output element:1

    synchronousQueue,insert element:1

    synchronousQueue,insert element:2

    synchronousQueue,output element:2

    synchronousQueue,insert element:3

    synchronousQueue,output element:3

    synchronousQueue,insert element:4

    synchronousQueue,output element:4

    synchronousQueue,insert element:5

    synchronousQueue,output element:5

    synchronousQueue,insert element:6

    synchronousQueue,output element:6

    synchronousQueue,insert element:7

    synchronousQueue,output element:7

    synchronousQueue,insert element:8

    synchronousQueue,output element:8

    synchronousQueue,insert element:9

    synchronousQueue,output element:9

    四、线程池的选择:

    根据这些队列的不同特性,我们的线程池也定义了不同的类别:

    单一线程池:可以看到corePoolSize=1

        public static ExecutorService newSingleThreadExecutor() {

            return new FinalizableDelegatedExecutorService

                (new ThreadPoolExecutor(1, 1,

                                        0L, TimeUnit.MILLISECONDS,

                                        new LinkedBlockingQueue<Runnable>()));

        }

    固定大小线程池:corePoolSize和maximumPoolSize固定,

        public static ExecutorService newFixedThreadPool(int nThreads) {

            return new ThreadPoolExecutor(nThreads, nThreads,

                                          0L, TimeUnit.MILLISECONDS,

                                          new LinkedBlockingQueue<Runnable>());

        }

    无界线程池:maximumPoolSize为Integer.MAX_VALUE

        public static ExecutorService newCachedThreadPool() {

            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                          60L, TimeUnit.SECONDS,

                                          new SynchronousQueue<Runnable>());

        }

    前两者默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理他们的速度,那么队列将无限制地增加。

    一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(这就需要一些饱和策略)在使用有界队列工作时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时可以减少上下文切换,但付出的代价是可能会限制吞吐量。

    对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。

    对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择。它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么狠容易发生过载问题。

    从另一个维度来看:cpu密集型任务,由于cpu使用率一直很高,这时的线程不宜过多,建议配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务由于线程并不是一直在执行任务,IO比较频繁,所以可以配置较多的线程,如2*Ncpu。

  • 相关阅读:
    LeetCode剑指Offer03
    腾讯软件开发客户端开发实习生二面
    luogu P2801 教主的魔法 分块
    luogu P3396 哈希冲突 根号算法
    luogu P1972 [SDOI2009]HH的项链 树状数组
    BZOJ 2440: [中山市选2011]完全平方数 莫比乌斯函数 容斥原理 二分答案
    柳阴直,烟里丝丝弄碧
    卡通别名
    它们
    高中随笔
  • 原文地址:https://www.cnblogs.com/qi-dian-ao/p/8467963.html
Copyright © 2011-2022 走看看