zoukankan      html  css  js  c++  java
  • Java并发(二)异步转同步

    目录

      前置条件:构造一个异步调用

      一、使用wait和notify方法

      二、使用条件锁

      三、Future

      四、使用CountDownLatch

      五、使用CyclicBarrier

      总结

    在Java并发编程中,经常会因为需要提高响应速度而将请求异步化,即将同步请求转化为异步处理,这是很自然能想到的一种处理方式。相反,在有些场景下也需要将异步处理转化为同步的方式。

    首先介绍一下同步调用和异步调用的概念:

      同步调用:调用方在调用过程中,持续等待返回结果。

      异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。

    其实,两者的区别还是很明显的,这里也不再细说,我们主要来说一下Java如何将异步调用转为同步。换句话说,就是需要在异步调用过程中,持续阻塞至获得调用结果。接下来将介绍5种Java并发编程中异步转同步的方法。

    1. 使用wait和notify方法
    2. 使用条件锁
    3. Future
    4. 使用CountDownLatch
    5. 使用CyclicBarrier

    前置条件:构造一个异步调用

    首先,写demo需要先写基础设施,这里是需要构造一个异步调用模型。异步调用类:

    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class AsyncCall {
    
        private Random random = new Random(System.currentTimeMillis());
    
        private ExecutorService tp = Executors.newSingleThreadExecutor();
    
        public void call(AbstractBaseDemo demo) {
            new Thread(() -> {
                long res = random.nextInt(10);
                try {
                    Thread.sleep(res * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                demo.callback(res);
            }).start();
        }
    
        public Future<Long> futureCall() {
            return tp.submit(() -> {
                long res = random.nextInt(10);
    
                Thread.sleep(res * 1000);
                return res;
            });
        }
    
        public void shutdown() {
            tp.shutdown();
        }
    }
    我们主要关心call方法,这个方法接收了一个demo参数,并且开启了一个线程,在线程中执行具体的任务,并利用demo的callback方法进行回调函数的调用。大家注意到了这里的返回结果就是一个[0,10)的长整型,并且结果是几,就让线程sleep多久——这主要是为了更好地观察实验结果,模拟异步调用过程中的处理时间。
    至于futureCall和shutdown方法,以及线程池tp都是为了FutureDemo利用Future来实现做准备的。
    demo的基类:
    public abstract class AbstractBaseDemo {
    
        protected AsyncCall asyncCall = new AsyncCall();
    
        public abstract void callback(long response);
    
        public void call() {
            System.out.println(Thread.currentThread().getName() + "发起调用");
            asyncCall.call(this);
            System.out.println(Thread.currentThread().getName() + "调用返回");
        }
    }

    AbstractBaseDemo非常简单,里面包含一个异步调用类的实例,另外有一个call方法用于发起异步调用,当然还有一个抽象方法callback需要每个demo去实现的——主要在回调中进行相应的处理来达到异步调用转同步的目的。

    一、使用wait和notify方法

    这个方法其实是利用了锁机制,直接贴代码:

    public class ObjectWaitLockDemo extends AbstractBaseDemo {
    
        private final Object lock = new Object();
    
        @Override
        public void callback(long response) {
            System.out.println(Thread.currentThread().getName() + "得到结果");
            System.out.println(response);
            System.out.println(Thread.currentThread().getName() + "调用结束");
    
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    
        public static void main(String[] args) {
            ObjectWaitLockDemo demo = new ObjectWaitLockDemo();
    
            demo.call();
    
            synchronized (demo.lock) {
                try {
                    demo.lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            System.out.println(Thread.currentThread().getName() + "主线程内容");
        }
    }
    可以看到在发起调用后,主线程利用wait进行阻塞,等待回调中调用notify或者notifyAll方法来进行唤醒。注意,和大家认知的一样,这里wait和notify都是需要先获得对象的锁的。在主线程中最后我们打印了一个内容,这也是用来验证实验结果的,如果没有wait和notify,主线程内容会紧随调用内容立刻打印;而像我们上面的代码,主线程内容会一直等待回调函数调用结束才会进行打印。
    没有使用同步操作的情况下,打印结果:
    main发起调用
    main调用返回
    main主线程内容
    Thread-0得到结果
    7
    Thread-0调用结束

     而使用了同步操作后:

    main发起调用
    main调用返回
    Thread-0得到结果
    3
    Thread-0调用结束
    main主线程内容

    二、使用条件锁

    和方法一的原理类似:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ReentrantLockDemo extends AbstractBaseDemo {
    
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
    
        @Override
        public void callback(long response) {
            System.out.println(Thread.currentThread().getName() + "得到结果");
            System.out.println(response);
            System.out.println(Thread.currentThread().getName() + "调用结束");
    
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            ReentrantLockDemo demo = new ReentrantLockDemo();
    
            demo.call();
    
            demo.lock.lock();
    
            try {
                demo.condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                demo.lock.unlock();
            }
    
            System.out.println(Thread.currentThread().getName() + "主线程内容");
        }
    }

    基本上和方法一没什么区别,只是这里使用了条件锁,两者的锁机制有所不同。

    三、Future

    使用Future的方法和之前不太一样,我们调用的异步方法也不一样。

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class FutureDemo {
    
        private AsyncCall asyncCall = new AsyncCall();
    
        public Future<Long> call() {
            Future<Long> future = asyncCall.futureCall();
    
            asyncCall.shutdown();
    
            return future;
        }
    
        public static void main(String[] args) {
            FutureDemo demo = new FutureDemo();
    
            System.out.println(Thread.currentThread().getName() + "发起调用");
            Future<Long> future = demo.call();
            System.out.println(Thread.currentThread().getName() + "返回结果");
    
            while (!future.isDone() && !future.isCancelled());
    
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + "主线程内容");
        }
    }
    我们调用futureCall方法,方法中会向线程池tp提交一个Callable,然后返回一个Future,这个Future就是我们FutureDemo中call中得到的,得到future对象之后就可以关闭线程池啦,调用asyncCall的shutdown方法。关于关闭线程池这里有一点需要注意,我们回过头来看看asyncCall的shutdown方法:
        public void shutdown() {
            tp.shutdown();
        }
    发现只是简单调用了线程池的shutdown方法,然后我们说注意的点,这里最好不要用tp的shutdownNow方法,该方法会试图去中断线程中正在执行的任务;也就是说,如果使用该方法,有可能我们的future所对应的任务将被中断,无法得到执行结果。
    然后我们关注主线程中的内容,主线程的阻塞由我们自己来实现,通过future的isDone和isCancelled来判断执行状态,一直到执行完成或被取消。随后,我们打印get到的结果。

    四、使用CountDownLatch

    使用CountDownLatch或许是日常编程中最常见的一种了,也感觉是相对优雅的一种:

    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchDemo extends AbstractBaseDemo {
    
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
    
        @Override
        public void callback(long response) {
            System.out.println(Thread.currentThread().getName() + "得到结果");
            System.out.println(response);
            System.out.println(Thread.currentThread().getName() + "调用结束");
    
            countDownLatch.countDown();
        }
    
        public static void main(String[] args) {
            CountDownLatchDemo demo = new CountDownLatchDemo();
    
            demo.call();
    
            try {
                demo.countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + "主线程内容");
        }
    }
    正如大家平时使用的那样,此处在主线程中利用CountDownLatch的await方法进行阻塞,在回调中利用countDown方法来使得其他线程await的部分得以继续运行。
    当然,这里和ObjectWaitLockDemo和ReentrantLockDemo中都一样,主线程中阻塞的部分,都可以设置一个超时时间,超时后可以不再阻塞。

    五、使用CyclicBarrier

    CyclicBarrier的情况和CountDownLatch有些类似:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierDemo extends AbstractBaseDemo {
    
        private CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    
        @Override
        public void callback(long response) {
            System.out.println(Thread.currentThread().getName() + "得到结果");
            System.out.println(response);
            System.out.println(Thread.currentThread().getName() + "调用结束");
    
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
    
            CyclicBarrierDemo demo = new CyclicBarrierDemo();
    
            demo.call();
    
            try {
                demo.cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + "主线程内容");
    
        }
    }
    大家注意一下,CyclicBarrier和CountDownLatch仅仅只是类似,两者还是有一定区别的。比如,一个可以理解为做加法,等到加到这个数字后一起运行;一个则是减法,减到0继续运行。一个是可以重复计数的;另一个不可以等等等等。
    另外,使用CyclicBarrier的时候要注意两点。第一点,初始化的时候,参数数字要设为2,因为异步调用这里是一个线程,而主线程是一个线程,两个线程都await的时候才能继续执行,这也是和CountDownLatch区别的部分。第二点,也是关于初始化参数的数值的,和这里的demo无关,在平时编程的时候,需要比较小心,如果这个数值设置得很大,比线程池中的线程数都大,那么就很容易引起死锁了。

    总结

    综上,就是本次需要说的几种方法了。事实上,所有的方法都是同一个原理,也就是在调用的线程中进行阻塞等待结果,而在回调中函数中进行阻塞状态的解除。

    参考:5种必会的Java异步调用转同步的方法你会几种

  • 相关阅读:
    秦腾与教学评估【前缀和+二分】
    c++中成员函数声明时const得作用
    分形【递归】
    飞行兄弟【二进制枚举+异或】
    爬取4k图片网图片
    爬虫爬取博客园文章的文字【练手】
    【YBTOJ】求 f 函数
    【YBTOJ】划分数列
    【学习笔记】高斯消元法
    【Luogu P4588】 [TJOI2018]数学计算
  • 原文地址:https://www.cnblogs.com/warehouse/p/10707091.html
Copyright © 2011-2022 走看看