  • SDP(13): Scala.Future


      import scala.concurrent.duration._
      val fs = Future {println("run now..."); System.currentTimeMillis() }
                                             //> run now...
                                             //| fs  : scala.concurrent.Future[Long] = List()
      Await.result(fs, 1.second)             //> res0: Long = 1465907784714
      Await.result(fs, 1.second)             //> res1: Long = 1465907784714

    可以看到fs是在Future构建时即时运算的,而且只会运算一次。如果scala Future中包括了能产生副作用的代码,在构建时就会立即产生副作用。所以我们是无法使用scala Future来编写纯函数的,如下:

    val progA:Future[A] = for {
        b <- readFromB
        _ <- writeToLocationA(a)
        r <- getResult
    } yield r
    /* location A content updated */
    ... /* later */
    val progB: Future[B] = for {
        a <- readFromA
        _ <- updateLocationA
        c <- getResult
    val program: Future[Unit] = for {
        _ <- progA
        _ <- progB
    } yield()







    sealed abstract class Future[+A] {
    object Future {
      case class Now[+A](a: A) extends Future[A]
      case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
      case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
      case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
      case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                                f: A => Future[B]) extends Future[B]

    scalaz.Future[A]明显就是个Free Monad。它的结构化表达方式分别有Now,Async,Suspend,BindSuspend,BindAsync。我们可以用这些结构实现flatMap函数,所以Future就是Free Monad:

    def flatMap[B](f: A => Future[B]): Future[B] = this match {
        case Now(a) => Suspend(() => f(a))
        case Suspend(thunk) => BindSuspend(thunk, f)
        case Async(listen) => BindAsync(listen, f)
        case BindSuspend(thunk, g) =>
          Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
        case BindAsync(listen, g) =>
          Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))

    因为free structure类型支持算式/算法关注分离,我们可以用scalaz.Future来描述程序功能而不涉及正真运算。这样,在上面那个例子里如果progA,progB是Task类型的,那么program的构建就是安全的,因为我们最后是用Task.run来真正进行运算产生副作用的。scalaz.Task又在scalaz.Future功能基础上再增加了异常处理等功能。


      /** [[Task]] state describing an immediate synchronous value. */
      private[eval] final case class Now[A](value: A) extends Task[A] {...}
      /** [[Task]] state describing an immediate synchronous value. */
      private[eval] final case class Eval[A](thunk: () => A)
        extends Task[A]
      /** Internal state, the result of [[Task.defer]] */
      private[eval] final case class Suspend[+A](thunk: () => Task[A])
        extends Task[A]
      /** Internal [[Task]] state that is the result of applying `flatMap`. */
      private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B])
        extends Task[B]
     /** Internal [[Coeval]] state that is the result of applying `map`. */
      private[eval] final case class Map[S, +A](source: Task[S], f: S => A, index: Int)
        extends Task[A] with (S => Task[A]) {
        def apply(value: S): Task[A] =
          new Now(f(value))
        override def toString: String =
      /** Constructs a lazy [[Task]] instance whose result will
        * be computed asynchronously.
        * Unsafe to build directly, only use if you know what you're doing.
        * For building `Async` instances safely, see [[create]].
      private[eval] final case class Async[+A](register: (Context, Callback[A]) => Unit)
        extends Task[A]  


    object Task extends TaskInstancesLevel1 {
      /** Returns a new task that, when executed, will emit the result of
        * the given function, executed asynchronously.
        * This operation is the equivalent of:
        * {{{
        *   Task.eval(f).executeAsync
        * }}}
        * @param f is the callback to execute asynchronously
      def apply[A](f: => A): Task[A] =
      /** Returns a `Task` that on execution is always successful, emitting
        * the given strict value.
      def now[A](a: A): Task[A] =
      /** Lifts a value into the task context. Alias for [[now]]. */
      def pure[A](a: A): Task[A] = now(a)
      /** Returns a task that on execution is always finishing in error
        * emitting the specified exception.
      def raiseError[A](ex: Throwable): Task[A] =
      /** Promote a non-strict value representing a Task to a Task of the
        * same type.
      def defer[A](fa: => Task[A]): Task[A] =
        Suspend(fa _)
        source match {
          case Task.Now(v) => F.pure(v)
          case Task.Error(e) => F.raiseError(e)
          case Task.Eval(thunk) => F.delay(thunk())
          case Task.Suspend(thunk) => F.suspend(to(thunk()))
          case other => suspend(other)(F)



      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A(x)
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runAsync


    import scala.concurrent._
    import scala.util._
    import scala.concurrent.duration._
    import monix.eval.Task
    import monix.execution._
    object MonixTask extends App {
    import monix.execution.Scheduler.Implicits.global
      // Executing a sum, which (due to the semantics of apply)
      // will happen on another thread. Nothing happens on building
      // this instance though, this expression is pure, being
      // just a spec! Task by default has lazy behavior ;-)
      val task = Task { 1 + 1 }
      // Tasks get evaluated only on runAsync!
      // Callback style:
      val cancelable = task.runOnComplete {
          case Success(value) =>
          case Failure(ex) =>
            System.out.println(s"ERROR: ${ex.getMessage}")
      //=> 2
      // If we change our mind...
      // Or you can convert it into a Future
      val future: CancelableFuture[Int] =
      // Printing the result asynchronously
      //=> 2
      val task = Task.now { println("Effect"); "Hello!" }
      //=> Effect
      // task: monix.eval.Task[String] = Delay(Now(Hello!))


      /* ------ taskNow ----*/
      val taskNow = Task.now { println("Effect"); "Hello!" }
      //=> Effect
      // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))
      /* --------taskDelay possible another on thread ------*/
      val taskDelay = Task { println("Effect"); "Hello!" }
      // taskDelay: monix.eval.Task[String] = Delay(Always(<function0>))
      //=> Effect
      //=> Hello!
      // The evaluation (and thus all contained side effects)
      // gets triggered on each runAsync:
      //=> Effect
      //=> Hello!
      /* --------taskOnce ------- */
      val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
      // taskOnce: monix.eval.Task[String] = EvalOnce(<function0>)
      //=> Effect
      //=> Hello!
      // Result was memoized on the first run!
      //=> Hello!
      /* --------taskFork ------- */
      // this guarantees that our task will get executed asynchronously:
      val task = Task(Task.eval("Hello!")).executeAsync
      //val task = Task.fork(Task.eval("Hello!"))
      // The default scheduler
      import monix.execution.Scheduler.Implicits.global
      // Creating a special scheduler meant for I/O
      import monix.execution.Scheduler
      lazy val io = Scheduler.io(name="my-io")
      //Then we can manage what executes on which:
      // Override the default Scheduler by fork:
      val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
      val forked = source.executeOn(io,true)
      // val forked = Task.fork(source, io)
      //=> Running on thread: ForkJoinPool-1-worker-1
      //=> Running on thread: my-io-4
      /* --------taskError ------- */
      import scala.concurrent.TimeoutException
      val taskError = Task.raiseError[Int](new TimeoutException)
      // error: monix.eval.Task[Int] =
      //   Delay(Error(java.util.concurrent.TimeoutException))
      taskError.runOnComplete(result => println(result))
      //=> Failure(java.util.concurrent.TimeoutException)


      final def doOnFinish(f: Option[Throwable] => Task[Unit]): Task[A] =
      final def doOnCancel(callback: Task[Unit]): Task[A] =
      final def onCancelRaiseError(e: Throwable): Task[A] =
      final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Task[B]]): Task[B] =
      final def onErrorHandleWith[B >: A](f: Throwable => Task[B]): Task[B] =
      final def onErrorFallbackTo[B >: A](that: Task[B]): Task[B] =
      final def restartUntil(p: (A) => Boolean): Task[A] =
      final def onErrorRestart(maxRetries: Long): Task[A] =
      final def onErrorRestartIf(p: Throwable => Boolean): Task[A] =
      final def onErrorRestartLoop[S, B >: A](initial: S)(f: (Throwable, S, S => Task[B]) => Task[B]): Task[B] =
      final def onErrorHandle[U >: A](f: Throwable => U): Task[U] =
      final def onErrorRecover[U >: A](pf: PartialFunction[Throwable, U]): Task[U] =


      def runAsync(implicit s: Scheduler): CancelableFuture[A] =
      def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
      def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
      def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
      final def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] =
      final def runSyncMaybeOpt(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] = 
      final def runSyncUnsafe(timeout: Duration)
        (implicit s: Scheduler, permit: CanBlock): A =
      final def runSyncUnsafeOpt(timeout: Duration)
        (implicit s: Scheduler, opts: Options, permit: CanBlock): A =
      final def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =


      val task1 = Task {println("sum:"); 1+2}.delayExecution(1 second)
      println(task1.runSyncUnsafe(2 seconds))
      task1.runOnComplete {
        case Success(r) => println(s"result: $r")
        case Failure(e) => println(e.getMessage)


