zoukankan      html  css  js  c++  java
  • 【Akka】在并发程序中使用Future

    引言

    在Akka中, 一个Future是用来获取某个并发操作的结果的数据结构。这个操作一般是由Actor运行或由Dispatcher直接运行的. 这个结果能够以同步(堵塞)或异步(非堵塞)的方式訪问。
    Future提供了一种简单的方式来运行并行算法。

    Future直接使用

    Future中的一个常见用例是在不须要使用Actor的情况下并发地运行计算。


    Future有两种使用方式:

    1. 堵塞方式(Blocking):该方式下,父actor或主程序停止运行知道全部future完毕各自任务。通过scala.concurrent.Await使用。

    2. 非堵塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在运行期间启动future,future任务和父actor并行运行,当每一个future完毕任务,将通知父actor。通过onCompleteonSuccessonFailure方式使用。

    运行上下文(ExecutionContext)

    为了运行回调和操作,Futures须要有一个ExecutionContext


    假设你在作用域内有一个ActorSystem。它会它自己派发器用作ExecutionContext,你也能够用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹。或者甚至创建自己的实例。
    通过导入ExecutionContext.Implicits.global来导入默认的全局运行上下文。

    你能够把该运行上下文看做是一个线程池,ExecutionContext是在某个线程池运行任务的抽象。


    假设在代码中没有导入该运行上下文,代码将无法编译。

    堵塞方式

    第一个样例展示怎样创建一个future,然后通过堵塞方式等待其计算结果。尽管堵塞方式不是一个非常好的使用方法,可是能够说明问题。


    这个样例中。通过在未来某个时间计算1+1,当计算结果后再返回。

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object FutureBlockDemo extends App{
      implicit val baseTime = System.currentTimeMillis
    
      // create a Future
      val f = Future {
        Thread.sleep(500)
        1+1
      }
      // this is blocking(blocking is bad)
      val result = Await.result(f, 1 second)
      // 假设Future没有在Await规定的时间里返回,
      // 将抛出java.util.concurrent.TimeoutException
      println(result)
      Thread.sleep(1000)
    }

    代码解释:

    1. 在上面的代码中。被传递给Future的代码块会被缺省的Dispatcher所运行。代码块的返回结果会被用来完毕Future。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
    2. Await.result方法将堵塞1秒时间来等待Future结果返回。假设Future在规定时间内没有返回,将抛出java.util.concurrent.TimeoutException异常。
    3. 通过导入scala.concurrent.duration._,能够用一种方便的方式来声明时间间隔,如100 nanos500 millis5 seconds1 minute1 hour3 days

      还能够通过Duration(100, MILLISECONDS)Duration(200, "millis")来创建时间间隔。

    非堵塞方式(回调方式)

    有时你只须要监听Future的完毕事件,对其进行响应,不是创建新的Future,而不过产生副作用。


    通过onComplete,onSuccess,onFailure三个回调函数来异步运行Future任务,而后两者不过第一项的特例。

    使用onComplete的代码演示样例:

    import scala.concurrent.{Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scala.util.Random
    
    object FutureNotBlock extends App{
      println("starting calculation ...")
      val f = Future {
        Thread.sleep(Random.nextInt(500))
        42
      }
    
      println("before onComplete")
      f.onComplete{
        case Success(value) => println(s"Got the callback, meaning = $value")
        case Failure(e) => e.printStackTrace
      }
    
      // do the rest of your work
      println("A ...")
      Thread.sleep(100)
      println("B ....")
      Thread.sleep(100)
      println("C ....")
      Thread.sleep(100)
      println("D ....")
      Thread.sleep(100)
      println("E ....")
      Thread.sleep(100)
    
      Thread.sleep(2000)
    }

    使用onSuccess、onFailure的代码演示样例:

    import scala.concurrent.{Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scala.util.Random
    
    object Test12_FutureOnSuccessAndFailure extends App{
      val f = Future {
        Thread.sleep(Random.nextInt(500))
        if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
      }
    
      f onSuccess {
        case result => println(s"Success: $result")
      }
    
      f onFailure {
        case t => println(s"Exception: ${t.getMessage}")
      }
    
      // do the rest of your work
      println("A ...")
      Thread.sleep(100)
      println("B ....")
      Thread.sleep(100)
      println("C ....")
      Thread.sleep(100)
      println("D ....")
      Thread.sleep(100)
      println("E ....")
      Thread.sleep(100)
    
      Thread.sleep(1000)
    }

    代码解释:
    上面两段样例中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
    然后在回调函数中进行相关处理。

    创建返回Future[T]的方法

    先看一下演示样例:

    import scala.concurrent.{Await, Future, future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    
    object ReturnFuture extends App{
      implicit val baseTime = System.currentTimeMillis
    
      // `future` method is another way to create a future
      // It starts the computation asynchronously and retures a Future[Int] that
      // will hold the result of the computation.
      def longRunningComputation(i: Int): Future[Int] = future {
        Thread.sleep(100)
        i + 1
      }
    
      // this does not block
      longRunningComputation(11).onComplete {
        case Success(result) => println(s"result = $result")
        case Failure(e) => e.printStackTrace
      }
    
      // keep the jvm from shutting down
      Thread.sleep(1000)
    }

    代码解释:
    上面代码中的longRunningComputation返回一个Future[Int],然后进行相关的异步操作。
    当中future方法是创建一个future的还有一种方法。它将启动一个异步计算而且返回包括计算结果的Future[T]

    Future用于Actor

    通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息actor ! msg。这样的方法只在发送者是一个Actor时有效;另外一种是通过一个Future。


    使用Actor的?方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:

    import scala.concurrent.Await
    import akka.pattern.ask
    import scala.concurrent.duration._
    import akka.util.Timeout
    
    implicit val timeout = Timeout(5 seconds)
    val future = actor ? msg
    val result = Await.result(future, timeout.duration).asInstanceOf[String]

    以下是使用?发送消息给actor,并等待回应的代码演示样例:

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.language.postfixOps
    import scala.concurrent.duration._
    
    case object AskNameMessage
    
    class TestActor extends Actor {
      def receive = {
        case AskNameMessage => // respond to the 'ask' request
          sender ! "Fred"
        case _ => println("that was unexpected")
      }
    }
    object AskDemo extends App{
      //create the system and actor
      val system = ActorSystem("AskDemoSystem")
      val myActor = system.actorOf(Props[TestActor], name="myActor")
    
      // (1) this is one way to "ask" another actor for information
      implicit val timeout = Timeout(5 seconds)
      val future = myActor ? AskNameMessage
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      println(result)
    
      // (2) a slightly different way to ask another actor for information
      val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
      val result2 = Await.result(future2, 1 second)
      println(result2)
    
      system.shutdown
    }

    代码解释:

    1. Await.result(future, timeout.duration).asInstanceOf[String]会导致当前线程被堵塞,并等待actor通过它的应答来完毕Future

      可是堵塞会导致性能问题。所以是不推荐的。

      致堵塞的操作位于Await.resultAwait.ready中,这样就方便定位堵塞的位置。

    2. 还要注意actor返回的Future的类型是Future[Any],这是由于actor是动态的。 这也是为什么上例中凝视(1)使用了asInstanceOf
    3. 在使用非堵塞方式时,最好使用mapTo方法来将Future转换到期望的类型。假设转换成功。mapTo方法会返回一个包括结果的新的 Future。假设不成功,则返回ClassCastException异常。

    转载请注明作者Jason Ding及其出处
    Github博客主页(http://jasonding1354.github.io/)
    GitCafe博客主页(http://jasonding1354.gitcafe.io/)
    CSDN博客(http://blog.csdn.net/jasonding1354)
    简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
    Google搜索jasonding1354进入我的博客主页

  • 相关阅读:
    LeetCode 623. Add One Row to Tree
    LeetCode 894. All Possible Full Binary Trees
    LeetCode 988. Smallest String Starting From Leaf
    LeetCode 979. Distribute Coins in Binary Tree
    LeetCode 814. Binary Tree Pruning
    LeetCode 951. Flip Equivalent Binary Trees
    LeetCode 426. Convert Binary Search Tree to Sorted Doubly Linked List
    LeetCode 889. Construct Binary Tree from Preorder and Postorder Traversal
    LeetCode 687. Longest Univalue Path
    LeetCode 428. Serialize and Deserialize N-ary Tree
  • 原文地址:https://www.cnblogs.com/llguanli/p/8674912.html
Copyright © 2011-2022 走看看