FunDA(6) - Reactive Streams: Play with Iteratees, Enumerator and Enumeratees

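    In the previous post we introduced the Iteratee. Its job is to consume data elements pushed to it from some data source, and each way of consuming the data makes for an Iteratee with a different function. That data source is the Enumerator, the subject of this post. An Enumerator actively pushes data elements downstream according to the current state of the downstream consumer (the Iteratee). We have already discussed the Iteratee's state type, Step: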

    sealed trait Step[E,+A]
    case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A]
    case class Cont[E,+A](k: Input[E] => Iteratee[E,A]) extends Step[E,A]
    case class Error[E](msg: String, loc: Input[E]) extends Step[E,Nothing]

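    The Iteratee uses the Cont state to tell the Enumerator that it can accept data, and it supplies the function k, which the Enumerator calls to push data. The elements the Enumerator pushes, i.e. the Iteratee's input Input[E], carry not only plain data but also the state of the data source: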

    sealed trait Input[+E]
    case class El[E](e: E) extends Input[E]
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
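To make these types concrete, here is a minimal sketch in a deliberately simplified, synchronous model: an iteratee is represented directly by its Step, and there is no Future (Play keeps Iteratee and Step separate and works asynchronously). The constructor names follow the listings above; `sumStep` and `feed` are hypothetical helpers, and the code is written in worksheet/script style like the examples in this post. `sumStep` stays in Cont and hands out its k; `feed` plays the Enumerator by calling k with each Input and stops pushing once the iteratee reaches Done.

```scala
// Simplified synchronous model: an iteratee is represented directly by its Step.
// Hypothetical sketch, not Play's implementation.
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Step[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Step[E, A]
final case class Cont[E, A](k: Input[E] => Step[E, A]) extends Step[E, A]
final case class Error[E, A](msg: String, loc: Input[E]) extends Step[E, A]

// Sums Ints: stays in Cont (exposing k) until it sees EOF, then moves to Done.
def sumStep(acc: Int): Step[Int, Int] = Cont[Int, Int] {
  case El(n) => sumStep(acc + n)
  case Empty => sumStep(acc)               // no data this time, keep waiting
  case EOF   => Done[Int, Int](acc, EOF)   // source exhausted, emit the result
}

// The producer's side: push one input only if the iteratee is in Cont.
def feed[E, A](s: Step[E, A], in: Input[E]): Step[E, A] = s match {
  case Cont(k) => k(in)
  case other   => other                    // Done or Error: further input is ignored
}

val inputs: List[Input[Int]] = List(El(1), El(2), Empty, El(3), EOF)
val result = inputs.foldLeft(sumStep(0))((s, in) => feed(s, in))
println(result) // Done(6,EOF)
```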

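    Through Input[E] the Enumerator tells the Iteratee the current state of the data source: for example, whether all data has been pushed (EOF), or which element is being pushed right now (El[E](e: E)). The Enumerator actively pushes data to the Iteratee and then returns the Iteratee in its new state, as the type signature of Enumerator shows: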

    trait Enumerator[E] {
    
      /**
       * Apply this Enumerator to an Iteratee
       */
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
    
    }

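    The Future here mainly serves to avoid tying up a thread while waiting. Ultimately, an Enumerator can be implemented by calling the Iteratee's fold function, for example: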

     /**
       * Creates an enumerator which produces the one supplied
       * input and nothing else. This enumerator will NOT
       * automatically produce Input.EOF after the given input.
       */
      def enumInput[E](e: Input[E]) = new Enumerator[E] {
        def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
          i.fold {
            case Step.Cont(k) => eagerFuture(k(e))
            case _ => Future.successful(i)
          }(dec)
      }
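The same logic can be tried outside Play in the simplified model below. Like the library version, `enumInput` pushes its single input only when the iteratee is in Cont, returns any other state unchanged, and does not add an EOF of its own. `Future.successful` stands in for Play's internal `eagerFuture` and `dec`; `Enumerator`, `enumInput` and `collect` here are hypothetical simplified definitions, not Play's classes.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

// Same shape as Play's trait: feed an iteratee, get its new state back in a Future.
trait Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
}

// Pushes exactly one input if the iteratee is waiting; does NOT send EOF afterwards.
def enumInput[E](in: Input[E]): Enumerator[E] = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i match {
    case Cont(k) => Future.successful(k(in))
    case _       => Future.successful(i)
  }
}

// An iteratee that collects elements into a list until EOF.
def collect[E](acc: List[E]): Iteratee[E, List[E]] = Cont[E, List[E]] {
  case El(e) => collect[E](acc :+ e)
  case Empty => collect[E](acc)
  case EOF   => Done[E, List[E]](acc, EOF)
}

val afterOne = Await.result(enumInput(El("a"))(collect[String](Nil)), Duration.Inf)
val afterEof = Await.result(enumInput[String](EOF)(afterOne), Duration.Inf)
println(afterOne.isInstanceOf[Cont[_, _]]) // true: still waiting for more input
println(afterEof)                          // Done(List(a),EOF)
```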

    Alternatively, an Enumerator can be built with one of the constructors (apply and friends):

    /**
       * Create an Enumerator from a set of values
       *
       * Example:
       * {{{
       *   val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar")
       * }}}
       */
      def apply[E](in: E*): Enumerator[E] = in.length match {
        case 0 => Enumerator.empty
        case 1 => new Enumerator[E] {
          def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
            case Step.Cont(k) => k(Input.El(in.head))
            case _ => i
          }
        }
        case _ => new Enumerator[E] {
          def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
        }
      }
    
      /**
       * Create an Enumerator from any TraversableOnce like collection of elements.
       *
       * Example of an iterator of lines of a file :
       * {{{
       *  val enumerator: Enumerator[String] = Enumerator( scala.io.Source.fromFile("myfile.txt").getLines )
       * }}}
       */
      def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = {
        val it = traversable.toIterator
        Enumerator.unfoldM[scala.collection.Iterator[E], E](it: scala.collection.Iterator[E])({ currentIt =>
          if (currentIt.hasNext)
            Future[Option[(scala.collection.Iterator[E], E)]]({
              val next = currentIt.next
              Some((currentIt -> next))
            })(ctx)
          else
            Future.successful[Option[(scala.collection.Iterator[E], E)]]({
              None
            })
        })(dec)
      }
    
      /**
       * An empty enumerator
       */
      def empty[E]: Enumerator[E] = new Enumerator[E] {
        def apply[A](i: Iteratee[E, A]) = Future.successful(i)
      }
    
      private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) =>
        l.foldLeft(Future.successful(i))((i, e) =>
          i.flatMap(it => it.pureFold {
            case Step.Cont(k) => k(Input.El(e))
            case _ => it
          }(dec))(dec))
      }
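enumerateSeq threads the iteratee through one Future per element, so each element is pushed only after the previous push has completed, and once the iteratee leaves Cont the remaining elements have no effect. The sketch below reproduces that folding pattern in a simplified synchronous model (hypothetical classes, not Play's). Because pushing is synchronous here, `map` is enough where Play uses `flatMap` plus `pureFold`, and the global execution context replaces Play's internal `dec`. `takeN` is a hypothetical iteratee that becomes Done after n elements.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

// Fold the elements into a chain of Futures: push e only after the previous push finished.
def enumerateSeq[E, A](elems: Seq[E], i: Iteratee[E, A]): Future[Iteratee[E, A]] =
  elems.foldLeft(Future.successful(i)) { (fut, e) =>
    fut.map {
      case Cont(k) => k(El(e)) // iteratee still waiting: push the next element
      case other   => other    // Done: keep it as it is and ignore the rest
    }
  }

// Hypothetical iteratee: takes at most n elements, then is Done.
def takeN[E](n: Int, acc: Vector[E] = Vector.empty): Iteratee[E, Vector[E]] =
  if (n == 0) Done[E, Vector[E]](acc, Empty)
  else Cont[E, Vector[E]] {
    case El(e) => takeN[E](n - 1, acc :+ e)
    case Empty => takeN[E](n, acc)
    case EOF   => Done[E, Vector[E]](acc, EOF)
  }

val users = Seq("Tiger", "Hover", "Grand", "John")
val res = Await.result(enumerateSeq(users, takeN[String](2)), Duration.Inf)
println(res) // Done(Vector(Tiger, Hover),Empty)
```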

    Here is an example of building an Enumerator directly:

     val enumUsers: Enumerator[String] = {
       Enumerator("Tiger","Hover","Grand","John")
     }     //> enumUsers  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@2ef9b8bc

    The Enumerator in this example is built with the apply shown above. Now we connect enumUsers to the consume Iteratee:

     val consume = Iteratee.consume[String]()        //> consume  : play.api.libs.iteratee.Iteratee[String,String] = Cont(<function1>)
     val consumeUsers = enumUsers.apply(consume)      //> consumeUsers  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,String]] = Success(play.api.libs.iteratee.FutureIteratee@1dfe2924)

    We connect the Enumerator to the Iteratee with apply(consume). apply is declared as follows:

    /**
       * Attaches this Enumerator to an [[play.api.libs.iteratee.Iteratee]], driving the
       * Iteratee to (asynchronously) consume the input. The Iteratee may enter its
       * [[play.api.libs.iteratee.Done]] or [[play.api.libs.iteratee.Error]]
       * state, or it may be left in a [[play.api.libs.iteratee.Cont]] state (allowing it
       * to consume more input after that sent by the enumerator).
       *
       * If the Iteratee reaches a [[play.api.libs.iteratee.Done]] state, it will
       * contain a computed result and the remaining (unconsumed) input.
       */
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

    This is an abstract method; the enumInput shown earlier is one concrete implementation of it.

    consumeUsers is a Future[Iteratee[String,String]], so we use Future's combinators to run it and print the data that was sent:

     val futPrint = consumeUsers.flatMap { i => i.run }.map(println)
        //> futPrint  : scala.concurrent.Future[Unit] = List()
     Await.ready(futPrint,Duration.Inf)     //> TigerHoverGrandJohn res0: demo.worksheet.enumerator.futPrint.type = Success(()) 

    Another, more direct way:

     val futUsers = Iteratee.flatten(consumeUsers).run.map(println)
          //> futUsers  : scala.concurrent.Future[Unit] = List()
     Await.ready(futUsers,Duration.Inf)
          //> TigerHoverGrandJohn res1: demo.worksheet.enumerator.futUsers.type = Success(())
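What happens when the result is extracted can be sketched in a simplified synchronous model (hypothetical classes, not Play's): wait for the Future, send EOF to an iteratee that is still in Cont, and read the value out of Done. `consume` mimics what `Iteratee.consume[String]()` does for strings (concatenation), `push` stands in for the Enumerator, and `runF` combines the roles of `Iteratee.flatten` and `run`; all three are hypothetical, simplified definitions. Play's `run` likewise treats an iteratee that is still in Cont after EOF as an error.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

// Concatenates every string it receives; the result is produced on EOF.
def consume(acc: String = ""): Iteratee[String, String] = Cont[String, String] {
  case El(s) => consume(acc + s)
  case Empty => consume(acc)
  case EOF   => Done[String, String](acc, EOF)
}

// Push a list of elements without EOF, as an Enumerator built from values does.
def push[E, A](elems: List[E], i: Iteratee[E, A]): Future[Iteratee[E, A]] =
  Future(elems.foldLeft(i) {
    case (Cont(k), e) => k(El(e))
    case (other, _)   => other
  })

// Roughly Iteratee.flatten(...).run: wait for the iteratee, send EOF, extract the result.
def runF[E, A](fut: Future[Iteratee[E, A]]): Future[A] = fut.flatMap {
  case Done(a, _) => Future.successful(a)
  case Cont(k) => k(EOF) match {
    case Done(a, _) => Future.successful(a)
    case Cont(_)    => Future.failed(new RuntimeException("diverging iteratee after EOF"))
  }
}

val users = Await.result(runF(push(List("Tiger", "Hover", "Grand", "John"), consume())), Duration.Inf)
println(users) // TigerHoverGrandJohn
```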

    We can also use the operator |>>:

     val futPrintUsers = {
      Iteratee.flatten(enumUsers |>> consume).run.map(println)
         //> futPrintUsers  : scala.concurrent.Future[Unit] = List()
     }
     Await.ready(futPrintUsers,Duration.Inf)          
         //> TigerHoverGrandJohn res2: demo.worksheet.enumerator.futPrintUsers.type = Success(())
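|>> is symbolic syntax for apply. A sketch in a simplified synchronous model (hypothetical classes, not Play's) shows the idea: the operator only delegates to apply, so both calls below leave the iteratee in the same state. `enumerate`, `count` and `finish` are hypothetical helpers. Since the operator starts with `|`, it has the lowest precedence among symbolic operators, which is why an expression such as `e &> t |>> i` groups as `(e &> t) |>> i`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

trait Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  // Symbolic alias: `enum |>> it` is exactly `enum.apply(it)`.
  def |>>[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = apply(i)
}

// Pushes the given elements while the iteratee is in Cont (no EOF).
def enumerate[E](elems: E*): Enumerator[E] = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
    Future.successful(elems.foldLeft(i) {
      case (Cont(k), e) => k(El(e))
      case (other, _)   => other
    })
}

// Counts elements; produces the count on EOF.
def count[E](n: Int): Iteratee[E, Int] = Cont[E, Int] {
  case El(_) => count[E](n + 1)
  case Empty => count[E](n)
  case EOF   => Done[E, Int](n, EOF)
}

// Sends EOF if needed and extracts the result, similar in spirit to `run`.
def finish[E, A](i: Iteratee[E, A]): Option[A] = i match {
  case Done(a, _) => Some(a)
  case Cont(k) => k(EOF) match {
    case Done(a, _) => Some(a)
    case Cont(_)    => None
  }
}

val enumUsers = enumerate("Tiger", "Hover", "Grand", "John")
val viaApply  = Await.result(enumUsers.apply(count[String](0)), Duration.Inf)
val viaOp     = Await.result(enumUsers |>> count[String](0), Duration.Inf)
println(finish(viaApply)) // Some(4)
println(finish(viaOp))    // Some(4)
```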

    We can also concatenate two Enumerators and send their data to a single Iteratee:

     val enumColors = Enumerator("Red","White","Blue","Yellow")
     val futEnums = {
       Iteratee.flatten {
         (enumUsers >>> enumColors) |>> consume
       }.run.map(println)                       //> futEnums  : scala.concurrent.Future[Unit] = List()
     }
      Await.ready(futEnums,Duration.Inf)
          //> TigerHoverGrandJohnRedWhiteBlueYellow res3: demo.worksheet.enumerator.futEnums.type = Success(())
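`>>>` feeds the first enumerator's data and then passes the resulting iteratee on to the second enumerator; no EOF is sent in between, so consume sees one continuous stream. Below is a sketch of that composition in a simplified synchronous model (hypothetical classes). `andThen`, `>>>` and the helpers are written to mirror the behaviour, not copied from Play.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

trait Enumerator[E] { self =>
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  def |>>[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = apply(i)
  // Feed this enumerator first, then hand the resulting iteratee to `next`.
  def andThen(next: Enumerator[E]): Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = self(i).flatMap(it => next(it))
  }
  def >>>(next: Enumerator[E]): Enumerator[E] = andThen(next)
}

// Pushes the given elements while the iteratee is in Cont (no EOF).
def enumerate[E](elems: E*): Enumerator[E] = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
    Future(elems.foldLeft(i) {
      case (Cont(k), e) => k(El(e))
      case (other, _)   => other
    })
}

// Concatenates strings, similar to Iteratee.consume[String]().
def consume(acc: String = ""): Iteratee[String, String] = Cont[String, String] {
  case El(s) => consume(acc + s)
  case Empty => consume(acc)
  case EOF   => Done[String, String](acc, EOF)
}

// Sends EOF if needed and extracts the result.
def finish[E, A](i: Iteratee[E, A]): Option[A] = i match {
  case Done(a, _) => Some(a)
  case Cont(k) => k(EOF) match {
    case Done(a, _) => Some(a)
    case Cont(_)    => None
  }
}

val enumUsers  = enumerate("Tiger", "Hover", "Grand", "John")
val enumColors = enumerate("Red", "White", "Blue", "Yellow")
val both = Await.result((enumUsers >>> enumColors) |>> consume(), Duration.Inf)
println(finish(both)) // Some(TigerHoverGrandJohnRedWhiteBlueYellow)
```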

    Of course, the most practical use is pushing the data of an InputStream to an Iteratee, for example sending the contents of a file:

    /**
       * Create an enumerator from the given input stream.
       *
       * Note that this enumerator will block when it reads from the file.
       *
       * @param file The file to create the enumerator from.
       * @param chunkSize The size of chunks to read from the file.
       */
      def fromFile(file: java.io.File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
        fromStream(new java.io.FileInputStream(file), chunkSize)(ec)
      }
    
    /**
       * Create an enumerator from the given input stream.
       *
       * This enumerator will block on reading the input stream, in the supplied ExecutionContext.  Care must therefore
       * be taken to ensure that this isn't a slow stream.  If using this with slow input streams, make sure the
       * ExecutionContext is appropriately configured to handle the blocking.
       *
       * @param input The input stream
       * @param chunkSize The size of chunks to read from the stream.
       * @param ec The ExecutionContext to execute blocking code.
       */
      def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
        implicit val pec = ec.prepare()
        generateM({
          val buffer = new Array[Byte](chunkSize)
          val bytesRead = blocking { input.read(buffer) }
          val chunk = bytesRead match {
            case -1 => None
            case `chunkSize` => Some(buffer)
            case read =>
              val input = new Array[Byte](read)
              System.arraycopy(buffer, 0, input, 0, read)
              Some(input)
          }
          Future.successful(chunk)
        })(pec).onDoneEnumerating(input.close)(pec)
      }
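The chunking logic of fromStream can be tried on its own. The sketch below keeps the same -1 / chunkSize / partial-read match but drives it with a plain loop over an in-memory ByteArrayInputStream instead of generateM; `readChunk`, `readAll` and the sample data are hypothetical. A fresh buffer is allocated on every call because a full buffer is handed out as is, just as in fromStream.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// One step of fromStream: read up to chunkSize bytes, None at end of stream.
def readChunk(input: InputStream, chunkSize: Int): Option[Array[Byte]] = {
  val buffer = new Array[Byte](chunkSize) // fresh buffer per call: it may be handed out as is
  input.read(buffer) match {
    case -1 => None
    case n if n == chunkSize => Some(buffer)
    case n =>
      val chunk = new Array[Byte](n) // copy only the bytes actually read
      System.arraycopy(buffer, 0, chunk, 0, n)
      Some(chunk)
  }
}

// Re-evaluate the read until it yields None, as generateM does while the iteratee is in Cont.
def readAll(input: InputStream, chunkSize: Int): List[Array[Byte]] = {
  val chunks = List.newBuilder[Array[Byte]]
  var next = readChunk(input, chunkSize)
  while (next.isDefined) {
    chunks += next.get
    next = readChunk(input, chunkSize)
  }
  input.close() // fromStream closes the stream via onDoneEnumerating
  chunks.result()
}

val data = "line1\nline2\nline3\n".getBytes("UTF-8") // 18 bytes
val chunks = readAll(new ByteArrayInputStream(data), chunkSize = 8)
println(chunks.map(_.length)) // List(8, 8, 2)
```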

    The core of fromStream is generateM, which is defined as follows:

    /**
       * Like [[play.api.libs.iteratee.Enumerator.repeatM]], but the callback returns an Option, which allows the stream
       * to be eventually terminated by returning None.
       *
       * @param e The input function.  Returns a future eventually redeemed with Some value if there is input to pass, or a
       *          future eventually redeemed with None if the end of the stream has been reached.
       */
      def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] {
        private val pec = ec.prepare()
    
        def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap {
          case Some(e) => loop(k(Input.El(e)))
          case None => Future.successful(Cont(k))
        }(dec)
      })
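Without Play's execution-context plumbing, generateM comes down to a loop: while the iteratee is in Cont, evaluate the by-name e again and push each Some value; on None, stop and return the iteratee still in Cont (no EOF is sent); a Done iteratee never triggers another evaluation. The sketch below inlines checkContinue0's step recursion into a hypothetical, simplified generateM written against a simplified synchronous model, not Play's classes.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

trait Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
}

def generateM[E](e: => Future[Option[E]]): Enumerator[E] = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    // Plays the part of checkContinue0's `step`: only a Cont iteratee gets more data.
    def step(it: Iteratee[E, A]): Future[Iteratee[E, A]] = it match {
      case Cont(k) => e.flatMap {                // by-name e is re-evaluated on every step
        case Some(x) => step(k(El(x)))           // push and loop
        case None    => Future.successful(it)    // source exhausted: stay in Cont, no EOF
      }
      case done => Future.successful(done)       // Done: stop generating
    }
    step(i)
  }
}

var n = 0
val upToFive = generateM[Int] { n += 1; Future.successful(if (n <= 5) Some(n) else None) }

def sum(acc: Int): Iteratee[Int, Int] = Cont[Int, Int] {
  case El(x) => sum(acc + x)
  case Empty => sum(acc)
  case EOF   => Done[Int, Int](acc, EOF)
}

val after = Await.result(upToFive(sum(0)), Duration.Inf)
val total = after match {
  case Cont(k)    => k(EOF) match { case Done(a, _) => a; case _ => -1 }
  case Done(a, _) => a
}
println(total) // 15
```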

    checkContinue0 is defined like this:

    trait TreatCont0[E] {
    
        def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]
    
      }
    
      def checkContinue0[E](inner: TreatCont0[E]) = new Enumerator[E] {
    
        def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    
          def step(it: Iteratee[E, A]): Future[Iteratee[E, A]] = it.fold {
            case Step.Done(a, e) => Future.successful(Done(a, e))
            case Step.Cont(k) => inner[A](step, k)
            case Step.Error(msg, e) => Future.successful(Error(msg, e))
          }(dec)
    
          step(it)
        }
      }

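    From the line case Step.Cont(k) => inner[A](step, k) we can infer how it operates: as long as the downstream Iteratee is in the Cont state, the Enumerator keeps recursively calling the Cont function k to push data e downstream. Let's look at generateM's signature again: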

     def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] 

    In effect, the operation above just evaluates this e: => Future[Option[E]] argument over and over. With that in mind, look again at the fromStream code shown earlier.

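    The argument passed to generateM is a block of code that is run again and again while the Iteratee is in the Cont state; each run reads up to chunkSize bytes from the input source. This is the typical streaming approach: it avoids loading all of the data at once. Below is an example of an Enumerator that reads a file: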

     import java.io._
     val fileEnum: Enumerator[Array[Byte]] = {
      Enumerator.fromFile(new File("/users/tiger/lines.txt"))
     }
     val futFile = Iteratee.flatten { fileEnum |>> consume }.run.map(println)

    Note: fileEnum |>> consume does not compile, because fileEnum is an Enumerator[Array[Byte]] while consume is an Iteratee[String,String], and Array[Byte] does not match String. We can use an Enumeratee to do the conversion, so let's look at what an Enumeratee does.

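    An Enumeratee is essentially a transformer. It converts the data produced by an Enumerator into the type the Iteratee accepts, or into the data the Iteratee needs. For example, to add up a sequence of numbers given as strings, we first have to convert the strings to numbers before the Iteratee can sum them: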

    val strNums = Enumerator("1","2","3")            //> strNums  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@36b4cef0
     val sumIteratee: Iteratee[Int,Int] = Iteratee.fold(0)((s,i) => s+i)
                                                     //> sumIteratee  : play.api.libs.iteratee.Iteratee[Int,Int] = Cont(<function1>)
     
     val strToInt: Enumeratee[String,Int] = Enumeratee.map {s => s.toInt}
                                                     //> strToInt  : play.api.libs.iteratee.Enumeratee[String,Int] = play.api.libs.iteratee.Enumeratee$$anon$38$$anon$1@371a67ec
     strNums |>> strToInt.transform(sumIteratee)     //> res4: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
     strNums |>> strToInt &>> sumIteratee            //> res5: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
     strNums.through(strToInt) |>> sumIteratee       //> res6: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] = List()
     val futsum = Iteratee.flatten(strNums &> strToInt |>> sumIteratee).run.map(println)
                                                    //> futsum  : scala.concurrent.Future[Unit] = List()
     Await.ready(futsum,Duration.Inf)               //> 6
                                                    //| res7: demo.worksheet.enumerator.futsum.type = Success(())

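    In the example above, the Enumerator's elements are Strings, the Iteratee works on Ints, and strToInt is an Enumeratee that converts String to Int. We wired them together in several different ways, and every form gives the same result, 6. An Enumeratee sits between the Enumerator and the Iteratee either through Enumerator.through or through Enumeratee.transform (the operators &> and &>> are their symbolic forms). Of course, we can also filter the data fed to the Iteratee: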

    val sum2 = strNums &> Enumeratee.take(2) &> strToInt |>> sumIteratee
                     //> sum2  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] =List()
     val futsum2 = Iteratee.flatten(sum2).run.map(println)
                                                      //> futsum2  : scala.concurrent.Future[Unit] = List()
     Await.ready(futsum2,Duration.Inf)                //> 3
                                                      //| res8: demo.worksheet.enumerator.futsum2.type = Success(())

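    Enumeratee.take(2) in the example above is a data-processing Enumeratee: it passes only the first two elements on to the Iteratee, so the sum is 1 + 2 = 3.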
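An Enumeratee can be pictured as an adapter wrapped around the downstream iteratee: `map` converts each input before passing it on, and `take(n)` forwards only the first n elements. The sketch below models both as plain functions from an `Iteratee[To, A]` to an `Iteratee[From, A]` in a simplified synchronous model (hypothetical classes). This is a simplification: Play's Enumeratee produces an `Iteratee[From, Iteratee[To, A]]` and leaves it to transform / `&>>` to finish the inner iteratee, whereas here `takeE` sends EOF to the inner iteratee itself. `mapE`, `takeE`, `runAll` and `sum` are hypothetical names.

```scala
// Simplified synchronous model (hypothetical, not Play's classes).
sealed trait Input[+E]
final case class El[E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]

sealed trait Iteratee[E, A]
final case class Done[E, A](a: A, remain: Input[E]) extends Iteratee[E, A]
final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

// map: convert every element From => To, then hand it to the inner iteratee.
def mapE[From, To, A](f: From => To)(inner: Iteratee[To, A]): Iteratee[From, A] = inner match {
  case Done(a, _) => Done[From, A](a, Empty)
  case Cont(k) => Cont[From, A] {
    case El(x) => mapE[From, To, A](f)(k(El(f(x))))
    case Empty => mapE[From, To, A](f)(k(Empty))
    case EOF   => mapE[From, To, A](f)(k(EOF))
  }
}

// take(n): forward at most n elements, then end the inner iteratee with EOF (simplified).
def takeE[E, A](n: Int)(inner: Iteratee[E, A]): Iteratee[E, A] = inner match {
  case Done(_, _)        => inner
  case Cont(k) if n <= 0 => k(EOF)
  case Cont(k) => Cont[E, A] {
    case El(x) => takeE[E, A](n - 1)(k(El(x)))
    case Empty => takeE[E, A](n)(k(Empty))
    case EOF   => k(EOF)
  }
}

// Push every element, then EOF; return the result if the iteratee finished.
def runAll[E, A](elems: List[E], i: Iteratee[E, A]): Option[A] = {
  val fed = elems.foldLeft(i) {
    case (Cont(k), e) => k(El(e))
    case (other, _)   => other
  }
  val last = fed match {
    case Cont(k) => k(EOF)
    case other   => other
  }
  last match {
    case Done(a, _) => Some(a)
    case _          => None
  }
}

def sum(acc: Int): Iteratee[Int, Int] = Cont[Int, Int] {
  case El(x) => sum(acc + x)
  case Empty => sum(acc)
  case EOF   => Done[Int, Int](acc, EOF)
}

val strNums = List("1", "2", "3")
val strToInt = mapE[String, Int, Int](_.toInt) _
println(runAll(strNums, strToInt(sum(0))))                         // Some(6)
println(runAll(strNums, takeE[String, Int](2)(strToInt(sum(0))))) // Some(3)
```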

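    Functionally, the Enumerator + Enumeratee + Iteratee combination now looks more and more like fs2, which is no surprise, since Iteratee is itself a streaming library. We have already chosen fs2 because it supports flexible parallel computation, so there is little point in exploring Iteratee any further.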

Original article: https://www.cnblogs.com/tiger-xc/p/6361037.html