  • FunDA(9)- Stream Source: reactive data streams

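        In the previous post we discussed the static data source (Static Source, snapshot). That approach only works when the data set is known in advance to be of limited size; for very large database tables it is an unsafe use of resources. Slick 3.x added support for Reactive Streams, and through the Reactive Streams API an unbounded amount of data can be read within a bounded amount of memory. This matches FunDA's design goal exactly: an efficient, convenient and safe library for back-end data processing. In earlier posts we introduced the Iteratee pattern. play-iteratees supports Reactive Streams, which gives it an interface to Slick 3.x, so in this post we show how to convert a Slick reactive stream into an fs2 Stream. According to the official Slick documentation, Slick reads back-end data as a Reactive Stream through the db.stream function, configured as follows: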

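      // Some JDBC drivers (e.g. PostgreSQL) only honour fetchSize while auto-commit is off
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
      // Fetch 512 rows per round trip instead of loading the whole result set at once
      val action = queryAction.withStatementParameters(fetchSize = 512)
      // Reactive Streams Publisher: runs disableAutocommit, then streams the rows of action
      val publisher = db.stream(disableAutocommit andThen action)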

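    First, auto-commit must be turned off (disableAutocommit). fetchSize is the page size of the fetch buffer, i.e. the number of rows read from the database per batch. db.stream then builds publisher, a data source that conforms to the Reactive Streams standard. The official Slick documentation gives only the following example of using publisher: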

      val fut = publisher.foreach(s => println(s))
      Await.ready(fut,Duration.Inf)
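    To make the meaning of fetchSize concrete, here is a minimal, stdlib-only sketch. FakeCursor and fetchPage are hypothetical stand-ins for a JDBC cursor, not part of Slick or JDBC: each fetchPage call models one database round trip that returns at most fetchSize rows. The consumer sees one flat sequence of rows, while at most one page is held in memory at a time.

```scala
object FetchSizeSketch {
  // Hypothetical stand-in for a JDBC cursor over a table of `totalRows` rows.
  // Each fetchPage call models one database round trip.
  final class FakeCursor(totalRows: Int) {
    var roundTrips = 0
    var maxBuffered = 0
    def fetchPage(offset: Int, size: Int): Vector[Int] = {
      roundTrips += 1
      val page = (offset until math.min(offset + size, totalRows)).toVector
      maxBuffered = math.max(maxBuffered, page.size)
      page
    }
  }

  // Lazily flattens successive pages of `fetchSize` rows into one row iterator;
  // the next page is fetched only after the previous one has been consumed.
  def rows(cursor: FakeCursor, fetchSize: Int): Iterator[Int] =
    Iterator.iterate(0)(_ + fetchSize)
      .map(offset => cursor.fetchPage(offset, fetchSize))
      .takeWhile(_.nonEmpty)
      .flatMap(page => page)

  def main(args: Array[String]): Unit = {
    val cursor = new FakeCursor(totalRows = 2000)
    var sum = 0L
    rows(cursor, fetchSize = 512).foreach(r => sum += r)
    // 4 non-empty pages (512 + 512 + 512 + 464 rows) plus one empty page that ends the loop
    println(s"sum=$sum roundTrips=${cursor.roundTrips} maxBuffered=${cursor.maxBuffered}")
  }
}
```

    Running main prints sum=1999000 roundTrips=5 maxBuffered=512: all 2000 rows are consumed, yet no more than 512 of them are buffered at once.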

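    Apart from enumerating the rows, Slick's foreach example is of little use, and the documentation offers no more detailed demonstration. FunDA's solution is to convert publisher into a play-iteratee Enumerator. Because play-iteratee supports Reactive Streams, this Enumerator should handle the coordination between the back-end data and the in-memory buffer (back-pressure). play-iteratee builds the Enumerator like this: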

    import play.api.libs.iteratee._
    // IterateeStreams lives in the play.api.libs.iteratee.streams package
    val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)
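    The back-pressure that publisherToEnumerator relies on comes from the Reactive Streams request(n) protocol: a publisher may only emit as many elements as its subscriber has requested. The sketch below illustrates that protocol with the JDK 9+ java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams interfaces that Slick's publisher implements. SeqPublisher and BatchSubscriber are hypothetical, synchronous, single-threaded illustrations, not Slick's or play-iteratees' implementation.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

// Hypothetical synchronous publisher over a fixed Seq: it emits only as many
// elements as have been requested (simplified: no validation of n, single-threaded).
final class SeqPublisher[T](items: Seq[T]) extends Flow.Publisher[T] {
  def subscribe(s: Flow.Subscriber[_ >: T]): Unit =
    s.onSubscribe(new Flow.Subscription {
      private var pos = 0
      private var demand = 0L
      private var emitting = false // guards against re-entrant request() calls from onNext
      private var done = false
      def request(n: Long): Unit = {
        demand = if (Long.MaxValue - demand < n) Long.MaxValue else demand + n
        if (!emitting) {
          emitting = true
          while (demand > 0 && pos < items.length && !done) {
            demand -= 1
            pos += 1
            s.onNext(items(pos - 1))
          }
          if (pos >= items.length && !done) { done = true; s.onComplete() }
          emitting = false
        }
      }
      def cancel(): Unit = done = true
    })
}

// Hypothetical subscriber that asks for `batch` elements at a time and logs every signal
final class BatchSubscriber(batch: Long) extends Flow.Subscriber[Int] {
  val log = ListBuffer.empty[String]
  private var subscription: Flow.Subscription = null
  private var received = 0L
  def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    log += s"request($batch)"
    s.request(batch)
  }
  def onNext(x: Int): Unit = {
    log += s"onNext($x)"
    received += 1
    // ask for the next batch only after the current one has been fully received
    if (received % batch == 0) { log += s"request($batch)"; subscription.request(batch) }
  }
  def onError(t: Throwable): Unit = log += s"onError(${t.getMessage})"
  def onComplete(): Unit = log += "onComplete"
}

object BackPressureSketch {
  def main(args: Array[String]): Unit = {
    val sub = new BatchSubscriber(2)
    new SeqPublisher(1 to 5).subscribe(sub)
    println(sub.log.mkString(", "))
  }
}
```

    The log shows the publisher never running ahead of demand: request(2), onNext(1), onNext(2), request(2), onNext(3), onNext(4), request(2), onNext(5), onComplete.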

    The Enumerator feeds the rows it reads from the database table into an Iteratee, which pushes (enqueues) each element onto an fs2 queue:

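        private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
          case Input.EOF => {
            // End of data: enqueue None as the end-of-stream marker and finish the Iteratee
            q.enqueue1(None).unsafeRun
            Done((), Input.Empty)
          }
          case Input.Empty => pushData(q)  // nothing to push, keep waiting for input
          case Input.El(e) => {
            // On a bounded queue enqueue1 waits while the queue is full, stalling the
            // Iteratee until the consumer dequeues: this is how back-pressure propagates
            q.enqueue1(Some(e)).unsafeRun
            pushData(q)
          }
        }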
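    The Input cases that pushData handles can be modelled without play-iteratees. In this stdlib-only sketch, Input, El, Empty and EOF are hypothetical stand-ins for play's Input.El, Input.Empty and Input.EOF, and a java.util.concurrent.ArrayBlockingQueue replaces the fs2 queue: each element is wrapped in Some, Empty is ignored, and EOF becomes the None end-of-stream marker.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

object PushDataSketch {
  // Hypothetical stand-ins for play-iteratee's Input.El / Input.Empty / Input.EOF
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  // Mirrors pushData for a single input; returns true while more input is expected.
  // put() blocks while the queue is full, which is where back-pressure comes from.
  def push[R](q: BlockingQueue[Option[R]])(in: Input[R]): Boolean = in match {
    case El(e) => q.put(Some(e)); true
    case Empty => true
    case EOF   => q.put(None); false
  }

  def main(args: Array[String]): Unit = {
    val q = new ArrayBlockingQueue[Option[Int]](8)
    val inputs: List[Input[Int]] = List(El(1), Empty, El(2), El(3), EOF)
    val expectingMore = inputs.map(in => push(q)(in))
    println(expectingMore)     // List(true, true, true, true, false)
    println(q.toArray.toList)  // List(Some(1), Some(2), Some(3), None)
  }
}
```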

    fs2 then dequeues the elements and turns them into an fs2 Stream:

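          // Create a bounded queue holding at most queSize elements, then build the Stream on it
          Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
            // Start the Enumerator/Iteratee producer without waiting for it; it fills q in the background
            Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
            // Emit the values inside Some(...) and end the stream at the first None
            pipe.unNoneTerminate(q.dequeue)
          }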
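    The same producer/consumer bridge can be sketched with the Scala and Java standard libraries only. In this hypothetical QueueBridgeSketch, a background thread plays the role of the Enumerator/Iteratee producer, an ArrayBlockingQueue of capacity queSize plays the fs2 bounded queue, and unNoneTerminate is a stdlib analogue of fs2's pipe.unNoneTerminate: the consumer pulls values until the first None.

```scala
import java.util.concurrent.ArrayBlockingQueue

object QueueBridgeSketch {
  // Stdlib analogue of pipe.unNoneTerminate: emit the values until the first None
  def unNoneTerminate[A](it: Iterator[Option[A]]): Iterator[A] =
    it.takeWhile(_.isDefined).map(_.get)

  // Bridges a push-style producer to a pull-style Iterator through a bounded queue
  def bridge[A](rows: Iterable[A], queSize: Int): Iterator[A] = {
    val q = new ArrayBlockingQueue[Option[A]](queSize)
    val producer = new Thread(() => {
      rows.foreach(r => q.put(Some(r))) // blocks while queSize elements are waiting
      q.put(None)                       // end-of-stream marker
    })
    producer.setDaemon(true) // do not keep the JVM alive if the consumer stops early
    producer.start()
    unNoneTerminate(Iterator.continually(q.take()))
  }

  def main(args: Array[String]): Unit =
    println(bridge(1 to 10, queSize = 2).toList) // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
}
```

    The queue capacity, not the size of the input, bounds how many rows sit in memory, which is the role queSize plays in the fs2 version.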

    In FunDA's fdasources package, the whole Stream-building process is defined like this:

    package com.bayakala.funda.fdasources
    import fs2._
    import play.api.libs.iteratee._
    import com.bayakala.funda.fdapipes._
    import slick.driver.JdbcProfile
    
    object FDADataStream {
    
      class FDAStreamLoader[SOURCE, TARGET](slickProfile: JdbcProfile, convert: SOURCE => TARGET) {
    
        import slickProfile.api._
    
        def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[TARGET] = {
          val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
          val action_ = action.withStatementParameters(fetchSize = fetchSize)
          // stream action_ (not action) so that the fetchSize setting actually takes effect
          val publisher = slickDB.stream(disableAutocommit andThen action_)
          val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)
    
          Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
            Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
            pipe.unNoneTerminate(q.dequeue).map {row => convert(row)}
          }
    
        }
        def fda_plainStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[SOURCE] = {
          val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
          val action_ = action.withStatementParameters(fetchSize = fetchSize)
          // stream action_ (not action) so that the fetchSize setting actually takes effect
          val publisher = slickDB.stream(disableAutocommit andThen action_)
          val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)
    
          Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
            Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
            pipe.unNoneTerminate(q.dequeue)
          }
    
        }
        private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
          case Input.EOF => {
            q.enqueue1(None).unsafeRun
            Done((), Input.Empty)
          }
          case Input.Empty => pushData(q)
          case Input.El(e) => {
            q.enqueue1(Some(e)).unsafeRun
            pushData(q)
          }
        }
    
      }
      object FDAStreamLoader {
        def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDAStreamLoader[SOURCE, TARGET] =
          new FDAStreamLoader[SOURCE, TARGET](slickProfile, converter)
      }
    }
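    In the listing above, fda_typedStream and fda_plainStream share every line except the final map(convert), so the typed variant is simply the plain stream followed by the SOURCE => TARGET conversion. The hypothetical sketch below shows that relationship with plain Scala iterators standing in for fs2 streams; plainStream and typedStream are illustrative names, not FunDA's API, and the sample rows are made up.

```scala
object TypedVsPlainSketch {
  // Hypothetical stand-in for fda_plainStream: yields the raw query rows unchanged
  def plainStream[S](rows: Seq[S]): Iterator[S] = rows.iterator

  // Hypothetical stand-in for fda_typedStream: the plain stream plus a SOURCE => TARGET map
  def typedStream[S, T](rows: Seq[S])(convert: S => T): Iterator[T] =
    plainStream(rows).map(convert)

  // Same shape as the user-supplied Album / toTypedRow in the FunDA example
  final case class Album(title: String, artist: String, year: Int, publisher: String)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4) // a missing year defaults to 2000

  def main(args: Array[String]): Unit = {
    // Made-up sample rows; the second one has no year
    val rows = Seq(
      ("Spice", "Spice Girls", Some(1999), "Columbia Records"),
      ("Untitled", "Unknown Artist", None, "Unknown Label")
    )
    typedStream(rows)(row => toTypedRow(row)).foreach(println)
  }
}
```

    Defining the typed variant in terms of the plain one would also remove the duplicated setup code that the two methods currently carry.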

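    Inside the FDADataStream object, the FDAStreamLoader class implements two methods: fda_typedStream and fda_plainStream. fda_typedStream additionally converts each row with the SOURCE => TARGET function, while fda_plainStream returns the rows unchanged. The process of converting an Enumerator into a Stream, and the principles behind it, were covered in detail in FunDA(7). Below is a concrete use of fda_typedStream taken from FunDA-Example: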

    package com.bayakala.funda.fdasources.examples
    import slick.driver.H2Driver.api._
    import com.bayakala.funda.fdasources.FDADataStream._
    import com.bayakala.funda.samples._
    import com.bayakala.funda.fdarows._
    import com.bayakala.funda.fdapipes._
    import FDANodes._
    import FDAValves._
    object Example2 extends App {
       val albums = SlickModels.albums
       val companies = SlickModels.companies
    
    // source query
       val albumsInfo = for {
         (a,c) <- albums join companies on (_.company === _.id)
       } yield (a.title,a.artist,a.year,c.name)
    
    // strongly typed row for the query result (user-supplied)
      case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
    // conversion function (user-supplied)
      def toTypedRow(row: (String, String, Option[Int], String)): Album =
        Album(row._1, row._2, row._3.getOrElse(2000), row._4)
    
      val db = Database.forConfig("h2db")
    
      val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
      val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(512,128)
    
    // a user task function: print the contents of each row
      def printAlbums: FDATask[FDAROW] = row => {
        row match {
          case album: Album =>
            println("____________________")
            println(s"Title: ${album.title}")
            println(s"Artist: ${album.artist}")
            println(s"Year: ${album.year}")
            println(s"Publisher: ${album.publisher}")
            fda_next(album)
          case _ => fda_skip
        }
      }
    
      // run the stream: every row passes through printAlbums
      albumStream.through(fda_execUserTask(printAlbums)).run.unsafeRun
    
    }

    Output:

    Title: Keyboard Cat's Greatest Hits
    Artist: Keyboard Cat
    Year: 1999
    Publisher: Sony Music Inc
    ____________________
    Title: Spice
    Artist: Spice Girls
    Year: 1999
    Publisher: Columbia Records
    ____________________
    Title: Whenever You Need Somebody
    Artist: Rick Astley
    Year: 1999
    Publisher: Sony Music Inc
    ____________________
    Title: The Triumph of Steel
    Artist: Manowar
    Year: 1999
    Publisher: The K-Pops Singers
    ____________________
    Title: Believe
    Artist: Justin Bieber
    Year: 1999
    Publisher: Columbia Records
    
    Process finished with exit code 0
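    printAlbums follows FunDA's user-task pattern: it receives each row, acts on the rows it recognises, and either passes the row on with fda_next or drops it with fda_skip. The sketch below models that pattern with iterators. Task, next, skip and execUserTask are hypothetical names chosen for illustration, and representing a task's result as Option[List[R]] is an assumption, not FunDA's actual type definition.

```scala
object UserTaskSketch {
  // Assumed model of a user task: None means "skip this row",
  // Some(rows) means "pass these rows downstream"
  type Task[R] = R => Option[List[R]]
  def next[R](r: R): Option[List[R]] = Some(List(r))
  def skip[R]: Option[List[R]] = None

  // Applies the task to every row and keeps whatever it passes on
  def execUserTask[R](rows: Iterator[R])(task: Task[R]): Iterator[R] =
    rows.flatMap(r => task(r).getOrElse(Nil))

  sealed trait Row
  final case class Album(title: String, year: Int) extends Row
  final case class Other(info: String) extends Row

  // Counterpart of printAlbums: print albums and forward them, skip everything else
  val printAlbums: Task[Row] = {
    case album: Album =>
      println(s"Title: ${album.title}, Year: ${album.year}")
      next(album)
    case _ => skip
  }

  def main(args: Array[String]): Unit = {
    val rows: Iterator[Row] = Iterator(Album("Spice", 1999), Other("not an album"), Album("Believe", 1999))
    val kept = execUserTask(rows)(printAlbums).toList
    println(kept.size)
  }
}
```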

  • Original article: https://www.cnblogs.com/tiger-xc/p/6396905.html