阻塞队列
对于多线程问题,可能通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。
当试图向队列 添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)导致线程阻塞。在协调多个线程之间的合作时,阻塞队列是一个有用的工具。工作者线程可以周期性地将中间结果存储在阻塞队列中。其他的工作者线程移出中间结果并进一步加以修改。队列会自动的平衡负载。如果第一个线程集比第二个慢,第二个线程集在等待结果时会阻塞。如果第一个线程集运行得快,它将等待第二个队列集赶上来。
阻塞队列分为以下3类,这取决于当队列满或空时他们的响应方式。如果将队列当作线程管理工具来使用,将要用到put和take方法。当试图向满的队列中添加或从空的队列中移出元素时,add、remove和element操作抛出异常。在一个多线程程序中,队列会在任何时候空或满,因此,一定要使用offer、poll,poll方法作为替代。这些方法如果不能完成任务,只是给出一个错误提示而不会抛出异常。
注意:poll和peek方法返回空来指示失败。因此向这些队列中插入null值是非法的。
java.util.concurrent包提供了阻塞队列的几个变种。默认情况下,LinkedBlockingQueue的容量是没有上边界的,但是,也可以选择指定容量,并且有一个可选的参数来指定是否需要公平性。设置了公平参数,则那么等待了最长时间的线程会优先得到处理。
PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限,但是,如果队列是空的,取元素的操作会阻塞。
如下代码演示如何使用阻塞队列来控制一组线程:
1 package test.BlockingQueue; 2 3 import java.io.File; 4 import java.util.Scanner; 5 import java.util.concurrent.ArrayBlockingQueue; 6 import java.util.concurrent.BlockingQueue; 7 8 /** 9 * 演示如何使用阻塞队列来控制一组线程 10 * 程序在一个目录及它的所有子目录下搜索所有文件,打印出指定关键字的行。 11 */ 12 public class BlockQueueTest { 13 public static void main(String[] args) { 14 Scanner in=new Scanner(System.in); 15 System.out.print("输入搜索路径:"); 16 String directory=in.nextLine(); 17 System.out.print("输入关键字:"); 18 String keyword=in.nextLine(); 19 //阻塞队列大小(队列中最多放10个) 20 final int FILE_QUEUE_SIZE=10; 21 //开启线程数 22 final int SEARCH_THREADS=100; 23 BlockingQueue<File> queue=new ArrayBlockingQueue<>(FILE_QUEUE_SIZE); 24 //文件枚举任务类 25 FileEnumerationTask enumerator=new FileEnumerationTask(queue,new File(directory)); 26 //开启(文件)线程将文件放入队列 27 new Thread(enumerator).start(); 28 //开启(搜索)线程 29 for (int i = 1; i <=SEARCH_THREADS ; i++) { 30 new Thread(new SearchTask(queue,keyword)).start(); 31 } 32 } 33 }
1 package test.BlockingQueue; 2 3 import java.io.File; 4 import java.util.concurrent.BlockingQueue; 5 6 /** 7 * 此任务枚举目录及其子目录中的所有文件 8 */ 9 public class FileEnumerationTask implements Runnable { 10 public static File DUMMY = new File(""); 11 private BlockingQueue<File> queue; 12 private File startingDirectory; 13 14 /** 15 * @param queue 添加枚举文件的阻塞队列 16 * @param startingDirectory 开始枚举的目录 17 */ 18 public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) { 19 this.queue = queue; 20 this.startingDirectory = startingDirectory; 21 } 22 23 @Override 24 public void run() { 25 try { 26 //将搜索路径下的文件存入队列,队列满10个,则将阻塞,只到搜索线程取出一个文件元素, 27 //直到这个路径下的所有文件都搜索完 28 enumerate(startingDirectory); 29 //将队列最后一个设置为空文件 30 //搜索线程的每一个线程一次都会从队列中取出一个文件,当所有文件都取完后,取到这个空文件,代表搜索结束 31 queue.put(DUMMY); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 35 } 36 } 37 38 /** 39 * 递归枚举给定目录及其子目录中的所有文件 40 * 将文件存入队列中 41 * @param directory 42 */ 43 private void enumerate(File directory) throws InterruptedException { 44 File[] files = directory.listFiles(); 45 if (files==null){ 46 return; 47 } 48 for (File file : files 49 ) { 50 //如果是文件则存入队列,是目录则继续读取目录下的文件 51 if (file.isDirectory()) { 52 enumerate(file); 53 } else { 54 queue.put(file); 55 } 56 } 57 } 58 }
1 package test.BlockingQueue; 2 3 import java.io.*; 4 import java.util.concurrent.BlockingQueue; 5 6 /** 7 * 该任务搜索一个给定关键字的文件 8 */ 9 public class SearchTask implements Runnable { 10 private BlockingQueue<File> queue; 11 private String keyword; 12 13 /** 14 * @param queue 获取文件的队列 15 * @param keyword 要查找的关键字 16 */ 17 public SearchTask(BlockingQueue<File> queue, String keyword) { 18 this.queue = queue; 19 this.keyword = keyword; 20 } 21 22 @Override 23 public void run() { 24 try { 25 boolean done = false; 26 while (!done) { 27 //从队列中取出一个元素,如果队列空,则阻塞 28 File file = queue.take(); 29 //如果文件为空文件,将空文件重新放入队列并停止循环,线程执行完成 30 //将空文件放入队列的意义在于,其它搜索线程可以拿到空文件后从而结束线程,保证队列中永远都存在一个元素 31 //如果队列为空,为导致其它线程被阻塞无法结束 32 if (file == FileEnumerationTask.DUMMY) { 33 //将空文件重新放入队列 34 queue.put(file); 35 36 done = true; 37 } else { 38 search(file); 39 } 40 } 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } catch (FileNotFoundException e) { 44 e.printStackTrace(); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } 48 } 49 50 /** 51 * 搜索给定关键字的文件并打印所有匹配的行 52 * 53 * @param file 要搜索的文件 54 */ 55 private void search(File file) throws IOException { 56 //记录文件行数 57 int lineNumber = 0; 58 BufferedReader br=new BufferedReader(new InputStreamReader(new FileInputStream(file.toString()),"gbk")); 59 while (br.readLine()!=null) { 60 lineNumber++; 61 String s=br.readLine(); 62 if (s!=null&&s.contains(keyword)){ 63 System.out.println(file.toString()+"===="+lineNumber+"===="+s); 64 } 65 } 66 br.close(); 67 68 } 69 }
生产者线程枚举了在所有子目录下的所有文件并把它们放入阻塞队列中。这个操作很快,如果没有上限的话,很快就包含了所有找到的文件。
我们同时启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印所有包含该关键字的行,然后取出下一个文件 。我们使用一个小技巧在工作结束 后终止这个应用程序。为了发出完成信号,枚举线程放置了一个虚拟对象到队列中(这就像在行李输送带上放一个写着“最后一包”的虚拟包)。当搜索线程取到这个虚拟对象时,将其放回并终止。
注意:不需要显式的线程同步。这个应用程序中,我们使用队列数据结构作为一种同步机制。
java.util.concurrent.ArrayBlockingQueue<E>
- ArrayBlockingQueue(int capacity)
- ArrayBlockingQueue(int capacity,boolean fair) 构造一个带有指定容量和公平性设置的阻塞队列。该队列用循环数组实现。
java.util.concurrent.LinkedBlockingQueue<E>
java.util.concurrent.LinkedBlockingDeque<E>
- LinkedBlockingQueue()
- LinkedBlockingDeque() 构造一个无上限的阻塞队列可双向队列,用链表实现
- LinkedBlockingQueue(int capacity)
- LinkedBlockingDeque(int capacity) 根据指定容量构建一个有限的阻塞队列或双向队列,用链表实现。
java.util.concurrent.DelayQueue<E extends delayed>
- DelayQueue()
构造一个包含Delayed元素的无界的阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移出。
-
java.util.concurrent.PriorityBlockingQueue<E>
- PriorityBlockingQueue();
- PriorityBlockingQueue(int initialCapacity)
- PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator)
构造一个无边界阻塞优先队列,用堆时间。
参数:initialCapacity 优先队列的初始容量 。默认值是11。
comparator 用来对元素进行比较的比较器,如果没有指定,则元素必须实现Comparable接口
-
java.util.concurrent.BlockingDeque<E>
- void put(E element) 添加元素,在必要时阻塞。
- E take() 移除并返回头元素,必要时阻塞。
- boolean offer(E element,long time,TimeUnit unit) 添加给定的元素,成功返回true.如果必要时阻塞,直至元素已经被添加或超时。
- E poll(long time,TimeUnit unit) 移除并返回头元素,必要时阻塞,直至元素可用或超时用完。失败时返回null。
-
java.util.concurrent.BlockingDeque<E>
- void putFirst(E emelent)
- void putLast(E emelent) 添加元素,必要时阻塞。
- E takeFirst()
- E takeLast() 移除并返回头元素或尾元素,必要时阻塞。
- boolean offerFirst(E element,long time,TimeUnit unit)
- boolean offerLast(E element,long time,TimeUnit unit) 添加给定的元素,成功时返回true,必要时阻塞直至元素被添加或超时。
- E pollFirst(long time,TimeUnit unit)
- E pollLast(long time,TimeUnit unit) 移动并返回头元素或尾元素,必要时阻塞,直至元素可用或超时。失败时返回null。
-
java.unit.concurrent.Transfer Queue<E>
- void transfer(E element)
- boolean tryTransfer(E emelent,long time, TimeUnit unit)
传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回true。