zoukankan      html  css  js  c++  java
  • Scalaz(44)- concurrency :scalaz Future,尚不完整的多线程类型

    scala已经配备了自身的Future类。我们先举个例子来了解scala Future的具体操作: 

     1 import scala.concurrent._
     2 import ExecutionContext.Implicits.global
     3 object scalafuture {
     4   def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000) ; i + i }
     5                                       //> dbl: (i: Int)scala.concurrent.Future[Int]
     6   val fdbl = dbl(3)                   //> fdbl  : scala.concurrent.Future[Int] = List()
     7   fdbl.onSuccess {
     8     case a => println(s"${a/2} + ${a/2} = $a")
     9   }
    10   println("calculating ...")          //> calculating ...
    11   Thread.sleep(2000)                  //> 3 + 3 = 6
    12 }

     这是一个标准的异步运算;在成功完成运算事件上绑定callback来获取在其它线程中的运算结果。我们也可以进行异常处理:

    1   val fdz = Future { 3 / 0 }      //> fdz  : scala.concurrent.Future[Int] = List()
    2   fdz.onFailure {
    3     case e => println(s"error message {${e.getMessage}}")
    4   }
    5   Thread.sleep(100)               //> error message {/ by zero}

    又或者同时绑定运算成功和失败事件的callback函数:

    1   import scala.util.{Success, Failure}
    2   fdz onComplete {
    3     case Success(a) => println(s"${a/2} + ${a/2} = $a")
    4     case Failure(e) => println(s"error message {${e.getMessage}}")
    5   }
    6   Thread.sleep(100)               //> error message {/ by zero}

     scala Future 实现了flatMap,我们可以把几个Future组合起来用:

     1   def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000); i + i }
     2                                                   //> dbl: (i: Int)scala.concurrent.Future[Int]
     3   def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scala.concurrent.Future[Int]
     4   def sum(a: Int, b: Int): Future[Int] = Future { a + b }
     5                                           //> sum: (a: Int, b: Int)scala.concurrent.Future[Int]
     6   val fsum = for {
     7     a <- dbl(3)
     8     b <- sqr(a)
     9     c <- sum(a,b)
    10   } yield c                               //> fsum  : scala.concurrent.Future[Int] = List()
    11   
    12   fsum onSuccess { case c => println(s"the combined result is: $c") }
    13   Thread.sleep(2000)                     //> the combined result is: 42

    scala Future利用flatMap实现了流程运算:先运算dbl再sqr再sum,这个顺序是固定的即使它们可能在不同的线程里运算,因为sqr依赖dbl的结果,而sum又依赖dbl和sqr的结果。

    好了,既然scala Future的功能已经比较完善了,那么scalaz的Future又有什么不同的特点呢?首先,细心一点可以发现scala Future是即时运算的,从下面的例子里可以看出:

    1   import scala.concurrent.duration._
    2   val fs = Future {println("run now..."); System.currentTimeMillis() }
    3                                          //> run now...
    4                                          //| fs  : scala.concurrent.Future[Long] = List()
    5   Await.result(fs, 1.second)             //> res0: Long = 1465907784714
    6   Thread.sleep(1000)
    7   Await.result(fs, 1.second)             //> res1: Long = 1465907784714

    可以看到fs是在Future构建时即时运算的,而且只会运算一次。如果scala Future中包括了能产生副作用的代码,在构建时就会立即产生副作用。所以我们是无法使用scala Future来编写纯函数的,那么在scalaz里就必须为并发编程提供一个与scala Future具同等功能但又不会立即产生副作用的类型了,这就是scalaz版本的Future。我们看看scalaz是如何定义Future的:scalaz.concurrent/Future.scala

    sealed abstract class Future[+A] {
    ...
    object Future {
      case class Now[+A](a: A) extends Future[A]
      case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
      case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
      case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
      case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                                f: A => Future[B]) extends Future[B]
    ...

    Future[A]就是个Free Monad。它的结构化表达方式分别有Now,Async,Suspend,BindSuspend,BindAsync。我们可以用这些结构实现flatMap函数,所以Future就是Free Monad:

      def flatMap[B](f: A => Future[B]): Future[B] = this match {
        case Now(a) => Suspend(() => f(a))
        case Suspend(thunk) => BindSuspend(thunk, f)
        case Async(listen) => BindAsync(listen, f)
        case BindSuspend(thunk, g) =>
          Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
        case BindAsync(listen, g) =>
          Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))
      }

     free structure类型可以支持算式/算法关注分离,也就是说我们可以用scalaz Future来描述程序功能而不涉及正真运算。scalaz Future的构建方式如下:

     1 import scalaz._
     2 import Scalaz._
     3 import scalaz.concurrent._
     4 import scala.concurrent.duration._
     5 object scalazFuture {
     6 val fnow = Future.now {println("run..."); System.currentTimeMillis()}
     7                                           //> run...
     8                                           //| fnow  : scalaz.concurrent.Future[Long] = Now(1465909860301)
     9 val fdelay = Future.delay {println("run..."); System.currentTimeMillis()}
    10                                           //> fdelay  : scalaz.concurrent.Future[Long] = Suspend(<function0>)
    11 val fapply = Future {println("run..."); System.currentTimeMillis()}
    12                                           //> fapply  : scalaz.concurrent.Future[Long] = Async(<function1>)

    可以看到fnow是个即时运算的构建器,而这个now就是一个lift函数, 它负责把一个普通无副作用运算升格成Future。fdelay,fapply分别把运算存入trampoline进行结构化了。我们必须另外运算trampoline来运行结构内的运算:

     1 fdelay.run                                        //> run...
     2                                                   //| res0: Long = 1465910524847
     3 Thread.sleep(1000)
     4 fdelay.run                                        //> run...
     5                                                   //| res1: Long = 1465910525881
     6 fapply.run                                        //> run...
     7                                                   //| res2: Long = 1465910525883
     8 Thread.sleep(1000)
     9 fapply.run                                        //> run...
    10                                                   //| res3: Long = 1465910526884

    scalaz Future只有在运算时才会产生副作用,而且可以多次运算。

    我们可以用即时(blocking)、异步、定时方式来运算Future:

     1 fapply.unsafePerformSync                          //> run...
     2                                                   //| res4: Long = 1465958049118
     3 fapply.unsafePerformAsync {
     4   case a => println(a)
     5 }
     6 Thread.sleep(1000)
     7 fapply.unsafePerformSyncFor(1 second)             //> run...
     8                                                   //| 1465958051126
     9                                                   //| run...
    10                                                   //| res5: Long = 1465958052172

    结构化状态Async代表了scalaz Future的多线程处理特性:

    /**
       * Create a `Future` from an asynchronous computation, which takes the form
       * of a function with which we can register a callback. This can be used
       * to translate from a callback-based API to a straightforward monadic
       * version. See `Task.async` for a version that allows for asynchronous
       * exceptions.
       */
      def async[A](listen: (A => Unit) => Unit): Future[A] =
        Async((cb: A => Trampoline[Unit]) => listen { a => cb(a).run })
    
      /** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */
      def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb =>
        pool.submit { new Callable[Unit] { def call = cb(a).run }}
      }
    
      /** Create a `Future` that will evaluate `a` after at least the given delay. */
      def schedule[A](a: => A, delay: Duration)(implicit pool: ScheduledExecutorService =
          Strategy.DefaultTimeoutScheduler): Future[A] =
        Async { cb =>
          pool.schedule(new Callable[Unit] {
            def call = cb(a).run
          }, delay.toMillis, TimeUnit.MILLISECONDS)
        }

    我们看到apply和schedule在构建Future时对运算线程进行了配置。

    如果我们需要模仿scala Future的功效可以用unsafeStart:

    1 val fs = fapply.unsafeStart              //> run...
    2                                          //| fs  : scalaz.concurrent.Future[Long] = Suspend(<function0>)
    3 fs.run                                   //> res6: Long = 1465958922401
    4 Thread.sleep(1000)
    5 fs.run                                   //> res7: Long = 1465958922401

    我们也可以用scala Future的callback方式用async函数把自定义的callback挂在构建的Future上:

    1 def fu(t: Long): Future[String] =
    2   Future.async[String]{k => k(s"the curreent time is: ${t.toString}!!!")}
    3                                                   //> fu: (t: Long)scalaz.concurrent.Future[String]
    4 fu(System.currentTimeMillis()).run                //> res8: String = the curreent time is: 1465958923415!!!

    scala Future和scalaz Future之间可以相互转换:

     1 import scala.concurrent.{Future => sFuture}
     2 import scala.concurrent.ExecutionContext
     3 import scala.util.{Success,Failure}
     4 def futureTozFuture[A](sf: sFuture[A])(implicit ec: ExecutionContext): Future[A] =
     5   Future.async {cb => sf.onComplete {
     6     case Success(a) => cb(a)
     7 //    case Failure(e) => cb(e)
     8   }}                            //> futureTozFuture: [A](sf: scala.concurrent.Future[A])(implicit ec: scala.con
     9                                 //| current.ExecutionContext)scalaz.concurrent.Future[A]
    10 def zFutureTosFuture[A](zf: Future[A]): sFuture[A] = {
    11   val prom = scala.concurrent.Promise[A]
    12   zf.unsafePerformAsync {
    13      case a => prom.success(a)是
    14   }
    15   prom.future
    16 }

    突然发现scalaz Future是没有异常处理(exception)功能的。scalaz提供了concurrent.Task类型填补了Future的这部分缺陷。我们会在下篇讨论Task。
    我们用上面scala Future的例子来示范scalaz Future的函数组合能力:

     1   def dbl(i: Int): Future[Int] = Future { i + i } //> dbl: (i: Int)scalaz.concurrent.Future[Int]
     2   def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scalaz.concurrent.Future[Int]
     3   def sum(a: Int, b: Int): Future[Int] = Future { a + b }
     4                                   //> sum: (a: Int, b: Int)scalaz.concurrent.Future[Int]
     5   val fsum = for {
     6     a <- dbl(3)
     7     b <- sqr(a)
     8     c <- sum(a,b)
     9   } yield c                       //> fsum  : scalaz.concurrent.Future[Int] = BindAsync(<function1>,<function1>)
    10 
    11   fsum.unsafePerformAsync {
    12     case a => println(s"result c is:$a")
    13   }
    14   Thread.sleep(1000)              //> result c is:42

     

     

     

     

     

     

     

  • 相关阅读:
    基于python+django+mysql的接口测试平台
    firefox没有装在C盘,webdriver启动firefox时报错
    Python知识点面试题
    Python面试题整理
    Python程序猿面试杂谈
    Python面试-websocket及web框架
    Python面试-DB相关
    Python面试简介及并行并发
    flink-demo2
    flink-table demo
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/5586834.html
Copyright © 2011-2022 走看看