zoukankan      html  css  js  c++  java
  • 从List[Future[T]]到Future[List[T]]

    在课程<Principles Of Reactive Programming>里Week3的一节 "Promises, promises, promises"中,Erik Meijer举了一个例子,实现一个函数:

    def sequence[T](fs: List[Future[T]]): Future[List[T]] = {.....}

    这个函数实际在Scala library的Future对象中有标准的实现。

    def  sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]]

     

    Simple version of Future.traverse. Transforms a TraversableOnce[Future[A]] into a Future[TraversableOnce[A]]. Useful for reducing many Futures into a single Future.

    俺就想试着自己实现一下,于是写出了下面这段有问题的代码……

    object ConcurrentTool {
      def collect[T](futures: List[Future[T]]): Future[List[T]] = {
        val p = Promise[List[T]]
        var list = List.empty[T]
        futures.foldLeft(list) {
          (curr, future) => {
            if (p.isCompleted)
              curr
            else {
              future.onComplete {
                case Failure(e) => p.failure(e)
                case Success(e) => list = e :: list
              }
              list
            }
          }
        }
        if (!p.isCompleted)
          p.success(list)
        p.future
      }
    
    def main(args: Array[String]) = {
        def futureOne = {
          Future {
            1
          }
        }
        def futureTwo = {
          Future {
            2
          }
        }
    
        val collection = collect(List(futureOne, futureTwo))
        val lists = Await.result(collection, 1 seconds)
        println(lists.size)
    
      }
    }

    collect方法用于将一个List[Future[T]]变成一个Future[List[T]]。然后我传给它两个future,等返回的Future[List[T]] compelete,然后取结果List的大小,打印出来。

    但是结果是0……但是,在我加了断点进行调试时,有时结果是2,为啥呢? 错误不可怕,这是纠正自己的机会。

    再来看一下collect方法的实现吧。我用一个Promise生成最后的future, 用一个空List做为foldLeft的初始值,然后遍历List[Future]里的所有future。对于每个future,给它注册一个回调函数,当它fail的时候,用引起fail的异常去complete跟最后结果相关的那个Promise,如果这个future成功了,就把它的结果附加在list里。在注册完回调之后,我返回保存结果的list。

    问题就在这些操作的执行时间上。给future注册回调函数的动作是在main线程中,这个注册不会阻塞main线程的执行,假如被注册的函数的确是在另一个线程中执行的,那么在注册完回调函数之后,我返回的list仍然可能是最初的那个empty list, 所以在foldLeft完成后,foldLeft返回的仍然可能是那个empty list。接下来,我判断p.isCompleted, 如果否,我就用这个list去complete这个Promise,实际上我用一个empty list去complete了它,所以在获取collection的结果后,发现这个是一个空列表。

    那么我们想要的结果如何实现呢?关键是,必须得生成这个Future[List]的所有future都compelete时,这个Future[List]才能complete。如何实现这一点呢?

    • 我们可以阻塞执行collect方法的线程,显式地等待List[Future]里的所有future完成,再complete跟结果相关的那个Promise。但是这样做,会阻塞调用collect的线程,也违背了我们返回一个Future的目的。
    • 我们可以在另一个线程里等待List[Future]里的所有future完成,再complete跟结果相关的那个Promise。在collect方法中,返回Promise对应的future。这样就不会阻塞调用collect的线程。

    如何在另一个线程等待呢?可以用Await来阻塞等待,或者注册callback,使得当所有future完成时,callback被调用。

    假如有两个Future,可以用下边的代码注册callback。

     def waitBoth[T](futureA: Future[T], futureB: Future[T]): Future[List[T]] = {
        val p = Promise[List[T]]()
        futureA.onComplete{
          case Failure(e) => p.failure(e)
          case Success(t) => futureB.onComplete{
            case Failure(eb) => p.failure(eb)
            case Success(b) => p.success(t :: b :: Nil)
          }
        }
        p.future
      }

    这个变形一下,用flatMap和map表示就是

     def waitBoth[T](futureA: Future[T], futureB: Future[T]): Future[List[T]] = {
        val p = Promise[List[T]]()
        futureA.onComplete{
          case Failure(e) => p.failure(e)
          case Success(t) => futureA.flatMap{a: T =>
            futureB map {b =>
              p.success(a :: b :: Nil)
            }
          }
        }
        p.future
      }

    实际上Future的map和flatMap在实现时也用了Promise, 上边的代码简化一下就是

      def waitBoth[T](futureA: Future[T], futureB: Future[T]): Future[List[T]] = {
        futureA.flatMap { a: T =>
          futureB map { b =>
            a :: b :: Nil
          }
        }
      }

    再把flatMap和map转成for循环表示,就是

      def waitTwo[T](futureA: Future[T], futureB: Future[T]): Future[List[T]] = {
        for {
          a <- futureA
          b <- futureB
        } yield a :: b :: Nil
      }

    那么如何组合更多的Future呢?我们来写一个方法把一个Future[T]的结果附加到Future[List[T]]中

      def waitMore[T](futureA: Future[T], futures: Future[List[T]]): Future[List[T]] = {
        for{
          a <- futureA
          b <- futures
        }yield a :: b
      }

    然后以此为基础,就可以构造最早提到的sequence函数

      def waitSome[T](futures: List[Future[T]]): Future[List[T]] = {
        val p = Promise[List[T]]()
        p.success(Nil)
        val init: Future[List[T]] = p.future
        futures.foldLeft(init){
          (curr, f) => waitMore(f, curr)
        }
      }

     上边代码关键在于了解到我们需要一个Future作为foldLeft的初始值,它必须是success的,且使其success的值为Nil。这个init实际上也可以用 val init: Future[List[T]] = Future{Nil}来得到。

    这种形式距离<Principles Of Reactive Programming>给出的答案已经很接近了。实际上Erik Meijer给出了两个解法,其中跟这个相近的是

      def sequence[T](fs: List[Future[T]]): Future[List[T]] = {
        val successful = Promise[List[T]]
        successful.success(Nil)
        fs.foldRight(successful.future){
          (f, acc) => for{x <-f; xs <- acc} yield x :: xs
        }
      }
  • 相关阅读:
    处理sevenzipsharp 检查密码函数的Bug
    C# 开源压缩组件比较
    css 一些技巧
    input 限制输入
    原生JS实现淡入淡出效果(fadeIn/fadeOut/fadeTo)
    js string.format 方法
    Atom插件及使用
    chrome浏览器的跨域设置-包括版本49前后两种设置 ,windows&mac
    原生js监听input值改变事件
    html5 tab横向滚动,无滚动条(transform:translate)
  • 原文地址:https://www.cnblogs.com/devos/p/4120929.html
Copyright © 2011-2022 走看看