目录
前置条件:构造一个异步调用
一、使用wait和notify方法
二、使用条件锁
三、Future
四、使用CountDownLatch
五、使用CyclicBarrier
总结
在Java并发编程中,经常会因为需要提高响应速度而将请求异步化,即将同步请求转化为异步处理,这是很自然能想到的一种处理方式。相反,在有些场景下也需要将异步处理转化为同步的方式。
首先介绍一下同步调用和异步调用的概念:
同步调用:调用方在调用过程中,持续等待返回结果。
异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。
其实,两者的区别还是很明显的,这里也不再细说,我们主要来说一下Java如何将异步调用转为同步。换句话说,就是需要在异步调用过程中,持续阻塞至获得调用结果。接下来将介绍5种Java并发编程中异步转同步的方法。
- 使用wait和notify方法
- 使用条件锁
- Future
- 使用CountDownLatch
- 使用CyclicBarrier
前置条件:构造一个异步调用
首先,写demo需要先写基础设施,这里是需要构造一个异步调用模型。异步调用类:
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AsyncCall { private Random random = new Random(System.currentTimeMillis()); private ExecutorService tp = Executors.newSingleThreadExecutor(); public void call(AbstractBaseDemo demo) { new Thread(() -> { long res = random.nextInt(10); try { Thread.sleep(res * 1000); } catch (InterruptedException e) { e.printStackTrace(); } demo.callback(res); }).start(); } public Future<Long> futureCall() { return tp.submit(() -> { long res = random.nextInt(10); Thread.sleep(res * 1000); return res; }); } public void shutdown() { tp.shutdown(); } }
public abstract class AbstractBaseDemo { protected AsyncCall asyncCall = new AsyncCall(); public abstract void callback(long response); public void call() { System.out.println(Thread.currentThread().getName() + "发起调用"); asyncCall.call(this); System.out.println(Thread.currentThread().getName() + "调用返回"); } }
AbstractBaseDemo非常简单,里面包含一个异步调用类的实例,另外有一个call方法用于发起异步调用,当然还有一个抽象方法callback需要每个demo去实现的——主要在回调中进行相应的处理来达到异步调用转同步的目的。
一、使用wait和notify方法
这个方法其实是利用了锁机制,直接贴代码:
public class ObjectWaitLockDemo extends AbstractBaseDemo { private final Object lock = new Object(); @Override public void callback(long response) { System.out.println(Thread.currentThread().getName() + "得到结果"); System.out.println(response); System.out.println(Thread.currentThread().getName() + "调用结束"); synchronized (lock) { lock.notifyAll(); } } public static void main(String[] args) { ObjectWaitLockDemo demo = new ObjectWaitLockDemo(); demo.call(); synchronized (demo.lock) { try { demo.lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "主线程内容"); } }
没有使用同步操作的情况下,打印结果:
main发起调用 main调用返回 main主线程内容 Thread-0得到结果 7 Thread-0调用结束
而使用了同步操作后:
main发起调用 main调用返回 Thread-0得到结果 3 Thread-0调用结束 main主线程内容
二、使用条件锁
和方法一的原理类似:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockDemo extends AbstractBaseDemo { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @Override public void callback(long response) { System.out.println(Thread.currentThread().getName() + "得到结果"); System.out.println(response); System.out.println(Thread.currentThread().getName() + "调用结束"); lock.lock(); try { condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) { ReentrantLockDemo demo = new ReentrantLockDemo(); demo.call(); demo.lock.lock(); try { demo.condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { demo.lock.unlock(); } System.out.println(Thread.currentThread().getName() + "主线程内容"); } }
基本上和方法一没什么区别,只是这里使用了条件锁,两者的锁机制有所不同。
三、Future
使用Future的方法和之前不太一样,我们调用的异步方法也不一样。
import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class FutureDemo { private AsyncCall asyncCall = new AsyncCall(); public Future<Long> call() { Future<Long> future = asyncCall.futureCall(); asyncCall.shutdown(); return future; } public static void main(String[] args) { FutureDemo demo = new FutureDemo(); System.out.println(Thread.currentThread().getName() + "发起调用"); Future<Long> future = demo.call(); System.out.println(Thread.currentThread().getName() + "返回结果"); while (!future.isDone() && !future.isCancelled()); try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "主线程内容"); } }
public void shutdown() { tp.shutdown(); }
四、使用CountDownLatch
使用CountDownLatch或许是日常编程中最常见的一种了,也感觉是相对优雅的一种:
import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo extends AbstractBaseDemo { private final CountDownLatch countDownLatch = new CountDownLatch(1); @Override public void callback(long response) { System.out.println(Thread.currentThread().getName() + "得到结果"); System.out.println(response); System.out.println(Thread.currentThread().getName() + "调用结束"); countDownLatch.countDown(); } public static void main(String[] args) { CountDownLatchDemo demo = new CountDownLatchDemo(); demo.call(); try { demo.countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "主线程内容"); } }
当然,这里和ObjectWaitLockDemo和ReentrantLockDemo中都一样,主线程中阻塞的部分,都可以设置一个超时时间,超时后可以不再阻塞。
五、使用CyclicBarrier
CyclicBarrier的情况和CountDownLatch有些类似:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo extends AbstractBaseDemo { private CyclicBarrier cyclicBarrier = new CyclicBarrier(2); @Override public void callback(long response) { System.out.println(Thread.currentThread().getName() + "得到结果"); System.out.println(response); System.out.println(Thread.currentThread().getName() + "调用结束"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.call(); try { demo.cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "主线程内容"); } }
总结
综上,就是本次需要说的几种方法了。事实上,所有的方法都是同一个原理,也就是在调用的线程中进行阻塞等待结果,而在回调中函数中进行阻塞状态的解除。