zoukankan      html  css  js  c++  java
  • 多线程(10) — Future模式

      Future模式是多线程开发中常用常见的一种设计模式,它的核心思想是异步调用。在调用一个函数方法时候,如果函数执行很慢,我们就要进行等待,但这时我们可能不着急要结果,因此我们可以让被调者立即返回,让它在后台慢慢处理这个请求,对于调用者来说可以先处理一些其他事物,在真正需要数据的场合再去尝试获得需要的数据。对于Future模式来说,虽然它无法立即给出你需要的数据,但是它们返回一个契约给你,将来你可以凭借这个契约去重新获取你需要的信息。主要的角色有:

    • Main:系统启动,调用Client发出请求。
    • Client:返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
    • Data:返回数据的接口。
    • FutureData:Future数据构造很快,但是是一个虚拟的数据,需要装配RealData
    • RealData:真实数据,其构造是比较慢的

      Future模式简单实现:一个核心接口Data,就是客户端希望获取的数据,在Future模式中,这个Data接口有两个重要的实现,一个是RealData,是真实数据,就是最终要获得的信息。另外一个是FutureData,它是用来提取RealData的一个订单,因此FutureData可以立即返回。

    public interface Data {
        public String getResult();
    }

      FutureData实现了一个快速返回的RealData包装,只是一个包装,或者说是虚拟实现,它可以很快构造并返回。当使用FutureData的getResult()方法,如果实际数据没准备好程序会阻塞,等RealData准备好并注入FutureData才最终返回数据。

    注意:FutureData是Future模式的关键,实际上是真实数据RealData的代理,封装了获取RealData的等待过程。

    public class FutureData implements Data {
    
        protected RealData realdata = null;
        protected boolean isReady = false;
        
        public synchronized void setRealData(RealData realdata){
            if(isReady){
                return;
            }
            this.realdata = realdata;
            isReady = true;
            notifyAll();
        }
        
        @Override
        public synchronized String getResult() {
            while(!isReady){
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            return realdata.result;
        }
    }

      RealData是最终需要使用的数据模型,它的构造很慢,用sleep()函数模拟这个过程,简单地模拟一个字符串的构造。

    public class RealData implements Data {
    
        protected final String result;
        public RealData(String para){
            // RealData的构造可能很慢,需要用户等待很久,这里用sleep模拟
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < 10; i++) {
                sb.append(para);
                try {
                    // 这里使用sleep代替一个很慢的操作过程
                    Thread.sleep(100);
                } catch (InterruptedException e) {}
            }
            result = sb.toString();
        }
        @Override
        public String getResult() {
            return result;
        }
    }

      接下来就是客户端程序,Client主要实现了获取FutureData,并开启构造RealData的线程,并在接受请求后,很快返回FutureData。注意,它不会等待数据真的构造完毕再返回,而是立即返回FutureData,即使这个时候FutureData内没有真实数据。

    public class Client {
        public Data request(final String queryStr){
            final FutureData future = new FutureData();
            new Thread(){
    
                @Override
                public void run() {
                    RealData realdata = new RealData(queryStr);
                    future.setRealData(realdata);
                }
                
            }.start();
            return future;
        }
    }
    public static void main(String[] args) {
        Client client = new Client();
        // 这里立即返回,得到的是Future而不是RealData
        Data data = client.request("Hello");
        System.out.println("请求完成");
        try {
            Thread.sleep(2000);// 这个过程代替RealData被创建的过程,也就是自己的业务处理过程
        } catch (InterruptedException e) {}
        // 使用真实的数据
        System.out.println("数据 = "+data.getResult());
    }

    JDK中的Future模式

      JDK中,Future接口类似于模式中的契约,通过它,可以得到真实的数据。RunnableFuture继承了Future和Runnable俩接口,其中run()方法用于构造真实的数据,它有一个具体的实现FutureTask类,这个实现类有个内部类Sync,一些实质性的工作会委托Sync实现,而Sync类最终会调用Callable接口,完成实际数据的组装工作。Callable接口只有一个方法call(),它会返回需要构造的实际数据。这个Callable接口也是Future框架和应用程序之间的重要接口。要实现自己的业务系统,通常需要实现自己的Callable对象,此外,FutureTask类也是与应用密切相关,通常可以使用Callable实例构造一个FutureTask实例,并将它交给线程池。下面来举个使用的例子:

    public class RealData implements Callable<String>{
        private String para;
        public RealDatajdk(String para){
            this.para = para;
        }
        
        @Override
        public String call() throws Exception {
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < 10; i++) {
                sb.append(para);
                Thread.sleep(100);
            }
            return sb.toString();
        }
    }

      上述代码实现了Callable接口,它的call()方法会构造我们需要的真实数据并返回,当然这个过程可能是缓慢的,这里使用Thread.sleep()方法模拟它。

    public class FutureMain {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            FutureTask<String> future = new FutureTask<String>(new RealData("a"));// 构造FutureTask
            ExecutorService executor = Executors.newFixedThreadPool(1);
            // 执行FutureTask,相当于上例中client.request("a")发送请求
            // 在这里开启线程进行RealData的call()方法执行
            executor.submit(future);
            System.out.println("请求完毕");
            try {
                // 这里可以做额外的数据操作,使用sleep代替其他业务逻辑处理
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            // 取得call()方法的返回值,如果call()方法还没有执行完,就会等待
            System.out.println("数据 = "+future.get());
        }
    }

    除基本的功能外JDK还为Future接口提供了一些简单的控制方法

    boolean cancel(boolean mayInterruptIfRunning);           // 取消任务
    boolean isCancelled();                                   // 是否已经取消任务
    boolean isDone();                                        // 是否已完成
    V get() throws InterruptedException, ExecutionException; // 取得返回对象
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;// 取得返回对象,可以设置超时时间

    Guava对Future模式的支持

      JDK自带的简单Future模式,可以使用get()方法得到处理结果,但是这个方法是阻塞的,因此不利于高并发应用。在Guava中增强了Future模式,增加了对模式完成时的回调接口,使得Future完成时可以自动通知应用程序进行后续处理。使用Guava改写上一节中的FutureMain可以得到更好的效果。

    public class FutureDemo {
        public static void main(String[] args) throws InterruptedException{
            ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
            ListenableFuture<String> task = service.submit(new RealData("x"));
            task.addListener(new Runnable(){
                @Override
                public void run() {
                    System.out.print("异步处理成功:");
                    try {
                        System.out.println(task.get());
                    } catch (Exception e) {}
                }
            },MoreExecutors.directExecutor());
            System.out.println("main task done......");
            Thread.sleep(3000);
        }
    }

      MoreExecutors.listeningDecorator()方法将一个普通的线程池包装为一个包含通知功能的Future线程池。第5行将Callable任务提交到线程池中,并得到一个ListenableFuture。与Future相比,ListenableFuture拥有完成时的通知功能,addListener向ListenableFuture中添加回调函数,即当Future执行完成后,执行addListener中第一个参数中代码

  • 相关阅读:
    前端开发者也可以酷酷地开发桌面程序
    手把手教你怎么搭建angular+gulp的项目(一)
    在Angular中,如果权限值是异步请求所得,如何将其设置为HTTP请求头的Authorization?
    AngularJs ng-repeat指令中怎么实现含有自定义指令的动态html
    第一篇随笔,练练手
    我参与 Seata 开源项目的一些感悟
    一次 kafka 消息堆积问题排查
    图解 Kafka 水印备份机制
    Seata 动态配置订阅与降级实现原理
    记一次 Kafka 集群线上扩容
  • 原文地址:https://www.cnblogs.com/wangyongwen/p/11335334.html
Copyright © 2011-2022 走看看