一.Callable的Future模式
线程实现方式:
1.继承Thread类
2.实现Runnable接口
3.线程池
4.Callable
无论使用继承Thread类还是实现Runnable接口,还是使用线程池都没有办法解决2个问题
1.线程执行没有返回值结果
2.线程执行没有办法抛出异常,只能自己通过try-catch解决
Callable和Runnable类似,在JUC包下,主要区别在于Callable中的call方法可以带返回值并且可以抛出异常
如果需要执行Callable,需要Future实现类的支持,能够接受返回值结果,FutureTask是Future实现类
调用Callable的第一种实现方案:
public class MyCallable implements Callable<String> { @Override public String call() throws Exception { System.out.println("Callable接口中重写的Call方法,可以有返回值并且抛出异常"); return "callable"; } public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable myCallable=new MyCallable(); //利用FutureTask执行Callable并且接受结果 FutureTask<String> stringFutureTask = new FutureTask<>(myCallable); //利用线程执行Task任务 new Thread(stringFutureTask).start(); //接受结果FutureTask.get会发生阻塞情况 System.out.println(stringFutureTask.get()); System.out.println("MyCallable执行完毕,返回值结果正确接收~"); } }
调用Callable的第二种实现方案:
public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable myCallable=new MyCallable(); //创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(3); //创建线程执行任务,接受任务结果 Future<String> future = executorService.submit(myCallable); //接受返回值 System.out.println(future.get(2000,TimeUnit.MILLISECONDS)); System.out.println("方式二,线程池:MyCallable执行完毕,返回值结果正确接收~"); //停止线程池 executorService.shutdown(); }
Future.get()方法获取任务执行结果,该方法如果没有返回时,暂时处于阻塞状态
Future.get(Long timeOut,TimeUnit timeUnit)可以设置超时时间
Future.boolean isDone()如果线程结束,无论是正常结束还是任务终止都会返回true
Future.boolean isCanceller()如果任务完成前被取消则返回true
Future.boolean cancel(boolean flag),方法参数如果传入为true代表中断任务,如果任务中断成功,则返回值为true,如果失败则为false
Future提供三种功能:1.中断任务cancel(true) 2.判断任务是否执行完成isDone() 3.获取任务执行后的结果get()
//中断任务 boolean cancel = future.cancel(true); if(cancel){ System.out.println("中断任务成功~"); }else{ //接受返回值 System.out.println(future.get(2000,TimeUnit.MILLISECONDS)); } 如果让手写Future模式应该怎么样定义? wait负责阻塞和notify负责唤起阻塞线程 public class MyFuture { //FLAG相当于数据标识,如果放入数据成功,则返回为true,否则返回为false private static boolean FLAG=false; private String data; public synchronized void setData(String data) throws InterruptedException { Thread.sleep(2000); //赋值操作 this.data = data; FLAG=true; //唤起 notify(); } public synchronized String getData() { //如果获取数据失败 if(!FLAG){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return data; } public static void main(String[] args) { MyFuture future=new MyFuture(); new Thread(()->{ try { future.setData("张三"); System.out.println(future.getData()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
二.并发队列Queue,队列其实就是一个容器
1.同步容器
Vector容器,HashTable容器,都是线程安全
如果同步容器使用foreach迭代过程中修改了元素的值,则会出现ConcurrentModificationException异常
可以使用iterator迭代器解决,但是在多线程并行情况下,修改容器中数据,会发生阻塞或者报NoSech异常
1.并发容器,队列
无界限:代表队列当中可以存放N个数据,没有长度限制
有界限:队列当中规定只能存放多少个数据,超过则阻塞
2.ConcurrentLinkedQueue,无界限的队列,可以采用add()方法(底层调用offer())或者offer方法将数据存放到队列当中,通过peek和poll方法获取队列头数据
peek()方法获取数据,但是该数据没有出列
poll()方法获取数据,完成后该数据出列
peek和poll当队列当中没有数据时,获取的数据为null,不会产生阻塞
public static void main(String[] args) { //准备队列 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); //存放数据 queue.offer("张三"); queue.offer("李四"); queue.offer("王五"); //获取队列中数据个数 System.out.println("队列中当前有:"+queue.size()+"个数据~"); //获取队列中头数据 poll()方法相当于消费了队列中的数据,队列数据会随之删除 System.out.println("获取队列中的数据:"+queue.poll()); System.out.println("队列中当前有:"+queue.size()+"个数据~"); //获取队列中数据,但是不会删除 System.out.println("获取队列中的数据:"+queue.peek()); System.out.println("获取队列中的数据:"+queue.peek()); System.out.println("队列中当前有:"+queue.size()+"个数据~"); }
3.有边界的阻塞队列BlockingQueue
put方法和take会发生阻塞,add以及offer还有poll和peek不会发生阻塞
3.1 ArrayBlockingQueue:当队列没有数据时,获取时为null,当队列满时,会报异常或者入队失败
public static void main(String[] args) throws InterruptedException { //ArrayBlockingQueue底层数组实现 ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3); arrays.add("张三"); arrays.add("李四"); arrays.add("王五"); System.out.println(arrays.poll()); arrays.offer("赵六",1000, TimeUnit.MILLISECONDS); System.out.println(arrays.poll()); System.out.println(arrays.poll()); System.out.println(arrays.poll()); }
4.LinkedBlockingQueue 初始可以指定队列大小,如果不指定则按照Integer.MaxValue值进行设定
public static void main(String[] args) throws InterruptedException { //ArrayBlockingQueue底层数组实现 LinkedBlockingQueue <String> arrays = new LinkedBlockingQueue<String>(100); new Thread(()->{ for (int i = 0; i < 100; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { arrays.put("item"+i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ for (int i = 0; i < 100; i++) { try { System.out.println(arrays.take()+i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }