zoukankan      html  css  js  c++  java
  • 【Akka】在并发程序中使用Future

    引言

    在Akka中, 一个Future是用来获取某个并发操作的结果的数据结构。这个操作一般是由Actor运行或由Dispatcher直接运行的. 这个结果能够以同步(堵塞)或异步(非堵塞)的方式訪问。
    Future提供了一种简单的方式来运行并行算法。

    Future直接使用

    Future中的一个常见用例是在不须要使用Actor的情况下并发地运行计算。


    Future有两种使用方式:

    1. 堵塞方式(Blocking):该方式下,父actor或主程序停止运行知道全部future完毕各自任务。通过scala.concurrent.Await使用。

    2. 非堵塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在运行期间启动future,future任务和父actor并行运行,当每一个future完毕任务,将通知父actor。通过onCompleteonSuccessonFailure方式使用。

    运行上下文(ExecutionContext)

    为了运行回调和操作,Futures须要有一个ExecutionContext


    假设你在作用域内有一个ActorSystem。它会它自己派发器用作ExecutionContext,你也能够用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹。或者甚至创建自己的实例。
    通过导入ExecutionContext.Implicits.global来导入默认的全局运行上下文。

    你能够把该运行上下文看做是一个线程池,ExecutionContext是在某个线程池运行任务的抽象。


    假设在代码中没有导入该运行上下文,代码将无法编译。

    堵塞方式

    第一个样例展示怎样创建一个future,然后通过堵塞方式等待其计算结果。尽管堵塞方式不是一个非常好的使用方法,可是能够说明问题。


    这个样例中。通过在未来某个时间计算1+1,当计算结果后再返回。

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object FutureBlockDemo extends App{
      implicit val baseTime = System.currentTimeMillis
    
      // create a Future
      val f = Future {
        Thread.sleep(500)
        1+1
      }
      // this is blocking(blocking is bad)
      val result = Await.result(f, 1 second)
      // 假设Future没有在Await规定的时间里返回,
      // 将抛出java.util.concurrent.TimeoutException
      println(result)
      Thread.sleep(1000)
    }

    代码解释:

    1. 在上面的代码中。被传递给Future的代码块会被缺省的Dispatcher所运行。代码块的返回结果会被用来完毕Future。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
    2. Await.result方法将堵塞1秒时间来等待Future结果返回。假设Future在规定时间内没有返回,将抛出java.util.concurrent.TimeoutException异常。
    3. 通过导入scala.concurrent.duration._,能够用一种方便的方式来声明时间间隔,如100 nanos500 millis5 seconds1 minute1 hour3 days

      还能够通过Duration(100, MILLISECONDS)Duration(200, "millis")来创建时间间隔。

    非堵塞方式(回调方式)

    有时你只须要监听Future的完毕事件,对其进行响应,不是创建新的Future,而不过产生副作用。


    通过onComplete,onSuccess,onFailure三个回调函数来异步运行Future任务,而后两者不过第一项的特例。

    使用onComplete的代码演示样例:

    import scala.concurrent.{Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scala.util.Random
    
    object FutureNotBlock extends App{
      println("starting calculation ...")
      val f = Future {
        Thread.sleep(Random.nextInt(500))
        42
      }
    
      println("before onComplete")
      f.onComplete{
        case Success(value) => println(s"Got the callback, meaning = $value")
        case Failure(e) => e.printStackTrace
      }
    
      // do the rest of your work
      println("A ...")
      Thread.sleep(100)
      println("B ....")
      Thread.sleep(100)
      println("C ....")
      Thread.sleep(100)
      println("D ....")
      Thread.sleep(100)
      println("E ....")
      Thread.sleep(100)
    
      Thread.sleep(2000)
    }

    使用onSuccess、onFailure的代码演示样例:

    import scala.concurrent.{Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scala.util.Random
    
    object Test12_FutureOnSuccessAndFailure extends App{
      val f = Future {
        Thread.sleep(Random.nextInt(500))
        if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
      }
    
      f onSuccess {
        case result => println(s"Success: $result")
      }
    
      f onFailure {
        case t => println(s"Exception: ${t.getMessage}")
      }
    
      // do the rest of your work
      println("A ...")
      Thread.sleep(100)
      println("B ....")
      Thread.sleep(100)
      println("C ....")
      Thread.sleep(100)
      println("D ....")
      Thread.sleep(100)
      println("E ....")
      Thread.sleep(100)
    
      Thread.sleep(1000)
    }

    代码解释:
    上面两段样例中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
    然后在回调函数中进行相关处理。

    创建返回Future[T]的方法

    先看一下演示样例:

    import scala.concurrent.{Await, Future, future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    
    object ReturnFuture extends App{
      implicit val baseTime = System.currentTimeMillis
    
      // `future` method is another way to create a future
      // It starts the computation asynchronously and retures a Future[Int] that
      // will hold the result of the computation.
      def longRunningComputation(i: Int): Future[Int] = future {
        Thread.sleep(100)
        i + 1
      }
    
      // this does not block
      longRunningComputation(11).onComplete {
        case Success(result) => println(s"result = $result")
        case Failure(e) => e.printStackTrace
      }
    
      // keep the jvm from shutting down
      Thread.sleep(1000)
    }

    代码解释:
    上面代码中的longRunningComputation返回一个Future[Int],然后进行相关的异步操作。
    当中future方法是创建一个future的还有一种方法。它将启动一个异步计算而且返回包括计算结果的Future[T]

    Future用于Actor

    通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息actor ! msg。这样的方法只在发送者是一个Actor时有效;另外一种是通过一个Future。


    使用Actor的?方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:

    import scala.concurrent.Await
    import akka.pattern.ask
    import scala.concurrent.duration._
    import akka.util.Timeout
    
    implicit val timeout = Timeout(5 seconds)
    val future = actor ? msg
    val result = Await.result(future, timeout.duration).asInstanceOf[String]

    以下是使用?发送消息给actor,并等待回应的代码演示样例:

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.language.postfixOps
    import scala.concurrent.duration._
    
    case object AskNameMessage
    
    class TestActor extends Actor {
      def receive = {
        case AskNameMessage => // respond to the 'ask' request
          sender ! "Fred"
        case _ => println("that was unexpected")
      }
    }
    object AskDemo extends App{
      //create the system and actor
      val system = ActorSystem("AskDemoSystem")
      val myActor = system.actorOf(Props[TestActor], name="myActor")
    
      // (1) this is one way to "ask" another actor for information
      implicit val timeout = Timeout(5 seconds)
      val future = myActor ? AskNameMessage
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      println(result)
    
      // (2) a slightly different way to ask another actor for information
      val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
      val result2 = Await.result(future2, 1 second)
      println(result2)
    
      system.shutdown
    }

    代码解释:

    1. Await.result(future, timeout.duration).asInstanceOf[String]会导致当前线程被堵塞,并等待actor通过它的应答来完毕Future

      可是堵塞会导致性能问题。所以是不推荐的。

      致堵塞的操作位于Await.resultAwait.ready中,这样就方便定位堵塞的位置。

    2. 还要注意actor返回的Future的类型是Future[Any],这是由于actor是动态的。 这也是为什么上例中凝视(1)使用了asInstanceOf
    3. 在使用非堵塞方式时,最好使用mapTo方法来将Future转换到期望的类型。假设转换成功。mapTo方法会返回一个包括结果的新的 Future。假设不成功,则返回ClassCastException异常。

    转载请注明作者Jason Ding及其出处
    Github博客主页(http://jasonding1354.github.io/)
    GitCafe博客主页(http://jasonding1354.gitcafe.io/)
    CSDN博客(http://blog.csdn.net/jasonding1354)
    简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
    Google搜索jasonding1354进入我的博客主页

  • 相关阅读:
    Java8时间处理
    yii2.0上传图片
    将字符串不足长度的填充到指定长度
    通过PHPExcel将Excel表文件中数据导入数据库
    css万能清除原理
    浮动+清除浮动
    DIV滚动条设置添加 CSS滚动条显示与滚动条隐藏
    地图上显示点在点上标注当前点的id
    百度地图点击地址后显示图标,保存到数据库之后,页面显示的是保存的坐标图标
    百度地图API
  • 原文地址:https://www.cnblogs.com/llguanli/p/8674912.html
Copyright © 2011-2022 走看看