zoukankan      html  css  js  c++  java
  • Java Thread系列(十)Future 模式

    Java Thread系列(十)Future 模式

    Future 模式适合在处理很耗时的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量。

    一、Future 模式核心思想

    如下的请求调用过程时序图。当 call 请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的 Future 模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就 是先获得一个假数据,然后执行其他操作。

    Future模式核心思想

    二、Future 实现

    (1) FutureClient

    Client主要完成的功能包括:1. 返回一个 FutureData;2.开启一个线程用于构造 RealData。

    public class FutureClient {
        public FutureData request(final String queryStr) {
            final FutureData future = new FutureData();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    future.request(queryStr);
                }
            }).start();
            return future;
        }
    }
    

    (2) Data

    无论是 FutureData 还是 RealData 都实现该接口。

    public interface Data {
        String getRequest();
    }
    

    (3) FutureData

    FutureData 是 Future 模式的关键,它实际上是真实数据 RealData 的代理,封装了获取 RealData 的等待过程。

    public class FutureData implements Data {
    
        private boolean isReady = false;
        private RealData realData;
    
        public FutureData() {
        }
    
        public synchronized void setRealData(RealData realData) {
            if (isReady) {
                return;
            }
            this.isReady = true;
            this.realData = realData;
            this.notify();
        }
    
        public void request(String queryStr) {
            RealData realData = new RealData(queryStr);
            this.setRealData(realData);
        }
    
        public synchronized String getRequest() {
            if (!isReady) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            return this.realData.getRequest();
        }
    }
    

    (4) RealData

    RealData 是最终需要使用的数据,它的构造函数很慢。

    public class RealData implements Data {
        private String data;
    
        public RealData(String queryStr) {
            this.data = deal(queryStr);
        }
    
        public String deal(String queryStr) {
            try {
                Thread.sleep(1000 * 5);
            } catch (InterruptedException e) {
                ;
            }
            return "这是处理后的结果...";
        }
    
        public String getRequest() {
            return this.data;
        }
    }
    

    (5) 测试

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    FutureClient client = new FutureClient();
    FutureData future = client.request("请求参数");
    
    System.out.println(format.format(new Date()) + "发出请求...");
    Thread.sleep(1000 * 2);
    
    System.out.println(future.getRequest());
    System.out.println(format.format(new Date()) + "收到结果...");
    

    三、Future 模式的 JDK 内置实现

    java.util.concurrent 已经内置了 Future 模式的实现。其中最为重要的是 FutureTask 类,它实现了 Runnable 接口,作为单独的线程运行。在其 run() 方法中,通过 Sync 内部类调用 Callable 接口,并维护 Callable 接口的返回对象。当使用 FutureTask.get() 方法时,将返回 Callable 接口的返回对象。

    下面对 Callable、Future 和 FutureTask 进行浅析

    Callable接口

    我们先回顾一下java.lang.Runnable接口,就声明了run(),其返回值为void,当然就无法获取结果了。

    public interface Runnable {  
        public abstract void run();  
    } 
    

    而Callable的接口定义如下:

    public interface Callable<V> {   
        V call() throws Exception;   
    }    
    

    无论是 Runnable 接口的实现类还是 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 都实现了 ExcutorService 接口,而因此 Callable 需要和 Executor 框架中的 ExcutorService 结合使用,我们先看看 ExecutorService 提供的方法:

    // 返回封装了异步计算结果的Future
    <T> Future<T> submit(Callable<T> task);  
    // 指定调用Future的get方法时返回的result对象
    <T> Future<T> submit(Runnable task, T result); 
    // 返回封装了异步计算结果的Future
    Future<?> submit(Runnable task);
    

    因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。还有点要注意的是,除了我们自己实现Callable对象外,我们还可以使用工厂类Executors来把一个Runnable对象包装成Callable对象。Executors工厂类提供的方法如下:

    public static Callable<Object> callable(Runnable task)  
    public static <T> Callable<T> callable(Runnable task, T result)  
    

    Future接口

    Future接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:

    public interface Future<V> {  
        boolean cancel(boolean mayInterruptIfRunning);  
        boolean isCancelled();  
        boolean isDone();  
        V get() throws InterruptedException, ExecutionException;  
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  
    }
    

    方法解析:

    • V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。

    • V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。

    • boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。

    • boolean isCanceller() :如果任务完成前被取消,则返回true。

    • boolean cancel(boolean mayInterruptRunning) :mayInterruptRunning参数表示是否中断执行中的线程

      1. 如果任务还没开始,执行cancel(...)方法将返回false;

      2. 如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;

      3. 当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;

      4. 当任务已经完成,执行cancel(...)方法将返回false。

    通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。

    但是我们必须明白Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

    FutureTask 深度解析

    我们先来看看FutureTask的实现

    public class FutureTask<V> implements RunnableFuture<V> {  }
    

    FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

    public interface RunnableFuture<V> extends Runnable, Future<V> {  
        void run();  
    }  
    

    FutureTask 有两个很重要的属性,分别是 state runner ,futureTask之所以可以支持cancel操作,就是因为这两个属性

    其中 state为 枚举值:

    • NEW 新建 0
    • COMPLETING 执行中 1
    • NORMAL 正常 2
    • EXCEPTIONAL 异常 3
    • CANCELLED 取消 4
    • INTERRUPTING 中断中 5
    • INTERRUNPED 被中断 6

    state的状态变化可以有四种方式:

    • NEW->COMPLETING->NORMAL 正常完成的流程
    • NEW->COMPLETING->EXCEPTIONAL 出现异常的流程
    • NEW->CANCELED 被取消
    • NEW->INTERRUNPING->INTERRRUNPTED 被中断

    我们研究下Task的状态变化也就是一个任务的生命周期:

    java源代码:

    首先看一下 FutureTask 构造方法:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    

    Task生命周期的变化,主要取决于 run()方法先被调用还是cancel () 方法会被调用,这两个方法的执行顺序决定了Task的生命周期的四种走向。我们先分析run方法先被调用的情况

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // 如果要执行的任务不为空 并且状态 new 就执行
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 有异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            // 不管是否执行成功了,都把runner设置成null 
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    Task执行后如果成功会调用 set() 方法,如果有异常会调用 setException() 方法。

    我们先看下set方法 :

    protected void set(V v) { // (1)
        // 如过state是 NEW 把state设置成 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // 将任务设置成NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    1. 如果现在的状态是 NEW 就把状态设置成 COMPLETING 然后设置成 NORMAL。这个执行流程导致的状态变化就是

      NEW->COMPLETING->NORMAL

      执行步骤是:首先执行 run() 并且Task正常完成而且在这其间没有调用 cancel()

    2. 上边是任务正常执行完成的状态变化,我们在看下有异常的情况。有异常的话会调用setException()方法:

    protected void setException(Throwable t) { // (1)  
        // 如过state是new 把state设置成 COMPLETING 
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
            outcome = t;  
             // 将任务设置成 EXCEPTIONAL   
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  
            finishCompletion();  
        }  
    }
    
    1. 如果现在的状态是 NEW 就把状态设置成 COMPLETING,然后设置成 EXCEPTIONAL。这个执行流程导致的状态变化就是

      NEW->COMPLETING->EXCEPTIONAL

      执行步骤是:首先执行 run() 并且Task抛出异常而且在这其间没有调用cancel()。

    2. 上文所分析的场景只是 run() 方法被调用了,而在run()方法执行的过程中没有调用cancel()

      现在我们分析下cancel()方法先被调用的情况

    public boolean cancel(boolean mayInterruptIfRunning) {  // (1) 
        /** 
         * 如果state不是 NEW 那么就退出方法,这时的任务任务坑是已经完成了,或是被取消了,或是被中断了 
         * 如果是state 是 NEW 就设置 state 为中断状态或是取消状态 
         */  
        if (!(state == NEW &&  
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  // (1) 
            return false;  
        try {    // in case call to interrupt throws exception  
            // 如果是可中断,那么就调用系统中断方法,然后把状态设置成 INTERRUPTED
            // 若 callable 内部有处理线程中断的机制,任务可能会中断
            if (mayInterruptIfRunning) { // (2)  
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  
                }  
            }  
        } finally {  
            finishCompletion();  
        }  
        return true;  
    }  
    
    1. 如果是 cancel(false) ,那么Task的状态变化就是 NEW -> CANCELLED,不会对任务产生影响。

    2. 如果是 cancel(true) ,那么Task的状态化就是 NEW -> INTERRUPTING -> INTERRUPTED,若 callable 内部有处理线程中断的机制,可能会任务产生影响。

    Future之HelloWorld

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    //ExecutorService pool = Executors.newFixedThreadPool(1);
    FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(5 * 1000);
            return new Random().nextInt(100);
        }
    });
    
    System.out.println(format.format(new Date()) + "发出请求...");
    new Thread(future).start();
    //pool.submit(future);
    
    try {
        Integer ret = future.get();
        System.out.println(format.format(new Date()) + "收到结果,ret = " + ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    

    Future之ExecutorService

    使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future,代码如下:

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    ExecutorService pool = Executors.newSingleThreadExecutor();
    
    System.out.println(format.format(new Date()) + "发出请求...");
    Future<Integer> future = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(5 * 1000);
            return new Random().nextInt(100);
        }
    });
    
    try {
        Integer ret = future.get();
        System.out.println(format.format(new Date()) + "收到结果,ret = " + ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    

    代码是不是简化了很多,ExecutorService 继承自 Executor,它的目的是为我们管理Thread对象,从而简化并发编程,Executor使我们无需显示的去管理线程的生命周期,是JDK 5之后启动任务的首选方式。

    Future之多任务

    执行多个带返回值的任务,并取得多个返回值,代码如下:

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    ExecutorService threadPool = Executors.newCachedThreadPool();
    CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool);
    
    System.out.println(format.format(new Date()) + "发出请求...");
    for(int i = 1; i < 5; i++) {
        final int taskID = i;
        cs.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                Thread.sleep(5 * 1000);
                return taskID;
            }
        });
    }
    
    // 可能做一些事情
    for(int i = 1; i < 5; i++) {
        try {
            System.out.println(format.format(new Date()) + ":" + cs.take().get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    hdu 2222 Keywords Search
    Meet and Greet
    hdu 4673
    hdu 4768
    hdu 4747 Mex
    uva 1513 Movie collection
    uva 12299 RMQ with Shifts
    uva 11732 strcmp() Anyone?
    uva 1401
    hdu 1251 统计难题
  • 原文地址:https://www.cnblogs.com/binarylei/p/8999718.html
Copyright © 2011-2022 走看看