zoukankan      html  css  js  c++  java
  • 并发编程 05—— Callable和Future

    第1部分 Callable

    第2部分 Future

    第3部分 示例和源码分析

      3.1 submit()

      3.2 FutureTask的构造函数

      3.3 FutureTask的run()方法

    第4部分 实例——JAVA并行异步编程线程池+FutureTask

    第5部分 Callable和Future 区别

    参考

    Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。

    第1部分 Callable

      Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。

    为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。

    Callable的源码如下:

    public interface Callable<V> {
        V call() throws Exception;
    }

      Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常。

    Executors 类包含一些从其他普通形式转换成 Callable 类的实用方法。

    第2部分 Future

      Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

    Future的源码如下:

     boolean cancel(boolean mayInterruptIfRunning) 
              试图取消对此任务的执行。 
     V get() 
              如有必要,等待计算完成,然后获取其结果。 
     V get(long timeout, TimeUnit unit) 
              如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 
     boolean isCancelled() 
              如果在任务正常完成前将其取消,则返回 trueboolean isDone() 
              如果任务已完成,则返回 true

    在讲解FutureTask之前,先看看Callable, Future, FutureTask它们之间的关系图,如下:

      Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成以后,它就永远停留在“完成”状态上。

      get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得封装的初始异常。

    异常抛出: 
    CancellationException - 如果计算被取消 
    ExecutionException - 如果计算抛出异常 
    InterruptedException - 如果当前的线程在等待时被中断 
    TimeoutException - 如果等待超时

      可以通过很多种方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable 或 Callable 提交给Executor ,并得到一个Future 用来获得任务的执行结果或者取消任务。 还可以显示地为某个任务指定的Runnable或Callable 实例化一个FutureTask。

    说明
    (01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }

    (02) FutureTask实现了RunnableFuture接口。所以,也说它实现了Future接口。

    第3部分 示例和源码分析

    先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。

     1 package com.concurrency.TaskExecution_6;
     2 
     3 import java.util.concurrent.Callable;
     4 import java.util.concurrent.ExecutionException;
     5 import java.util.concurrent.ExecutorService;
     6 import java.util.concurrent.Executors;
     7 import java.util.concurrent.Future;
     8 
     9 /**
    10  * Callable 和 Future实现线程等待
    11  * @ClassName: CallableFutureTest
    12  * @author Xingle
    13  * @date 2014-9-15 下午3:23:30
    14  */
    15 public class CallableFutureTest {
    16     
    17     public static void main(String[] args) throws InterruptedException, ExecutionException{
    18         System.out.println("start main thread ");
    19         ExecutorService exec = Executors.newFixedThreadPool(5);
    20         
    21         //新建一个Callable 任务,并将其提交到一个ExecutorService. 将返回一个描述任务情况的Future.
    22         Callable<String> call = new Callable<String>() {
    23 
    24             @Override
    25             public String call() throws Exception {
    26                 System.out.println("start new thread ");
    27                 Thread.sleep(5000);
    28                 System.out.println("end new thread ");
    29                 return "返回内容";
    30             }
    31         };
    32         
    33         Future<String> task = exec.submit(call);
    34         Thread.sleep(1000);
    35         String retn = task.get();
    36         //关闭线程池
    37         exec.shutdown();
    38         System.out.println(retn+"--end main thread");
    39     }
    40 
    41 }

    执行结果:

    3.1 submit()

    submit()在java/util/concurrent/AbstractExecutorService.java中实现,它的源码如下:

    1 public <T> Future<T> submit(Callable<T> task) {
    2     if (task == null) throw new NullPointerException();
    3     // 创建一个RunnableFuture对象
    4     RunnableFuture<T> ftask = newTaskFor(task);
    5     // 执行“任务ftask”
    6     execute(ftask);
    7     // 返回“ftask”
    8     return ftask;
    9 }

    说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    3.2 FutureTask的构造函数

    FutureTask的构造函数如下:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        // callable是一个Callable对象
        this.callable = callable;
        // state记录FutureTask的状态
        this.state = NEW;       // ensure visibility of callable
    }

    3.3 FutureTask的run()方法

    继续回到submit()的源码中。
    在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实现,源码如下:

     1 public void run() {
     2     if (state != NEW ||
     3         !UNSAFE.compareAndSwapObject(this, runnerOffset,
     4                                      null, Thread.currentThread()))
     5         return;
     6     try {
     7         // 将callable对象赋值给c。
     8         Callable<V> c = callable;
     9         if (c != null && state == NEW) {
    10             V result;
    11             boolean ran;
    12             try {
    13                 // 执行Callable的call()方法,并保存结果到result中。
    14                 result = c.call();
    15                 ran = true;
    16             } catch (Throwable ex) {
    17                 result = null;
    18                 ran = false;
    19                 setException(ex);
    20             }
    21             // 如果运行成功,则将result保存
    22             if (ran)
    23                 set(result);
    24         }
    25     } finally {
    26         runner = null;
    27         // 设置“state状态标记”
    28         int s = state;
    29         if (s >= INTERRUPTING)
    30             handlePossibleCancellationInterrupt(s);
    31     }
    32 }

    说明:run()中会执行Callable对象的call()方法,并且最终将结果保存到result中,并通过set(result)将result保存。
          之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值。

    第4部分 实例——JAVA并行异步编程线程池+FutureTask

      通过上面的介绍,Callable 和 Future组合使用,可以获取线程的执行结果,实际项目中遇到一个问题,查询数据库的时候,有个执行任务返回的数据非常大,几乎是全部用户群的数据,所以非常耗时,后续处理逻辑要获取到所有返回的数据。这里考虑在查询数据的时候,加上行号,利用线程池多个任务同时查从起始行到结束行的数据,类似分页处理,最后将返回的数据合在一起。

     1 @Override
     2     public List<BuyerInfoVo> testQuery(final String eticketActId) {
     3         
     4         int count = grantEticketActDao.testQueryCount(eticketActId);
     5         final int pageSize = 2000;
     6         final int totalPage = count / pageSize+1;
     7 
     8         long start = System.currentTimeMillis();
     9         // 进行异步任务列表
    10         List<FutureTask<List<BuyerInfoVo>>> futureTasks = new ArrayList<FutureTask<List<BuyerInfoVo>>>();
    11         // 线程池 初始化30个线程 
    12         ExecutorService executorService = Executors.newFixedThreadPool(30);
    13 
    14         Callable<List<BuyerInfoVo>> callable = new Callable<List<BuyerInfoVo>>() {
    15             @Override
    16             public List<BuyerInfoVo> call() throws Exception {
    17                 // 执行任务
    18                 List<BuyerInfoVo> ls =  grantEticketActDao.testQueryBySize(eticketActId,pageSize,totalPage);
    19                 return ls;
    20             }
    21         };
    22 
    23         List<BuyerInfoVo> reLs = new ArrayList<BuyerInfoVo>();
    24         for (int i = 0; i < totalPage; i++) {
    25             // 创建一个异步任务
    26             FutureTask<List<BuyerInfoVo>> futureTask = new FutureTask<List<BuyerInfoVo>>(
    27                     callable);
    28             futureTasks.add(futureTask);
    29             executorService.submit(futureTask);
    30         }
    31 
    32         for (FutureTask<List<BuyerInfoVo>> futureTask : futureTasks) {
    33             try {
    34                 List<BuyerInfoVo> ls = futureTask.get();
    35                 if(null!=ls)
    36                     reLs.addAll(ls);
    37             } catch (InterruptedException | ExecutionException e) {
    38                  throw new RuntimeException(e);
    39             }
    40         }
    41 
    42         // 清理线程池
    43         executorService.shutdown();
    44 
    45         long end = System.currentTimeMillis();        
    46         
    47         System.out.println("数据库查询记录总数:"+count);
    48         System.out.println("实际返回数据条数:  " + reLs.size());
    49         System.out.println("一共使用时间:"+(end-start)/1000+"s");    
    50         if(reLs.size()!=count){
    51             throw new RuntimeException();
    52         }
    53         GrantEticketActServiceImpl.queryId = 0;
    54         return reLs;
    55     }

    其中,执行任务查询的代码:

     1 public List<BuyerInfoVo> testQueryBySize(String eticketActId, int pageSize,
     2             int totalPage) {
     3         int start ;
     4         int end;
     5         synchronized (eticketActId) {
     6             start = (GrantEticketActServiceImpl.queryId)*pageSize+1;
     7             end = (GrantEticketActServiceImpl.queryId+1)*pageSize;
     8             GrantEticketActServiceImpl.queryId++;
     9         }
    10         StringBuffer sb = new StringBuffer();
    11         //查询语句省略
    12         String querysql=" ";
    13         sb.append(" SELECT * FROM (SELECT ROWNUM row_, t.* FROM (");
    14         sb.append(querysql);
    15         sb.append(") t ) WHERE row_ <=");
    16         sb.append(end);
    17         sb.append(" AND row_ >=");
    18         sb.append(start);
    19         String sql = sb.toString();
    20         
    21         Object[]  args = new Object[]{eticketActId };
    22         List<BuyerInfoVo> list = jdbcTemplate.query(sql, args, new RowMapper<BuyerInfoVo>(){
    23             @Override
    24             public BuyerInfoVo mapRow(ResultSet rs, int i)
    25                     throws SQLException {
    26                 BuyerInfoVo vo = new BuyerInfoVo();
    27                 AutoInjection.Rs2Vo(rs, vo, null);
    28                 return vo;
    29             }
    30         });
    31         
    32         return list;
    33     }

    执行结果说明,原先优化之前,单次执行返回时间需要4min,优化后返回只需15s,效果非常明显,涉及的参数,单次查询条数,以及线程池的配置大小,还有待继续深入。

     

    第5部分 Callable和Future 区别

    Callable定义的方法是call,而Runnable定义的方法是run。
    Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
    Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。 


    参考:

    1.《java 并发编程实战》 第六章 任务执行

    2.java并发编程-Executor框架

    3.Java线程(七):Callable和Future

    4.JAVA并行异步编程线程池+FutureTask

    5.Callable vs Runnable - Runners间的“争论”

    项目中代码片段:

    /**
         * @Description:获取所有有效产品列表
         * @return
         * @Return:List<SupplyAreaProVo>
         * @Author:
         * @Date:2016年5月9日 下午12:33:49
         */
        private List<SupplyAreaProVo> getTMallProductIndexAll(){
            Util.i = 0;
            final int total = 10;
            List<FutureTask<List<SupplyAreaProVo>>> futureTasks = new ArrayList<FutureTask<List<SupplyAreaProVo>>>();
            // 线程池 初始化20个线程
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            List<SupplyAreaProVo> list=new ArrayList<SupplyAreaProVo>();
            for (int index = 0; index < total; index++) {
                final int cnt = index;
                Callable<List<SupplyAreaProVo>> callable = new Callable<List<SupplyAreaProVo>>() {
                    @Override
                    public List<SupplyAreaProVo> call() throws Exception {
                        // 执行任务
                        List<SupplyAreaProVo> list= searchTLMallDao.getTMallProductList(total,cnt);
                        return list;
                    }
                };
                // 创建一个异步任务
                FutureTask<List<SupplyAreaProVo>> futureTask = new FutureTask<List<SupplyAreaProVo>>(callable);
                futureTasks.add(futureTask);
                executorService.submit(futureTask);
            }
            
            for (FutureTask<List<SupplyAreaProVo>> futureTask : futureTasks) {          
                    try {
                        
                        List<SupplyAreaProVo> ls = futureTask.get();
                        if(null!=ls){
                            list.addAll(ls);
                        }     
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                
            }
            executorService.shutdown();
            return list;
        }
  • 相关阅读:
    poj 2485 (kruskal)
    poj 1258
    poj 2253 (dijkstra 变形)
    poj 2485 (prim)
    poj 1125 (floyd)
    poj 2240 Arbitrage (floyd 变形)
    poj 3020 Antenna Placement(二分图+最小路径覆盖)
    poj 3020 Antenna Placement(二分图+最小路径覆盖)
    poj 3278 Catch That Cow (bfs 搜索)
    poj 2049 Finding Nemo(搜索)
  • 原文地址:https://www.cnblogs.com/xingele0917/p/3973140.html
Copyright © 2011-2022 走看看