zoukankan      html  css  js  c++  java
  • Java Thread系列(十)Future 模式

    Java Thread系列(十)Future 模式

    Future 模式适合在处理很耗时的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量。

    一、Future 模式核心思想

    如下的请求调用过程时序图。当 call 请求发出时,需要很长的时间才能返回。左边的图需要一直等待,等返回数据后才能继续其他操作;而右边的 Future 模式的图中客户端则无需等到可以做其他的事情。服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就 是先获得一个假数据,然后执行其他操作。

    Future模式核心思想

    二、Future 实现

    (1) FutureClient

    Client主要完成的功能包括:1. 返回一个 FutureData;2.开启一个线程用于构造 RealData。

    public class FutureClient {
        public FutureData request(final String queryStr) {
            final FutureData future = new FutureData();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    future.request(queryStr);
                }
            }).start();
            return future;
        }
    }
    

    (2) Data

    无论是 FutureData 还是 RealData 都实现该接口。

    public interface Data {
        String getRequest();
    }
    

    (3) FutureData

    FutureData 是 Future 模式的关键,它实际上是真实数据 RealData 的代理,封装了获取 RealData 的等待过程。

    public class FutureData implements Data {
    
        private boolean isReady = false;
        private RealData realData;
    
        public FutureData() {
        }
    
        public synchronized void setRealData(RealData realData) {
            if (isReady) {
                return;
            }
            this.isReady = true;
            this.realData = realData;
            this.notify();
        }
    
        public void request(String queryStr) {
            RealData realData = new RealData(queryStr);
            this.setRealData(realData);
        }
    
        public synchronized String getRequest() {
            if (!isReady) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            return this.realData.getRequest();
        }
    }
    

    (4) RealData

    RealData 是最终需要使用的数据,它的构造函数很慢。

    public class RealData implements Data {
        private String data;
    
        public RealData(String queryStr) {
            this.data = deal(queryStr);
        }
    
        public String deal(String queryStr) {
            try {
                Thread.sleep(1000 * 5);
            } catch (InterruptedException e) {
                ;
            }
            return "这是处理后的结果...";
        }
    
        public String getRequest() {
            return this.data;
        }
    }
    

    (5) 测试

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    FutureClient client = new FutureClient();
    FutureData future = client.request("请求参数");
    
    System.out.println(format.format(new Date()) + "发出请求...");
    Thread.sleep(1000 * 2);
    
    System.out.println(future.getRequest());
    System.out.println(format.format(new Date()) + "收到结果...");
    

    三、Future 模式的 JDK 内置实现

    java.util.concurrent 已经内置了 Future 模式的实现。其中最为重要的是 FutureTask 类,它实现了 Runnable 接口,作为单独的线程运行。在其 run() 方法中,通过 Sync 内部类调用 Callable 接口,并维护 Callable 接口的返回对象。当使用 FutureTask.get() 方法时,将返回 Callable 接口的返回对象。

    下面对 Callable、Future 和 FutureTask 进行浅析

    Callable接口

    我们先回顾一下java.lang.Runnable接口,就声明了run(),其返回值为void,当然就无法获取结果了。

    public interface Runnable {  
        public abstract void run();  
    } 
    

    而Callable的接口定义如下:

    public interface Callable<V> {   
        V call() throws Exception;   
    }    
    

    无论是 Runnable 接口的实现类还是 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 都实现了 ExcutorService 接口,而因此 Callable 需要和 Executor 框架中的 ExcutorService 结合使用,我们先看看 ExecutorService 提供的方法:

    // 返回封装了异步计算结果的Future
    <T> Future<T> submit(Callable<T> task);  
    // 指定调用Future的get方法时返回的result对象
    <T> Future<T> submit(Runnable task, T result); 
    // 返回封装了异步计算结果的Future
    Future<?> submit(Runnable task);
    

    因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。还有点要注意的是,除了我们自己实现Callable对象外,我们还可以使用工厂类Executors来把一个Runnable对象包装成Callable对象。Executors工厂类提供的方法如下:

    public static Callable<Object> callable(Runnable task)  
    public static <T> Callable<T> callable(Runnable task, T result)  
    

    Future接口

    Future接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:

    public interface Future<V> {  
        boolean cancel(boolean mayInterruptIfRunning);  
        boolean isCancelled();  
        boolean isDone();  
        V get() throws InterruptedException, ExecutionException;  
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  
    }
    

    方法解析:

    • V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。

    • V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。

    • boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。

    • boolean isCanceller() :如果任务完成前被取消,则返回true。

    • boolean cancel(boolean mayInterruptRunning) :mayInterruptRunning参数表示是否中断执行中的线程

      1. 如果任务还没开始,执行cancel(...)方法将返回false;

      2. 如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;

      3. 当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;

      4. 当任务已经完成,执行cancel(...)方法将返回false。

    通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。

    但是我们必须明白Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

    FutureTask 深度解析

    我们先来看看FutureTask的实现

    public class FutureTask<V> implements RunnableFuture<V> {  }
    

    FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

    public interface RunnableFuture<V> extends Runnable, Future<V> {  
        void run();  
    }  
    

    FutureTask 有两个很重要的属性,分别是 state runner ,futureTask之所以可以支持cancel操作,就是因为这两个属性

    其中 state为 枚举值:

    • NEW 新建 0
    • COMPLETING 执行中 1
    • NORMAL 正常 2
    • EXCEPTIONAL 异常 3
    • CANCELLED 取消 4
    • INTERRUPTING 中断中 5
    • INTERRUNPED 被中断 6

    state的状态变化可以有四种方式:

    • NEW->COMPLETING->NORMAL 正常完成的流程
    • NEW->COMPLETING->EXCEPTIONAL 出现异常的流程
    • NEW->CANCELED 被取消
    • NEW->INTERRUNPING->INTERRRUNPTED 被中断

    我们研究下Task的状态变化也就是一个任务的生命周期:

    java源代码:

    首先看一下 FutureTask 构造方法:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    

    Task生命周期的变化,主要取决于 run()方法先被调用还是cancel () 方法会被调用,这两个方法的执行顺序决定了Task的生命周期的四种走向。我们先分析run方法先被调用的情况

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // 如果要执行的任务不为空 并且状态 new 就执行
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 有异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            // 不管是否执行成功了,都把runner设置成null 
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    Task执行后如果成功会调用 set() 方法,如果有异常会调用 setException() 方法。

    我们先看下set方法 :

    protected void set(V v) { // (1)
        // 如过state是 NEW 把state设置成 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // 将任务设置成NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    1. 如果现在的状态是 NEW 就把状态设置成 COMPLETING 然后设置成 NORMAL。这个执行流程导致的状态变化就是

      NEW->COMPLETING->NORMAL

      执行步骤是:首先执行 run() 并且Task正常完成而且在这其间没有调用 cancel()

    2. 上边是任务正常执行完成的状态变化,我们在看下有异常的情况。有异常的话会调用setException()方法:

    protected void setException(Throwable t) { // (1)  
        // 如过state是new 把state设置成 COMPLETING 
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
            outcome = t;  
             // 将任务设置成 EXCEPTIONAL   
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  
            finishCompletion();  
        }  
    }
    
    1. 如果现在的状态是 NEW 就把状态设置成 COMPLETING,然后设置成 EXCEPTIONAL。这个执行流程导致的状态变化就是

      NEW->COMPLETING->EXCEPTIONAL

      执行步骤是:首先执行 run() 并且Task抛出异常而且在这其间没有调用cancel()。

    2. 上文所分析的场景只是 run() 方法被调用了,而在run()方法执行的过程中没有调用cancel()

      现在我们分析下cancel()方法先被调用的情况

    public boolean cancel(boolean mayInterruptIfRunning) {  // (1) 
        /** 
         * 如果state不是 NEW 那么就退出方法,这时的任务任务坑是已经完成了,或是被取消了,或是被中断了 
         * 如果是state 是 NEW 就设置 state 为中断状态或是取消状态 
         */  
        if (!(state == NEW &&  
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  // (1) 
            return false;  
        try {    // in case call to interrupt throws exception  
            // 如果是可中断,那么就调用系统中断方法,然后把状态设置成 INTERRUPTED
            // 若 callable 内部有处理线程中断的机制,任务可能会中断
            if (mayInterruptIfRunning) { // (2)  
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  
                }  
            }  
        } finally {  
            finishCompletion();  
        }  
        return true;  
    }  
    
    1. 如果是 cancel(false) ,那么Task的状态变化就是 NEW -> CANCELLED,不会对任务产生影响。

    2. 如果是 cancel(true) ,那么Task的状态化就是 NEW -> INTERRUPTING -> INTERRUPTED,若 callable 内部有处理线程中断的机制,可能会任务产生影响。

    Future之HelloWorld

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    //ExecutorService pool = Executors.newFixedThreadPool(1);
    FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(5 * 1000);
            return new Random().nextInt(100);
        }
    });
    
    System.out.println(format.format(new Date()) + "发出请求...");
    new Thread(future).start();
    //pool.submit(future);
    
    try {
        Integer ret = future.get();
        System.out.println(format.format(new Date()) + "收到结果,ret = " + ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    

    Future之ExecutorService

    使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future,代码如下:

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    ExecutorService pool = Executors.newSingleThreadExecutor();
    
    System.out.println(format.format(new Date()) + "发出请求...");
    Future<Integer> future = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(5 * 1000);
            return new Random().nextInt(100);
        }
    });
    
    try {
        Integer ret = future.get();
        System.out.println(format.format(new Date()) + "收到结果,ret = " + ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    

    代码是不是简化了很多,ExecutorService 继承自 Executor,它的目的是为我们管理Thread对象,从而简化并发编程,Executor使我们无需显示的去管理线程的生命周期,是JDK 5之后启动任务的首选方式。

    Future之多任务

    执行多个带返回值的任务,并取得多个返回值,代码如下:

    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    ExecutorService threadPool = Executors.newCachedThreadPool();
    CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool);
    
    System.out.println(format.format(new Date()) + "发出请求...");
    for(int i = 1; i < 5; i++) {
        final int taskID = i;
        cs.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                Thread.sleep(5 * 1000);
                return taskID;
            }
        });
    }
    
    // 可能做一些事情
    for(int i = 1; i < 5; i++) {
        try {
            System.out.println(format.format(new Date()) + ":" + cs.take().get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    python入门:字符编码
    python入门:字符串2
    使用keepalived实现高可用
    基于sersync实现实时同步
    kubeadm 的工作原理
    docker-stop不能停止容器
    kubernetes 中的证书工作机制
    docker-hub中python的tag都代表什么意思
    MFS 介绍
    安装sngrep线路抓包工具
  • 原文地址:https://www.cnblogs.com/binarylei/p/8999718.html
Copyright © 2011-2022 走看看