  • scala(二) Future执行逻辑解读

        在scala中是没有原生线程的,其底层使用的是java的Thread机制。但是在scala中对java Thread进行了封装,实现了更便于操作线程的Future。

      官方文档: Futures provide a way to reason about performing many operations in parallel– in an efficient and non-blocking way.

    在使用的时候只需要通过object Future 的apply方法传入执行体即可启动,那么future是如何开始运行的呢?又是如何把运行体加入到线程的执行体中的呢?其底层运行机制又是什么呢?下面就逐步看一下。

    先看一段代码.注意在代码中导入的global,其类型为global: ExecutionContext,这里暂时不进行解释,留意一下后面会用到。

    package zpj.future
    import org.scalatest.FunSuite
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
      * Created by PerkinsZhu on 2018/3/18 11:34
    class Test extends FunSuite {
      test("future demo 1") {
        Future {
          println("hello world !!!")
      val sleep = Thread.sleep(1000)

      直接运行代码会打印出“hello world !!!”。我们知道,如果使用java的Thread,则必须调用.start()方法来启动线程的运行,可是在这里我们并没有主动触发start()方法,而线程体却执行了。下面进入源码中看一下。在这之前注意打开idea的Structure窗口,留意每个方法是属于哪个class、object或者trait中。这样便于理解整个Future 的结构关系。


      def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] =
        unit.map(_ => body)


    /** A Future which is always completed with the Unit value.
      val unit: Future[Unit] = successful(())

     一个值为unit 的已完成future。这里调用的successful(())函数。注意传入的() ,这个就是该future的值:Unit 。可以看一下()的类型:

    很明显()就是上面注释所说的 Unit value.


     /** Creates an already completed Future with the specified result.
       *  @tparam T       the type of the value in the future
       *  @param result   the given successful value
       *  @return         the newly created `Future` instance
      def successful[T](result: T): Future[T] = Promise.successful(result).future



      /** Creates an already completed Promise with the specified result.
       *  @tparam T       the type of the value in the promise
       *  @return         the newly created `Promise` object
      def successful[T](result: T): Promise[T] = fromTry(Success(result))

      到这里看到Success(result)大概就明白了,这就是用来构建future的结果值,其结果便是Success(()) 。【疑问1】同时注意一下这里返回的结果类型为Promise[T],而其调用出接收的却是Future,这两处是如何对接的呢?我们暂时放一下,先看下面。那fromTry又是做什么呢?

     /** Creates an already completed Promise with the specified result or exception.
       *  @tparam T       the type of the value in the promise
       *  @return         the newly created `Promise` object
      def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result)


    def apply[T](result: Try[T]): scala.concurrent.Promise[T] =
          resolveTry(result) match {
            case s @ Success(_) => new Successful(s)
            case f @ Failure(_) => new Failed(f)


    private[this] sealed trait Kept[T] extends Promise[T]
    private[this] final class Successful[T](val result: Success[T]) extends Kept[T] private[this] final class Failed[T](val result: Failure[T]) extends Kept[T]


    private def resolveTry[T](source: Try[T]): Try[T] = source match {
        case Failure(t) => resolver(t)
        case _          => source
      private def resolver[T](throwable: Throwable): Try[T] = throwable match {
        case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T])
        case t: scala.util.control.ControlThrowable    => Failure(new ExecutionException("Boxed ControlThrowable", t))
        case t: InterruptedException                   => Failure(new ExecutionException("Boxed InterruptedException", t))
        case e: Error                                  => Failure(new ExecutionException("Boxed Error", e))
        case t                                         => Failure(t)

      走到这里,就明白了Promise.successful(result).future中的  前半部分的执行机。还记得上面抛出的一个疑问吗?这里就对【疑问1】解释一下。

    def successful[T](result: T): Future[T] = Promise.successful(result).future接收的是Future,而Promise.successful(result)返回的是一个Promise,这两个类型怎么对接呢?后面调用了future ,我们进入看一下
    trait Promise[T] {
      def future: Future[T]



    发现Kept中也没有,那么久继续向上找,private[this] sealed trait Kept[T] extends Promise[T],(注意这里的Promise是scala.concurrent.impl中的Promise,不是刚才的scala.concurrent.Promis)这里我们进入scala.concurrent.Promise看一下:

    private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
      def future: this.type = this

     会发现在 scala.concurrent.impl.Promise[T]  extends scala.concurrent.Promise[T],且两者都是特质(注意区分这两个Promise)。在下面可以看到 future 在这里被实现了def future: this.type = this。对于这里该如何理解呢?

    future返回的结果应该是Future[T]类型的,那么这里的this.type 应该就是Promise类型,而this就应该是上面的Successful(())。这里可能有些不太容易理解,事实上 scala.concurrent.impl.Promise继承了Promise 混合了Future ,注意看上面的继承关系:

    private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T]
    这里的with混合了scala.concurrent.Future特质,通过def future: this.type = this把Promise类型转化为Future返回给了调用处。


    def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] =
        unit.map(_ => body)


    def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
    def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]


      override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {
        val p = new DefaultPromise[S]()
        onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }


      1、val p = new DefaultPromise[S]()。创建了 一个scala.concurrent.impl.Promise.DefaultPromise实例,进入DefaultPromise的构造器中看一下:

    class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]


      2、onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) },在理解这行代码的时候需要注意scala的参数类型,明确其传入的是函数还是参数值。

        我们进入onComplete 发现是一个scala.concurrent.Future#onComplete的抽象方法。那么找到其实现处:scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete,看一下源码:

    override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
            (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)


    注意这里new CallbackRunnable(executor.prepare(), func)) 传入的对象 executor,和func,这里的executor是从上面一路带过来的(implicit executor: ExecutionContext),也就是我们上面刚开始导入的import scala.concurrent.ExecutionContext.Implicits.global;在看func,回溯上面会发现func就是scala.concurrent.Promise#complete方法,根据名字可以指定是在Future 完成之后的回调,接收的参数就是Future.apply()的函数体。


    private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {
      // must be filled in before running it
      var value: Try[T] = null
      override def run() = {
        require(value ne null) // must set value to non-null before running!
        try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
      def executeWithValue(v: Try[T]): Unit = {
        require(value eq null) // can't complete it twice
        value = v
        // Note that we cannot prepare the ExecutionContext at this point, since we might
        // already be running on a different thread!
        try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }


      1、继承关系可以发现CallbackRunnable是java.lang.Runnable的实现类,因此其实一个可以在java Threa中运行的线程。  CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable 

      2、注意其构造器参数,executor是一个全局线程池,onComplete: Try[T] => Any是一个函数。函数是可以调用的代码块,可以传参的(理解scala的函数式编程)。


      4、注意executeWithValue的参数v,其把v赋值给Value。赋值之后调用了 executor.execute(this);该命令再熟悉不过了,调用线程池执行线程,这里的this就是CallbackRunnable实例。


    scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete 是在单独的线程中执行的,结合上面的 onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }这块代码,发现onComplete执行的就是scala.concurrent.Promise#complete的代码逻辑。
    override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {
        val p = new DefaultPromise[S]()
        onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }

       注意这里面的参数类型,f: Try[T] => Try[S]是一个函数,然而注意这里: p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) ,看一下 p.complete()方法接收的参数类型是什么:

      def complete(result: Try[T]): this.type =
        if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")



      override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {
        val p = new DefaultPromise[S]()
        onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }

     在onComplete ()中开启线程,并执行线程体。在线程执行过程中,调用p.complete()函数,而在调用p.complete()之前会触发f()函数的调用,这样便触发了Future.apply()的执行,于是便执行了 println("hello world !!!") 代码块。

      因此Future.apply()中的代码块是在单独的一个线程中执行的,这便是scala 中Future自动开启线程执行代码块的机制。

    这里不太容易理解的就是这个函数的调用时机。搞清楚Future是如何把Future.apply()代码块加载到java Thread中运行之后,Future的核心便易于理解了。


    onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }

     这个result 是从哪里过来的呢?我们知道future是可以组合上一个future的结果的。例如:

    Future { 10 }.map( _ + 10).map(_ * 10)

      这里执行逻辑时机上是(10+10)* 10  结果就是200 ,那么这里的10如何传给第二个map函数的呢?又是如何把20传给第三个map函数的呢?


    override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
            (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)


    override def run() = {
        require(value ne null) // must set value to non-null before running!
        try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
    override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {
        val p = new DefaultPromise[S]()
        onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }

      这里的result便是线程run方法中传入的Value,那么在(new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)这里的result又是哪里来的呢?


     private[this] sealed trait Kept[T] extends Promise[T] {
          def result: Try[T]
          override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
            (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)


     private[this] final class Successful[T](val result: Success[T]) extends Kept[T] 


    def apply[T](result: Try[T]): scala.concurrent.Promise[T] =
          resolveTry(result) match {
            case s @ Success(_) => new Successful(s)
            case f @ Failure(_) => new Failed(f)




      override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {
        val p = new DefaultPromise[S]()
        onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }

     unit在调用transform的时候,执行的 onComplete 是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete。而看第三行返回的结果: p.future,也即是说第一个Future返回的对象是DefaultPromise()实例的future。结合代码:

    Future { 10 }.map( _ + 10).map(_ * 10)


    def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
    override def transform[S](f: Try[T]
    => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }

     注意这里调用transform的不再是KeptPromise()了,而是DefaultPromise()的实例在调用。所以 在调用onComplete()的时候进入的就是scala.concurrent.impl.Promise.DefaultPromise#onComplete,而不再是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete


    final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
          dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func))

     注意这里只是new 了一个CallbackRunnable,并没有启动。不启动的原因就是不确定上一个Future是否执行成功。可能需要等待,由此可以猜到dispatchOrAddCallback()的目的就是对调用者future进行判断和等待的逻辑。看一下scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback的源码:

        /** Tries to add the callback, if already completed, it dispatches the callback to be executed.
         *  Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks
         *  to the root promise when linking two promises together.
        private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
          get() match {
            case r: Try[_]          => runnable.executeWithValue(r.asInstanceOf[Try[T]])
            case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable)
            case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) ()
                                       else dispatchOrAddCallback(runnable)
         * Gets the current value.
         * @return the current value
        public final V get() {// 注意该方法的路径:java.util.concurrent.atomic.AtomicReference#get
            return value;



       2、case r: Try[_] 该分支说明调用者future已经结束,启动该future的线程,执行map中的操作。


    class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T] 

     注意这里传入的是Nil ,这也是为什么会有case listeners: List[_]分支的原因。



    原文链接:scala(二) Future执行逻辑解读



