zoukankan      html  css  js  c++  java
  • 并发队列

    一.Callable的Future模式
       线程实现方式:
        1.继承Thread类
        2.实现Runnable接口
        3.线程池
        4.Callable
       
       无论使用继承Thread类还是实现Runnable接口,还是使用线程池都没有办法解决2个问题
        1.线程执行没有返回值结果
        2.线程执行没有办法抛出异常,只能自己通过try-catch解决
       
       Callable和Runnable类似,在JUC包下,主要区别在于Callable中的call方法可以带返回值并且可以抛出异常
       如果需要执行Callable,需要Future实现类的支持,能够接受返回值结果,FutureTask是Future实现类
       
       调用Callable的第一种实现方案:
        public class MyCallable implements Callable<String> {
         @Override
         public String call() throws Exception {
          System.out.println("Callable接口中重写的Call方法,可以有返回值并且抛出异常");
          return "callable";
         }
         public static void main(String[] args) throws ExecutionException, InterruptedException {
          MyCallable myCallable=new MyCallable();
          //利用FutureTask执行Callable并且接受结果
          FutureTask<String> stringFutureTask = new FutureTask<>(myCallable);
          //利用线程执行Task任务
          new Thread(stringFutureTask).start();
          //接受结果FutureTask.get会发生阻塞情况
          System.out.println(stringFutureTask.get());
          System.out.println("MyCallable执行完毕,返回值结果正确接收~");
         }
        }
       
       
       调用Callable的第二种实现方案:
        public static void main(String[] args) throws ExecutionException, InterruptedException {
         MyCallable myCallable=new MyCallable();
         //创建一个线程池
         ExecutorService executorService = Executors.newFixedThreadPool(3);
         //创建线程执行任务,接受任务结果
         Future<String> future = executorService.submit(myCallable);
         //接受返回值
         System.out.println(future.get(2000,TimeUnit.MILLISECONDS));
         System.out.println("方式二,线程池:MyCallable执行完毕,返回值结果正确接收~");
         //停止线程池
         executorService.shutdown();
        }
      
       Future.get()方法获取任务执行结果,该方法如果没有返回时,暂时处于阻塞状态
       Future.get(Long timeOut,TimeUnit timeUnit)可以设置超时时间
       Future.boolean isDone()如果线程结束,无论是正常结束还是任务终止都会返回true
       Future.boolean isCanceller()如果任务完成前被取消则返回true
       Future.boolean cancel(boolean flag),方法参数如果传入为true代表中断任务,如果任务中断成功,则返回值为true,如果失败则为false
       
       Future提供三种功能:1.中断任务cancel(true)        2.判断任务是否执行完成isDone()        3.获取任务执行后的结果get()
        //中断任务
        boolean cancel = future.cancel(true);
        if(cancel){
         System.out.println("中断任务成功~");
        }else{
         //接受返回值
         System.out.println(future.get(2000,TimeUnit.MILLISECONDS));
        }
       如果让手写Future模式应该怎么样定义?
        wait负责阻塞和notify负责唤起阻塞线程
        
        public class MyFuture {
         //FLAG相当于数据标识,如果放入数据成功,则返回为true,否则返回为false
         private static boolean FLAG=false;
         private String data;
         public synchronized void setData(String data) throws InterruptedException {
          Thread.sleep(2000);
          //赋值操作
          this.data = data;
          FLAG=true;
          //唤起
          notify();
         }
         public synchronized String getData() {
          //如果获取数据失败
          if(!FLAG){
           try {
            wait();
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
          }
          return data;
         }
         public static void main(String[] args) {
          MyFuture future=new MyFuture();
          new Thread(()->{
           try {
            future.setData("张三");
            System.out.println(future.getData());
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
          }).start();
         }
        }
       
       
       
       
      二.并发队列Queue,队列其实就是一个容器
       1.同步容器
        Vector容器,HashTable容器,都是线程安全
        如果同步容器使用foreach迭代过程中修改了元素的值,则会出现ConcurrentModificationException异常
        可以使用iterator迭代器解决,但是在多线程并行情况下,修改容器中数据,会发生阻塞或者报NoSech异常
       1.并发容器,队列
        无界限:代表队列当中可以存放N个数据,没有长度限制
        有界限:队列当中规定只能存放多少个数据,超过则阻塞 
       
       2.ConcurrentLinkedQueue,无界限的队列,可以采用add()方法(底层调用offer())或者offer方法将数据存放到队列当中,通过peek和poll方法获取队列头数据
        peek()方法获取数据,但是该数据没有出列
        poll()方法获取数据,完成后该数据出列
        
        peek和poll当队列当中没有数据时,获取的数据为null,不会产生阻塞
        
       
        public static void main(String[] args) {
         //准备队列
         ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
         //存放数据
         queue.offer("张三");
         queue.offer("李四");
         queue.offer("王五");
         //获取队列中数据个数
         System.out.println("队列中当前有:"+queue.size()+"个数据~");
         //获取队列中头数据  poll()方法相当于消费了队列中的数据,队列数据会随之删除
         System.out.println("获取队列中的数据:"+queue.poll());
         System.out.println("队列中当前有:"+queue.size()+"个数据~");
         //获取队列中数据,但是不会删除
         System.out.println("获取队列中的数据:"+queue.peek());
         System.out.println("获取队列中的数据:"+queue.peek());
         System.out.println("队列中当前有:"+queue.size()+"个数据~");
        }
       
       3.有边界的阻塞队列BlockingQueue
        put方法和take会发生阻塞,add以及offer还有poll和peek不会发生阻塞
       
       
        3.1 ArrayBlockingQueue:当队列没有数据时,获取时为null,当队列满时,会报异常或者入队失败
        
         public static void main(String[] args) throws InterruptedException {
          //ArrayBlockingQueue底层数组实现
          ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);
          arrays.add("张三");
          arrays.add("李四");
          arrays.add("王五");
          System.out.println(arrays.poll());
          arrays.offer("赵六",1000, TimeUnit.MILLISECONDS);
          System.out.println(arrays.poll());
          System.out.println(arrays.poll());
          System.out.println(arrays.poll());
         }
       
       4.LinkedBlockingQueue 初始可以指定队列大小,如果不指定则按照Integer.MaxValue值进行设定
        public static void main(String[] args) throws InterruptedException {
         //ArrayBlockingQueue底层数组实现
         LinkedBlockingQueue <String> arrays = new LinkedBlockingQueue<String>(100);
         new Thread(()->{
          for (int i = 0; i < 100; i++) {
           try {
            Thread.sleep(1000);
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
           try {
            arrays.put("item"+i);
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
          }
         }).start();
         new Thread(()->{
          for (int i = 0; i < 100; i++) {
           try {
            System.out.println(arrays.take()+i);
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
          }
         }).start();
        }
       
  • 相关阅读:
    intellij IDEA启动springboot项目报无效的源发行版错误解决方法
    JDBC调用oracle 存储过程
    Java自定义注解学习
    [Python3网络爬虫开发实战] 1.7.1-Charles的安装
    [Python3网络爬虫开发实战] 1.6.1-Flask的安装
    [Python3网络爬虫开发实战] 1.6.2-Tornado的安装
    [Python3网络爬虫开发实战] 1.5.4-RedisDump的安装
    [Python3网络爬虫开发实战] 1.5.3-redis-py的安装
    [Python3网络爬虫开发实战] 1.5.2-PyMongo的安装
    [Python3网络爬虫开发实战] 1.4.3-Redis的安装
  • 原文地址:https://www.cnblogs.com/mayuan01/p/12555248.html
Copyright © 2011-2022 走看看