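The core idea of the Future design pattern is to move business logic from serial execution to asynchronous execution, and from there to asynchronous execution with callbacks.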
Example 1:
package com.dwz.concurrency2.chapter10;

public class SyncInvoker {
    public static void main(String[] args) throws InterruptedException {
        // The main thread blocks here until get() returns.
        String result = get();
        System.out.println(result);
        System.out.println("I am done.");
    }

    // Simulates a slow business call that takes 10 seconds.
    private static String get() throws InterruptedException {
        Thread.sleep(10_000L);
        return "FINISH";
    }
}
Test result: "I am done." is printed only after result has been obtained, so the main thread is blocked for the full 10 seconds.
Optimization 1: start the actual business call early, and fetch the returned data by calling get() only at the point where we need it.
The Future interface
package com.dwz.concurrency2.chapter9;

public interface Future<T> {
    // Blocks until the result is available.
    T get() throws InterruptedException;
}
The FutureTask interface
package com.dwz.concurrency2.chapter9;

public interface FutureTask<T> {
    // The business logic to run asynchronously.
    T call();
}
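The FutureService class
package com.dwz.concurrency2.chapter9;

// Not used by this version; needed once the consumer callback is added.
import java.util.function.Consumer;

public class FutureService {
    public <T> Future<T> submit(final FutureTask<T> task) {
        AsynFuture<T> asynfuture = new AsynFuture<T>();
        // Run the task on its own thread and publish the result when it finishes.
        new Thread(() -> {
            T result = task.call();
            asynfuture.done(result);
        }).start();
        // Return immediately; the caller holds the future as a receipt.
        return asynfuture;
    }
}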
AsynFuture is the implementation of Future
package com.dwz.concurrency2.chapter9;

public class AsynFuture<T> implements Future<T> {
    private volatile boolean done = false;
    private T result;

    // Called by the worker thread: store the result and wake up all waiters.
    public void done(T result) {
        synchronized (this) {
            this.result = result;
            this.done = true;
            this.notifyAll();
        }
    }

    @Override
    public T get() throws InterruptedException {
        synchronized (this) {
            // Loop to guard against spurious wakeups.
            while (!done) {
                this.wait();
            }
        }
        return result;
    }
}
Test class
package com.dwz.concurrency2.chapter10;

// SyncInvoker lives in chapter10, so the chapter9 types must be imported.
import com.dwz.concurrency2.chapter9.Future;
import com.dwz.concurrency2.chapter9.FutureService;

public class SyncInvoker {
    public static void main(String[] args) throws InterruptedException {
        FutureService futureservice = new FutureService();
        Future<String> future = futureservice.submit(() -> {
            // The actual business logic
            try {
                Thread.sleep(10_000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "FINISH";
        });
        System.out.println("======================");
        System.out.println(" do other things.");
        System.out.println("***********************");
        // Blocks only here, for whatever remains of the 10 seconds.
        System.out.println(future.get());
        System.out.println("I am done.");
    }
}
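Test result: futureservice.submit is called first to start the business processing, the "do other things" lines are printed immediately, and future.get() is called only when we need the return value. This behaves as intended.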
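To make the claim "submit returns at once, get blocks" measurable, here is a minimal self-contained sketch. It condenses the classes above into nested types so it runs as a single file; the class name FutureTimingSketch, the measure helper and the 500 ms task duration are assumptions made for illustration, not part of the original code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical condensation of Future / FutureTask / AsynFuture / FutureService,
// used only to time the two calls.
class FutureTimingSketch {

    interface Future<T> {
        T get() throws InterruptedException;
    }

    interface FutureTask<T> {
        T call();
    }

    static final class AsynFuture<T> implements Future<T> {
        private boolean done;
        private T result;

        synchronized void done(T result) {
            this.result = result;
            this.done = true;
            notifyAll();
        }

        @Override
        public synchronized T get() throws InterruptedException {
            while (!done) {
                wait();
            }
            return result;
        }
    }

    static <T> Future<T> submit(FutureTask<T> task) {
        AsynFuture<T> future = new AsynFuture<>();
        new Thread(() -> future.done(task.call())).start();
        return future;
    }

    /** Returns { milliseconds spent in submit, milliseconds spent in get }. */
    static long[] measure(long taskMillis) {
        long start = System.nanoTime();
        Future<String> future = submit(() -> {
            try {
                Thread.sleep(taskMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "FINISH";
        });
        long submitted = System.nanoTime();
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        long finished = System.nanoTime();
        return new long[] {
            TimeUnit.NANOSECONDS.toMillis(submitted - start),
            TimeUnit.NANOSECONDS.toMillis(finished - submitted)
        };
    }

    public static void main(String[] args) {
        long[] t = measure(500L);
        System.out.println("submit() took ~" + t[0] + " ms, get() took ~" + t[1] + " ms");
    }
}
```

On a typical machine the first number should be a small fraction of the task duration and the second close to it; the exact values depend on the JVM and scheduler.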
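Optimization 2: add a callback, so that we no longer have to fetch the processed data ourselves; it is delivered automatically once processing completes.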
Modify the FutureService class to add a consumer callback
public <T> Future<T> submit(final FutureTask<T> task, final Consumer<T> consumer) {
    AsynFuture<T> asynfuture = new AsynFuture<T>();
    new Thread(() -> {
        T result = task.call();
        asynfuture.done(result);
        // Push the result to the caller's callback on the worker thread.
        consumer.accept(result);
    }).start();
    return asynfuture;
}
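or, if the caller does not need the Future at all (this replaces the version above, since the two signatures differ only in return type):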
public <T> void submit(final FutureTask<T> task, final Consumer<T> consumer) {
    AsynFuture<T> asynfuture = new AsynFuture<T>();
    new Thread(() -> {
        T result = task.call();
        asynfuture.done(result);
        // Nothing is returned, so the callback is the only way the result reaches the caller.
        consumer.accept(result);
    }).start();
}
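One consequence of both variants is that consumer.accept runs inside the newly started thread, not on the thread that called submit. The sketch below checks this by recording the callback's thread name. It is a simplified stand-in: java.util.function.Supplier replaces FutureTask, and the class name CallbackThreadSketch, the helper callbackThreadName and the thread name "future-worker" are assumptions added for illustration (the original code starts an unnamed thread).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

class CallbackThreadSketch {

    // Same shape as the void submit(task, consumer) variant, with Supplier
    // standing in for FutureTask and an explicit (hypothetical) thread name.
    static <T> void submit(Supplier<T> task, Consumer<T> consumer) {
        new Thread(() -> consumer.accept(task.get()), "future-worker").start();
    }

    /** Returns the name of the thread that executed the callback. */
    static String callbackThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch called = new CountDownLatch(1);
        submit(() -> "FINISH", result -> {
            name.set(Thread.currentThread().getName());
            called.countDown();
        });
        try {
            // Wait until the callback has actually run.
            called.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName());
    }
}
```

Because the callback runs on the worker thread, any state it touches that is shared with the caller needs to be thread-safe.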
Test
package com.dwz.concurrency2.chapter9;

/**
 * Future        -> a receipt for a result that will be available in the future
 * FutureTask    -> isolates your invocation logic
 * FutureService -> bridges Future and FutureTask
 */
public class SyncInvoker {
    public static void main(String[] args) throws InterruptedException {
        FutureService futureservice = new FutureService();
        futureservice.submit(() -> {
            // The actual business logic
            try {
                Thread.sleep(10_000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "FINISH";
        }, System.out::println);
        System.out.println("===================");
        System.out.println(" do other things.");
        Thread.sleep(2000L);
        System.out.println("**********************");
        System.out.println("syncinvoker is done.");
    }
}
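Test result: the requirement is met. The main thread prints its output without waiting for the task, and FINISH is printed by the callback once the 10-second task completes.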
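For comparison, the JDK ships the same two styles in java.util.concurrent.CompletableFuture: supplyAsync starts the work early, join (or get) fetches the result on demand, and thenAccept registers a callback. The following is only a rough equivalent under assumptions: the class name JdkFutureSketch, the helper slowBusinessCall and the 200 ms delay are made up for illustration, and the work runs on the common ForkJoinPool rather than on a dedicated thread as in the hand-written FutureService.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class JdkFutureSketch {

    // Hypothetical stand-in for the slow business logic.
    static String slowBusinessCall() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "FINISH";
    }

    // Optimization 1 style: start early, fetch the result when it is needed.
    static String pull() {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(JdkFutureSketch::slowBusinessCall);
        // ... do other things here ...
        return future.join();
    }

    // Optimization 2 style: the result is pushed to a callback.
    static CompletableFuture<Void> push(Consumer<String> callback) {
        return CompletableFuture
                .supplyAsync(JdkFutureSketch::slowBusinessCall)
                .thenAccept(callback);
    }

    public static void main(String[] args) {
        System.out.println(pull());
        // The common pool normally uses daemon threads, so wait for the
        // callback explicitly instead of letting main return.
        push(System.out::println).join();
    }
}
```

Unlike the hand-written version, whose non-daemon worker thread keeps the JVM alive until the callback has run, this sketch has to join the returned future before main exits.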