Callable的Future模式
线程实现方式
- 1.继承Thread类
- 2.实现Runnable接口
- 3.线程池
- 4.Callable
无论使用继承Thread类还是实现Runnable接口,还是使用线程池都没有办法解决2个问题
- 1.线程执行没有返回值结果
- 2.线程执行没有办法抛出异常,只能自己通过try-catch解决
什么是Callable?
在java中,创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。
然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果。
我们一般只能采用共享变量或共享存储以及线程通信的方式实现获得任务结果的目的;
不过,在java中,也提供了使用Callable和Future来实现获取任务结果的操作。
Callable用来执行任务,产生结果,而Future用来获得结果;
1 @FunctionalInterface 2 public interface Callable<V> { 3 /** 4 * Computes a result, or throws an exception if unable to do so. 5 * 6 * @return computed result 7 * @throws Exception if unable to compute a result 8 */ 9 V call() throws Exception; 10 }
Callable和Runnable的区别
- 1、Callable能接受一个泛型,然后在call方法中返回一个这个类型的值,而Runnable的run方法没有返回值;
- 2、Callable的call方法可以抛出异常,而Runnable的run方法不会抛出异常;
什么是Future模式
调用Callable的第一种实现方案
1 package com.wish.demo05; 2 3 import java.util.concurrent.*; 4 5 public class MyCallable implements Callable<String> { 6 7 @Override 8 public String call() throws Exception { 9 System.out.println("Callable接口中重写的Call方法,可以有返回值并且抛出异常"); 10 return "Callable"; 11 } 12 13 public static void main (String[]args) throws ExecutionException, InterruptedException { 14 MyCallable myCallable = new MyCallable(); 15 //利用FutureTask执行Callable并且接受结果 16 FutureTask<String> stringFutureTask = new FutureTask<>(myCallable); 17 //利用线程执行Task任务 18 new Thread(stringFutureTask).start(); 19 //接受结果FutureTask.get会发生阻塞情况 20 System.out.println(stringFutureTask.get()); 21 22 System.out.println("MyCallable执行完毕,返回值结果正确接收~"); 23 } 24 25 }
结果
调用Callable的第二种实现方案
1 package com.wish.demo05; 2 3 import java.util.concurrent.*; 4 5 public class MyCallable implements Callable<String> { 6 7 @Override 8 public String call() throws Exception { 9 System.out.println("Callable接口中重写的Call方法,可以有返回值并且抛出异常"); 10 return "Callable"; 11 } 12 13 14 public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 15 MyCallable myCallable=new MyCallable(); 16 //创建一个线程池 17 ExecutorService executorService = Executors.newFixedThreadPool(3); 18 //创建线程执行任务,接受任务结果 19 Future<String> future = executorService.submit(myCallable); 20 //接受返回值 21 System.out.println(future.get(2000,TimeUnit.MILLISECONDS)); 22 System.out.println("方式二,线程池:MyCallable执行完毕,返回值结果正确接收~"); 23 //停止线程池 24 executorService.shutdown(); 25 } 26 }
结果
Future的常用方法
- Future.get()方法获取任务执行结果,该方法如果没有返回时,暂时处于阻塞状态
- Future.get(Long timeOut,TimeUnit timeUnit)可以设置超时时间
- Future.boolean isDone()如果线程结束,无论是正常结束还是任务终止都会返回true
- Future.boolean isCanceller()如果任务完成前被取消则返回true
- Future.boolean cancel(boolean flag),方法参数如果传入为true代表中断任务,如果任务中断成功,则返回值为true,如果失败则为false
Future提供三种功能
- 1.中断任务cancel(true)
- 2.判断任务是否执行完成isDone()
- 3.获取任务执行后的结果get()
中断任务cancel(true)
1 package com.wish.demo05; 2 3 import java.util.concurrent.*; 4 5 public class MyCallable implements Callable<String> { 6 7 @Override 8 public String call() throws Exception { 9 System.out.println("Callable接口中重写的Call方法,可以有返回值并且抛出异常"); 10 return "Callable"; 11 } 12 13 14 public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 15 MyCallable myCallable=new MyCallable(); 16 //创建一个线程池 17 ExecutorService executorService = Executors.newFixedThreadPool(3); 18 //创建线程执行任务,接受任务结果 19 Future<String> future = executorService.submit(myCallable); 20 //中断任务 21 boolean cancel = future.cancel(true); 22 if(cancel){ 23 System.out.println("中断任务成功~"); 24 }else{ 25 //接受返回值 26 System.out.println(future.get(2000,TimeUnit.MILLISECONDS)); 27 28 } 29 //停止线程池 30 executorService.shutdown(); 31 } 32 }
结果
手写Future模式的思路
Future模式的本质就是等待和唤起,在有值的情况下就唤起,没有值的情况就等待
所以使用wait负责阻塞和notify负责唤起阻塞线程
1 package com.wish.demo05; 2 3 public class MyFuture { 4 //FLAG相当于数据标识,如果放入数据成功,则返回为true,否则返回为false 5 private static boolean FLAG=false; 6 private String data; 7 8 public synchronized void setData(String data) throws InterruptedException { 9 Thread.sleep(2000); 10 //赋值操作 11 this.data = data; 12 FLAG=true; 13 //唤起 14 notify(); 15 } 16 17 public synchronized String getData() { 18 //如果获取数据失败 19 if(!FLAG){ 20 try { 21 wait(); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 return data; 27 } 28 29 public static void main(String[] args) { 30 MyFuture future=new MyFuture(); 31 new Thread(()->{ 32 try { 33 future.setData("张三"); 34 System.out.println(future.getData()); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 }).start(); 39 } 40 }
并发队列Queue
同步容器
Vector容器,HashTable容器,都是线程安全
如果同步容器使用foreach迭代过程中修改了元素的值,则会出现ConcurrentModificationException异常
可以使用iterator迭代器解决,但是在多线程并行情况下,修改容器中数据,会发生阻塞或者报NoSech异常
并发容器,队列
无界限:代表队列当中可以存放N个数据,没有长度限制
有界限:队列当中规定只能存放多少个数据,超过则阻塞
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,
通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。
头是最先加入的,尾是最近加入的,该队列不允许null元素。
ConcurrentLinkedQueue重要方法
- add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别)
- poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。
ConcurrentLinkedQueue的实现代码
1 package com.wish.demo05; 2 3 import java.util.concurrent.ConcurrentLinkedQueue; 4 5 public class Tests { 6 public static void main(String[] args) { 7 //准备队列 8 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); 9 //存放数据 10 queue.offer("张三"); 11 queue.offer("李四"); 12 queue.offer("王五"); 13 14 //获取队列中数据个数 15 System.out.println("队列中当前有:"+queue.size()+"个数据~"); 16 //获取队列中头数据 poll()方法相当于消费了队列中的数据,队列数据会随之删除 17 System.out.println("获取队列中的数据:"+queue.poll()); 18 System.out.println("队列中当前有:"+queue.size()+"个数据~"); 19 //获取队列中数据,但是不会删除 20 System.out.println("获取队列中的数据:"+queue.peek()); 21 System.out.println("获取队列中的数据:"+queue.peek()); 22 System.out.println("队列中当前有:"+queue.size()+"个数据~"); 23 } 24 }
结果
BlockingQueue
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
put方法和take会发生阻塞,add以及offer还有poll和peek不会发生阻塞
ArrayBlockingQueue
ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。
ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
1 package com.wish.demo05; 2 3 import java.util.concurrent.*; 4 5 public class Tests { 6 public static void main(String[] args) throws InterruptedException { 7 //ArrayBlockingQueue底层数组实现 8 ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3); 9 arrays.add("张三"); 10 arrays.add("李四"); 11 arrays.add("王五"); 12 arrays.offer("赵六",1000, TimeUnit.MILLISECONDS); 13 System.out.println(arrays.poll()); 14 System.out.println(arrays.poll()); 15 System.out.println(arrays.poll()); 16 System.out.println(arrays.poll()); 17 } 18 }
结果:
LinkedBlockingQueue
LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。
说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
1 package com.wish.demo05; 2 3 import java.util.concurrent.*; 4 5 public class Tests { 6 public static void main(String[] args) throws InterruptedException { 7 //ArrayBlockingQueue底层数组实现 8 LinkedBlockingQueue <String> arrays = new LinkedBlockingQueue<String>(100); 9 10 new Thread(()->{ 11 for (int i = 0; i < 10; i++) { 12 try { 13 Thread.sleep(1000); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 try { 18 arrays.put("item"+i); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 } 23 }).start(); 24 25 new Thread(()->{ 26 for (int i = 0; i < 10; i++) { 27 try { 28 System.out.println(arrays.take()+"-"+i); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 } 33 }).start(); 34 } 35 }
结果:
PriorityBlockingQueue
PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。
需要注意,PriorityBlockingQueue中允许插入null对象。
所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。
另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。
SynchronousQueue
SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。