zoukankan      html  css  js  c++  java
  • SDP(12): MongoDB-Engine

       在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsync来创造。下面是MongoSource的定义:

    /** Factory that exposes a Mongo-scala `Observable[Document]` as an Akka Streams `Source`. */
    object MongoSource {

      /**
        * Adapts the observable through `ObservableToPublisher` and builds a source from it.
        *
        * @param query the observable whose documents should be streamed
        * @return a source backed by the publisher created from `query`
        */
      def apply(query: Observable[Document]): Source[Document, NotUsed] = {
        val publisher = ObservableToPublisher(query)
        Source.fromPublisher(publisher)
      }

    }

    实际上就是把Mongo-scala的Observable[Document]转成Source[Document, NotUsed]。我们还是通过传入context来构建这个Source:

      // Identifies the target of a MongoDB operation: database name, collection name and
      // the command to execute. Note that `action` defaults to null, so code matching on
      // it must cope with an unset command. The class body is elided in this excerpt.
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             action: MGOCommands = null
                           ) {...}
       /**
         * Command describing a streaming `find`.
         *
         * @param filter  optional filter passed to `find`; `None` selects every document
         * @param andThen optional transformation applied to the `FindObservable`
         *                (for example a projection or a sort) before it is streamed
         */
       case class DocumentStream(
         filter: Option[Bson] = None,
         andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
       ) extends MGOCommands

    Source的具体实现:

        /**
          * Builds a `Source` of documents for the `DocumentStream` command carried by `ctx`.
          *
          * The collection is looked up from `ctx.dbName` / `ctx.collName`; the optional filter is
          * passed to `find` and the optional `andThen` function is applied to the resulting
          * `FindObservable` before it is wrapped by `MongoSource`.
          *
          * @param ctx    context naming the database, the collection and the command
          * @param client MongoDB client used to resolve the collection
          * @param ec     execution context (not used by this method; kept for the interface)
          * @return a source emitting the documents selected by the command
          * @throws IllegalArgumentException if `ctx.action` is not a `DocumentStream`
          *                                  (including the default `null`)
          */
        def mongoStream(ctx: MGOContext)(
          implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
          val db = client.getDatabase(ctx.dbName)
          val coll = db.getCollection(ctx.collName)
          ctx.action match {
            case DocumentStream(filter, andThen) =>
              val found: FindObservable[Document] = filter match {
                case Some(f) => coll.find(f)
                case None => coll.find()
              }
              MongoSource(andThen.fold(found)(next => next(found)))
            case other =>
              // Previously this fell through as a bare MatchError; report what was wrong instead.
              throw new IllegalArgumentException(
                s"mongoStream requires a DocumentStream command, but got: $other")
          }
        }

    下面是mongoStream的使用示范:

      // Build a MongoDB client for a single host at localhost:27017.
      val clusterSettings = ClusterSettings.builder()
        .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
      val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
      // Implicit so that mongoStream and the streaming actions can pick it up.
      implicit val client = MongoClient(clientSettings)
    
      // Akka runtime needed to materialize the streams below; the actor system's
      // dispatcher doubles as the implicit execution context.
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
      implicit val ec = system.dispatcher
    
      /**
        * Purchase-order header.
        *
        * @param ponum   purchase-order number
        * @param podate  purchase-order date
        * @param vendor  vendor name
        * @param remarks optional free-text remarks
        * @param podtl   optional array holding the order's detail lines
        */
      case class PO(
        ponum: String,
        podate: MGODate,
        vendor: String,
        remarks: Option[String],
        podtl: Option[MGOArray]
      )
      /**
        * Converts a document into a [[PO]], reading the fields "ponum", "podate", "vendor"
        * and the optional "remarks" and "podtl".
        */
      def toPO(doc: Document): PO = {
        val number: String = doc.getString("ponum")
        val date: MGODate = doc.getDate("podate")
        val vendorName: String = doc.getString("vendor")
        val remarkOpt: Option[String] = mgoGetStringOrNone(doc,"remarks")
        val detailOpt: Option[MGOArray] = mgoGetArrayOrNone(doc,"podtl")
        PO(number, date, vendorName, remarkOpt, detailOpt)
      }
      /**
        * One detail line of a purchase order.
        *
        * @param item    item name
        * @param price   unit price
        * @param qty     ordered quantity
        * @param packing optional packing description
        * @param payTerm optional payment term
        */
      case class PODTL(
        item: String,
        price: Double,
        qty: Int,
        packing: Option[String],
        payTerm: Option[String]
      )
      /**
        * Converts a detail-line document into a [[PODTL]], reading "item", "price", "qty"
        * and the optional "packing" and "payterm" fields.
        */
      def toPODTL(podtl: Document): PODTL = {
        val item: String = podtl.getString("item")
        val price: Double = podtl.getDouble("price")
        val qty: Int = podtl.getInteger("qty")
        val packing: Option[String] = mgoGetStringOrNone(podtl,"packing")
        val payTerm: Option[String] = mgoGetStringOrNone(podtl,"payterm")
        PODTL(item, price, qty, packing, payTerm)
      }
    
      /**
        * Prints a purchase order to standard output: the header fields first, then one line
        * per detail entry when the order carries a detail array.
        */
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        // Remarks are optional: print the line only when a value is present.
        po.remarks.foreach(rmk => println(s"remarks: ${rmk}"))
        // Detail lines are optional as well; nothing is printed when the array is absent.
        po.podtl.foreach { barr =>
          val details = mgoArrayToDocumentList(barr).map(dc => toPODTL(dc))
          details.foreach { dtl: PODTL =>
            print(s"==>Item: ${dtl.item} ")
            print(s"price: ${dtl.price} ")
            print(s"qty: ${dtl.qty} ")
            dtl.packing.foreach(pk => print(s"packing: ${pk} "))
            dtl.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
            println("")
          }
        }
      }
      import org.mongodb.scala.model.Projections._
      import MongoActionStream._
      import MGOEngine._
      import akka.stream.scaladsl.{Sink, Source}
    
      // Projection step: drop the "handler" and "_id" fields from every returned document.
      val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
      // Stream every document of testdb.po (no filter) through the projection above.
      val ctx = MGOContext("testdb","po").setCommand(
        DocumentStream(filter = None, andThen = Some(proj)))
    
    
      // Materializes the stream once, printing each purchase order as it arrives.
      val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))
    
      // Materializes the same stream a second time and prints what getResult returns for the
      // materialized value. NOTE(review): getResult is defined elsewhere; presumably it
      // waits for completion - confirm. Running both statements prints every order twice.
      println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

    我们看到:使用了许多代码去进行类型转换。不过也没有什么太好的办法,已经是一次性的了。我们也可以通过akka的Flow[A,B]来以stream里的A元素为变量对MongoDB数据进行更新操作:

     /**
       * Command descriptions and Akka Streams flows that perform a MongoDB insert, update or
       * delete for every element passing through the stream.
       */
     object MongoActionStream {

        import MGOContext._

        /**
          * Describes a streaming insert.
          *
          * @param dbName      target database
          * @param collName    target collection
          * @param converter   turns a stream element into the document to insert
          * @param parallelism parallelism passed to `mapAsync`
          */
        case class StreamingInsert[A](
          dbName: String,
          collName: String,
          converter: A => Document,
          parallelism: Int = 1
        ) extends MGOCommands

        /**
          * Describes a streaming delete.
          *
          * @param dbName      target database
          * @param collName    target collection
          * @param toFilter    derives the delete filter from a stream element
          * @param parallelism parallelism passed to `mapAsync`
          * @param justOne     when true `deleteOne` is used, otherwise `deleteMany`
          */
        case class StreamingDelete[A](
          dbName: String,
          collName: String,
          toFilter: A => Bson,
          parallelism: Int = 1,
          justOne: Boolean = false
        ) extends MGOCommands

        /**
          * Describes a streaming update.
          *
          * @param dbName      target database
          * @param collName    target collection
          * @param toFilter    derives the selection filter from a stream element
          * @param toUpdate    derives the update specification from a stream element
          * @param parallelism parallelism passed to `mapAsync`
          * @param justOne     when true `updateOne` is used, otherwise `updateMany`
          */
        case class StreamingUpdate[A](
          dbName: String,
          collName: String,
          toFilter: A => Bson,
          toUpdate: A => Bson,
          parallelism: Int = 1,
          justOne: Boolean = false
        ) extends MGOCommands


        /** Inserts one document per stream element into the collection named by `ctx`. */
        case class InsertAction[A](ctx: StreamingInsert[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /** Flow that converts each element, inserts it and emits the inserted document. */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
            Flow[A]
              .map(a => ctx.converter(a))
              .mapAsync(ctx.parallelism) { doc =>
                val inserted = collection.insertOne(doc).toFuture()
                inserted.map(_ => doc)
              }
        }

        /** Updates the documents selected for each stream element in the collection named by `ctx`. */
        case class UpdateAction[A](ctx: StreamingUpdate[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /** Flow that applies the update for each element and re-emits the element unchanged. */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
            Flow[A].mapAsync(ctx.parallelism) { a =>
              val filter = ctx.toFilter(a)
              val update = ctx.toUpdate(a)
              val pending =
                if (ctx.justOne) collection.updateOne(filter, update).toFuture()
                else collection.updateMany(filter, update).toFuture()
              pending.map(_ => a)
            }
        }


        /** Deletes the documents selected for each stream element from the collection named by `ctx`. */
        case class DeleteAction[A](ctx: StreamingDelete[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /** Flow that applies the delete for each element and re-emits the element unchanged. */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
            Flow[A].mapAsync(ctx.parallelism) { a =>
              val filter = ctx.toFilter(a)
              val pending =
                if (ctx.justOne) collection.deleteOne(filter).toFuture()
                else collection.deleteMany(filter).toFuture()
              pending.map(_ => a)
            }
        }

     }

    下面是insert, update及delete操作的示范。在这个示范里我们同时调用了JDBCEngine,CassandraEngine和MongoDBEngine:

      import jdbcengine._
      import JDBCEngine._
      import scalikejdbc._
    
      /**
        * One row of the report data used throughout the demo.
        *
        * @param rowid     row identifier
        * @param measureid measure identifier
        * @param state     state name
        * @param county    county name
        * @param year      report year
        * @param value     reported value
        */
      case class DataRow(
        rowid: Long,
        measureid: Long,
        state: String,
        county: String,
        year: Int,
        value: Int
      )
      /** Extracts a [[DataRow]] from the current row of a scalikejdbc result set. */
      val toRow: WrappedResultSet => DataRow = { rs =>
        val rowid = rs.long("ROWID")
        val measureid = rs.long("MEASUREID")
        val state = rs.string("STATENAME")
        val county = rs.string("COUNTYNAME")
        val year = rs.int("REPORTYEAR")
        val value = rs.int("VALUE")
        DataRow(rowid, measureid, state, county, year, value)
      }
    
      // Construct the query context: read every row of AQMRPT from the pool registered
      // as 'h2 and convert each row with toRow.
      val h2ctx = JDBCQueryContext[DataRow](
        dbName = 'h2,
        statement = "select * from AQMRPT",
        extractor = toRow
      )
    
      // Akka Streams source of DataRow backed by the h2 query above.
      val jdbcSource = jdbcAkkaStream(h2ctx)
    
      //document converter
      /** Converter from a [[DataRow]] to the MongoDB document stored for it. */
      def rowToDoc: DataRow => Document = { r =>
        Document(
          "rowid" -> r.rowid,
          "measureid" -> r.measureid,
          "state" -> r.state,
          "county" -> r.county,
          "year" -> r.year,
          "value" -> r.value
        )
      }
      /** Converter from a stored MongoDB document back to a [[DataRow]]. */
      def docToRow: Document => DataRow = { doc =>
        val rowid: Long = doc.getLong("rowid")
        val measureid: Long = doc.getLong("measureid")
        val state: String = doc.getString("state")
        val county: String = doc.getString("county")
        val year: Int = doc.getInteger("year")
        val value: Int = doc.getInteger("value")
        DataRow(rowid, measureid, state, county, year, value)
      }
      // Set up the insert command: every DataRow becomes a document in testdb.members.
      val mgoctx = StreamingInsert("testdb","members",rowToDoc)
      val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
      // The insert flow emits the inserted Document; map it back to DataRow for the sink.
      val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
      // Sink that prints a one-line summary of each row (measureid is not printed).
      val sink = Sink.foreach[DataRow]{ r =>
        println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
      }
    
      // Configure the JDBC connection pool for 'h2 and the global settings from the
      // "dev" section of the configuration.
      ConfigDBsWithEnv("dev").setup('h2)
      ConfigDBsWithEnv("dev").loadGlobalSettings()
    
      // Copy the first 100 rows from h2 into MongoDB, printing each inserted row.
      val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()
    
      // Read testdb.members back (no filter) and print its rows.
      val mgoCtxShow = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None))
    
      mongoStream(mgoCtxShow).map(docToRow).to(sink).run()
    
    
    
      import com.datastax.driver.core._
      import cassandraengine._
      import CQLEngine._
      import org.mongodb.scala.model.Filters._
    
      //data row converter
      /** Converts a Cassandra result row into a [[DataRow]]. */
      val cqlToDataRow = { (rs: Row) =>
        val rowid = rs.getLong("ROWID")
        val measureid = rs.getLong("MEASUREID")
        val state = rs.getString("STATENAME")
        val county = rs.getString("COUNTYNAME")
        val year = rs.getInt("REPORTYEAR")
        val value = rs.getInt("VALUE")
        DataRow(rowid, measureid, state, county, year, value)
      }
    
     import org.bson.conversions._
      import org.mongodb.scala.model.Updates._
    
      //#init-session
      // Connect to the Cassandra node at 127.0.0.1:9042; the session is implicit so that
      // the streaming helpers below can pick it up.
      implicit val session = Cluster.builder
        .addContactPoint("127.0.0.1")
        .withPort(9042)
        .build
        .connect()
    
    
      // Set up the query context: select all rows of testdb.aqmrpt and convert them with
      // cqlToDataRow.
      val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
      // Construct the source from that context.
      val cqlSource = cassandraStream(cqlCtx)
    
      /** Filter selecting the document with the row's `rowid` whose "value" is below 10. */
      def toFilter: DataRow => Bson = { row =>
        val sameRow = equal("rowid",row.rowid)
        val smallValue = lt("value",10)
        and(sameRow, smallValue)
      }
      /** Update that sets "value" to ten times the row's value. */
      def toUpdate: DataRow => Bson = { row =>
        val scaled = row.value * 10
        set("value" , scaled)
      }
      // Update command: for every row read from Cassandra, update the matching documents
      // in testdb.members.
      val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
      val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
      // Named stsUpdate because `sts` is already defined earlier in this listing; a second
      // `val sts` in the same scope does not compile.
      val stsUpdate = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()
    
    
      import org.bson.conversions._
      import org.mongodb.scala.model.Filters._
      /** Filter selecting the document with the row's `rowid` whose "value" equals 10. */
      def toDelFilter: DataRow => Bson = { row =>
        val sameRow = equal("rowid",row.rowid)
        val valueIsTen = equal("value",10)
        and(sameRow, valueIsTen)
      }
    
      // Delete command: remove the documents of testdb.members selected by toDelFilter.
      val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
      val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
      // Source of every document in testdb.members (no filter).
      val mgoCtxSrc = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None))
      // Feed each stored row through the delete flow; the emitted rows are discarded.
      mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run()
    
      import org.mongodb.scala.model.Sorts._
      // Sort step: order the result by "rowid", descending.
      val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
      // Named mgoCtxShowSorted because `mgoCtxShow` is already defined earlier in this
      // listing; a second `val mgoCtxShow` in the same scope does not compile.
      val mgoCtxShowSorted = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None, andThen = Some(sortDsc)))

      // Print what remains in testdb.members, highest rowid first.
      mongoStream(mgoCtxShowSorted).map(docToRow).to(sink).run()

    下面是本次示范的全部源代码:

    build.sbt

    // sbt build definition for the demo project.
    name := "learn-mongo"
    
    version := "0.1"
    
    scalaVersion := "2.12.4"
    
    // Note: `:=` replaces the dependency list rather than appending to it.
    libraryDependencies := Seq(
      // Cassandra driver
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
      // MongoDB Scala driver and the Alpakka MongoDB connector
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17",
      // Akka actors and streams, plus the Alpakka Cassandra connector
      "com.typesafe.akka" %% "akka-actor" % "2.5.4",
      "com.typesafe.akka" %% "akka-stream" % "2.5.4",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
      // scalikejdbc and its modules
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      // JDBC drivers
      "com.h2database"  %  "h2"                % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      // Connection-pool implementations
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      // Slick and logging
      "com.typesafe.slick" %% "slick" % "3.2.1",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
    )

    resources/application.conf

    # JDBC settings
    test {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "commons-dbcp2"
        }
      }
    
      db.mysql.driver = "com.mysql.cj.jdbc.Driver"
      db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
      db.mysql.user = "root"
      db.mysql.password = "123"
      db.mysql.poolInitialSize = 5
      db.mysql.poolMaxSize = 7
      db.mysql.poolConnectionTimeoutMillis = 1000
      db.mysql.poolValidationQuery = "select 1 as one"
      db.mysql.poolFactoryName = "bonecp"
    
      # scalikejdbc global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }
    dev {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
        mysql {
          driver = "com.mysql.cj.jdbc.Driver"
          url = "jdbc:mysql://localhost:3306/testdb"
          user = "root"
          password = "123"
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "bonecp"
    
        }
        postgres {
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/testdb"
          user = "root"
          password = "123"
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
      }
      # scalikejdbc global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }

    JDBCEngine.scala

    package jdbcengine
    import java.sql.PreparedStatement
    
    import scala.collection.generic.CanBuildFrom
    import akka.stream.scaladsl._
    import scalikejdbc._
    import scalikejdbc.streams._
    import akka.NotUsed
    import akka.stream._
    import scala.util._
    import java.time._
    import scala.concurrent.duration._
    import filestreaming.FileStreaming._
    
    import scalikejdbc.TxBoundary.Try._
    
    import scala.concurrent.ExecutionContextExecutor
    import java.io.InputStream
    
    /** Constants and the command-type alias used when configuring a `JDBCContext`. */
    object JDBCContext {
      /** Integer tag identifying the kind of SQL command a context carries. */
      type SQLTYPE = Int

      /** Command type for DDL statements. */
      val SQL_EXEDDL: SQLTYPE = 1
      /** Command type for update statements. */
      val SQL_UPDATE: SQLTYPE = 2

      /** Flag value requesting that generated keys are returned. */
      val RETURN_GENERATED_KEYVALUE: Boolean = true
      /** Flag value requesting that update counts are returned. */
      val RETURN_UPDATED_COUNT: Boolean = false

    }
    
    /**
      * Describes a single SQL query and how to turn its rows into values of type `M`.
      *
      * @param dbName       name of the connection pool to run the query against
      * @param statement    SQL text of the query
      * @param parameters   values bound to the statement's placeholders
      * @param fetchSize    JDBC fetch size
      * @param autoCommit   auto-commit setting applied to the connection when streaming
      * @param queryTimeout optional query timeout
      * @param extractor    converts one result-set row into an `M`
      */
    case class JDBCQueryContext[M](
      dbName: Symbol,
      statement: String,
      parameters: Seq[Any] = Nil,
      fetchSize: Int = 100,
      autoCommit: Boolean = false,
      queryTimeout: Option[Int] = None,
      extractor: WrappedResultSet => M
    )
    
    
    /**
      * Immutable description of one or more non-query JDBC commands (DDL, single or multiple
      * updates, or a batch update). Every helper returns a modified copy; the receiver is
      * never mutated.
      *
      * @param dbName             name of the connection pool to run against
      * @param statements         SQL statements, in execution order
      * @param parameters         one parameter list per statement (or per batch row)
      * @param fetchSize          JDBC fetch size
      * @param queryTimeout       optional query timeout
      * @param queryTags          tags attached to the statements
      * @param sqlType            `JDBCContext.SQL_EXEDDL` or `JDBCContext.SQL_UPDATE`
      * @param batch              true when the context describes a batch update
      * @param returnGeneratedKey per-statement generated-key request:
      *                           no return: None, return by index: Some(1), by name: Some("id")
      * @param preAction          optional hook run on the prepared statement before execution
      * @param postAction         optional hook run on the prepared statement after execution
      */
    case class JDBCContext(
                            dbName: Symbol,
                            statements: Seq[String] = Nil,
                            parameters: Seq[Seq[Any]] = Nil,
                            fetchSize: Int = 100,
                            queryTimeout: Option[Int] = None,
                            queryTags: Seq[String] = Nil,
                            sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                            batch: Boolean = false,
                            returnGeneratedKey: Seq[Option[Any]] = Nil,
                            // no return: None, return by index: Some(1), by name: Some("id")
                            preAction: Option[PreparedStatement => Unit] = None,
                            postAction: Option[PreparedStatement => Unit] = None) {

      ctx =>

      //helper functions

      /** Returns a copy with `tag` appended to the query tags. */
      def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

      /** Returns a copy with all of `tags` appended to the query tags. */
      def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

      /** Returns a copy using the given fetch size. */
      def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

      /** Returns a copy using the given query timeout. */
      def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

      /** Pre/post actions are only supported for a single, non-batch update statement. */
      private def isSingleNonBatchUpdate: Boolean =
        ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1

      /**
        * Returns a copy with the given pre-execution hook.
        *
        * @throws IllegalStateException unless the context holds exactly one non-batch update
        */
      def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (isSingleNonBatchUpdate)
          ctx.copy(preAction = action)
        else
          throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
      }

      /**
        * Returns a copy with the given post-execution hook.
        *
        * @throws IllegalStateException unless the context holds exactly one non-batch update
        */
      def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (isSingleNonBatchUpdate)
          ctx.copy(postAction = action)
        else
          // The message used to say "preAction", which pointed at the wrong setter.
          throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
      }

      /**
        * Appends a DDL statement and its parameters.
        *
        * @throws IllegalStateException if the context is not of type SQL_EXEDDL
        */
      def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
          ctx.copy(
            statements = ctx.statements ++ Seq(_statement),
            // One parameter list per statement, as in setDDLCommand and appendUpdateCommand;
            // the previous Seq(Seq(_parameters)) nested the list inside a one-element list.
            parameters = ctx.parameters ++ Seq(_parameters)
          )
        } else
          throw new IllegalStateException("JDBCContex setting error: option not supported!")
      }

      /**
        * Appends a non-batch update statement, its parameters and its generated-key request
        * (`Some(1)` when `_returnGeneratedKey` is true, `None` otherwise).
        *
        * @throws IllegalStateException if the context is not a non-batch SQL_UPDATE
        */
      def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
          ctx.copy(
            statements = ctx.statements ++ Seq(_statement),
            parameters = ctx.parameters ++ Seq(_parameters),
            returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
          )
        } else
          throw new IllegalStateException("JDBCContex setting error: option not supported!")
      }

      /**
        * Appends one row of batch parameters. The row must have as many values as the first
        * row already present, if any.
        *
        * @throws IllegalStateException if the context is not a batch SQL_UPDATE or the
        *                               parameter count differs from the first row
        */
      def appendBatchParameters(_parameters: Any*): JDBCContext = {
        if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
          throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

        // An empty parameter list accepts any arity; otherwise compare against the first row.
        val sameArity = ctx.parameters.headOption.forall(first => first.size == _parameters.size)
        if (sameArity) {
          ctx.copy(
            parameters = ctx.parameters ++ Seq(_parameters)
          )
        } else
          throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
      }

      /**
        * Sets whether a batch update should return generated keys.
        *
        * @throws IllegalStateException if the context is not a batch SQL_UPDATE
        */
      def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
        if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
          throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
        ctx.copy(
          returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
        )
      }

      /** Replaces the statements with a single DDL command and switches the type to SQL_EXEDDL. */
      def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
      }

      /** Replaces the statements with a single non-batch update command. */
      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
      }

      /** Replaces the statements with a single batch update statement; parameters are kept. */
      def setBatchCommand(_statement: String): JDBCContext = {
        ctx.copy (
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
      }

      type JDBCDate = LocalDate
      type JDBCDateTime = LocalDateTime

      /** Builds a date from year, month and day. */
      def jdbcSetDate(yyyy: Int, mm: Int, dd: Int): LocalDate = LocalDate.of(yyyy,mm,dd)
      /** Returns the current local date-time. */
      def jdbcSetNow: LocalDateTime = LocalDateTime.now()

      type JDBCBlob = InputStream

      /** Opens the named file as a blob input stream via `FileToInputStream`. */
      def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

      /** Writes the blob to the named file via `InputStreamToFile`. */
      def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
        implicit mat: Materializer) =  InputStreamToFile(blob,fileName)


    }
    
    object JDBCEngine {
    
      import JDBCContext._
    
      /**
        * Placeholder extractor for SQL objects that have no real extractor yet: calling it
        * always throws an `IllegalStateException` carrying `message`.
        */
      private def noExtractor(message: String): WrappedResultSet => Nothing =
        _ => throw new IllegalStateException(message)
    
      /**
        * Runs the query described by `ctx` as a read-only stream and exposes it as an Akka
        * Streams `Source`.
        *
        * @param ctx query context: pool name, statement, parameters, extractor and
        *            session settings (fetch size, auto-commit, timeout)
        * @param ec  execution context required by the streaming publisher
        * @return a source emitting one extracted `A` per result row
        */
      def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
                           (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

        // Use the pool named in the context; this used to be hard-coded to 'h2, so any
        // other dbName was silently ignored.
        val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)

          sql.iterator
            .withDBSessionForceAdjuster(session => {
              // Apply the per-query connection settings before rows are fetched.
              session.connection.setAutoCommit(ctx.autoCommit)
              session.fetchSize(ctx.fetchSize)
            })
        }
        Source.fromPublisher[A](publisher)
      }
    
    
      /**
        * Runs the query described by `ctx` and collects the extracted rows into a `C[A]`.
        *
        * @param ctx query context: pool name, statement, parameters, extractor, fetch size
        *            and optional timeout
        * @param cbf builder for the requested result collection
        * @return every extracted row, collected into `C`
        */
      def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
                                                          ctx: JDBCQueryContext[A])(
                                                          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

        val query = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
        ctx.queryTimeout.foreach(seconds => query.queryTimeout(seconds))
        query.fetchSize(ctx.fetchSize)
        implicit val session: DBSession = NamedAutoSession(ctx.dbName)
        val extracting: SQL[A, HasExtractor] = query.map(ctx.extractor)
        extracting.collection.apply[C]()

      }
    
      /**
        * Executes every statement of a DDL context inside one local transaction.
        *
        * Statements are run without parameters.
        *
        * @param ctx context whose `sqlType` must be `SQL_EXEDDL`
        * @return `Success` with a confirmation message, or `Failure` when the context has
        *         the wrong type or a statement fails
        */
      def jdbcExcuteDDL(ctx: JDBCContext): Try[String] =
        if (ctx.sqlType == SQL_EXEDDL)
          NamedDB(ctx.dbName) localTx { implicit session =>
            Try {
              ctx.statements.foreach { stm =>
                // No-op hooks: nothing to do before or after the statement runs.
                new SQLExecution(statement = stm, parameters = Nil)(
                  before = _ => ())(
                  after = _ => ()).apply()
              }
              "SQL_EXEDDL executed succesfully."
            }
          }
        else
          Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    
      /**
        * Executes the batch update described by `ctx` inside one local transaction.
        *
        * When no generated key is requested the batch is executed and an empty collection is
        * returned; otherwise the generated keys are returned.
        *
        * @param ctx context that must be a batch `SQL_UPDATE` with at least one statement
        * @param cbf builder for the requested result collection
        * @return the generated keys (or an empty collection), or a `Failure` on wrong
        *         settings or execution errors
        * @throws IllegalStateException if `ctx.statements` is empty (thrown, not wrapped)
        */
      def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        if (ctx.statements == Nil)
          throw new IllegalStateException("JDBCContex setting error: statements empty!")

        if (ctx.sqlType != SQL_UPDATE)
          Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
        else if (!ctx.batch)
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
        else if (noReturnKey(ctx)) {
          val batchSql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(t => session.queryTimeout(t))
              // The per-row update counts are not reported to the caller.
              batchSql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val batchSql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(t => session.queryTimeout(t))
              batchSql.apply[C]()
            }
          }
        }
      }
      /**
        * Runs the single update statement of `ctx` in a local transaction and returns the
        * generated key.
        *
        * @param ctx context whose first `returnGeneratedKey` entry names the key to return
        * @param cbf builder for the requested result collection
        * @return a one-element collection holding the generated key, or a `Failure` when
        *         execution fails
        * @throws IllegalStateException if no generated key is requested for the statement
        */
      private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        // headOption works for any Seq; the former `Some(key) :: xs` pattern only matched a
        // List and raised a MatchError for e.g. a Vector.
        val key: Any = ctx.returnGeneratedKey.headOption.flatten.getOrElse(
          throw new IllegalStateException("JDBCContex setting error: returnGeneratedKey not set!"))
        val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result).to[C]
          }
        }
      }
    
      /**
        * Runs the single update statement of `ctx` in a local transaction and returns its
        * update count. The context's optional pre/post actions are applied to the prepared
        * statement; a missing action is replaced by a no-op.
        *
        * @param ctx context holding the statement, its parameters and optional hooks
        * @param cbf builder for the requested result collection
        * @return a one-element collection holding the update count, or a `Failure` when
        *         execution fails
        */
      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        val doNothing: PreparedStatement => Unit = _ => ()
        val firstParams: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        val beforeHook: PreparedStatement => Unit = ctx.preAction.getOrElse(doNothing)
        val afterHook: PreparedStatement => Unit = ctx.postAction.getOrElse(doNothing)
        val update = new SQLUpdate(ctx.statements.head, firstParams, ctx.queryTags)(beforeHook)(afterHook)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(t => session.queryTimeout(t))
            val affected = update.apply()
            Seq(affected.toLong).to[C]
          }
        }

      }
    
      /**
        * Runs the single update statement of `ctx`, returning either the update count or the
        * generated key depending on whether a generated key was requested.
        */
      private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] =
        noReturnKey(ctx) match {
          case true => singleTxUpdateNoReturnKey[C](ctx)
          case false => singleTxUpdateWithReturnKey[C](ctx)
        }
    
      /**
        * Tells whether the first statement of `ctx` does NOT request a generated key.
        *
        * @return true when `returnGeneratedKey` is empty or its first entry is `None`
        */
      private def noReturnKey(ctx: JDBCContext): Boolean =
        // headOption works for any Seq; the former `k :: xs` pattern only matched a List and
        // raised a MatchError for other non-empty sequences.
        ctx.returnGeneratedKey.headOption.flatten.isEmpty
    
      /** No-op hook for prepared statements. */
      def noActon: PreparedStatement => Unit = (_: PreparedStatement) => ()
    
      /**
        * Runs every update statement of `ctx` inside one local transaction.
        *
        * Statements are paired positionally with their parameter list and generated-key
        * request. A statement with no matching parameter list runs without parameters, and
        * one with no matching key request returns its update count.
        *
        * @param ctx context holding the statements, parameters and key requests
        * @param cbf builder for the requested result collection
        * @return one value per statement (update count or generated key), or a `Failure`
        *         when any statement fails
        */
      def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            // Pad to the number of statements: a plain zip silently dropped every statement
            // beyond the shorter of parameters / returnGeneratedKey.
            val count = ctx.statements.size
            val params: Seq[Seq[Any]] = ctx.parameters.padTo(count, Nil)
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey.padTo(count, None)
            val sqlcmd = ctx.statements zip params zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
      }
    
    
      /**
        * Executes the non-batch update(s) described by `ctx` in one transaction, dispatching
        * to the single-statement or multi-statement implementation.
        *
        * @param ctx context that must be a non-batch `SQL_UPDATE` with at least one statement
        * @param cbf builder for the requested result collection
        * @return the update results, or a `Failure` on wrong settings or execution errors
        * @throws IllegalStateException if `ctx.statements` is empty (thrown, not wrapped)
        */
      def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        if (ctx.statements == Nil)
          throw new IllegalStateException("JDBCContex setting error: statements empty!")

        if (ctx.sqlType != SQL_UPDATE)
          Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
        else if (ctx.batch)
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
        else if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      }
    
      /**
        * Describes a statement to execute once per stream element.
        *
        * @param dbName         name of the connection pool to run against
        * @param parallelism    parallelism passed to `mapAsync` / `mapAsyncUnordered`
        * @param processInOrder when true elements are emitted in input order
        * @param statement      SQL statement to execute for each element
        * @param prepareParams  derives the statement parameters from an element
        */
      case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                     statement: String, prepareParams: R => Seq[Any]) {
        jas =>

        /** Returns a copy targeting the given pool. */
        def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)

        /** Returns a copy using the given parallelism. */
        def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)

        /** Returns a copy with the given ordering behaviour. */
        def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

        // Executes the statement for one element in auto-commit mode. The call runs on the
        // calling thread; the element is then handed back as an already-completed Future.
        private def perform(r: R) = {
          import scala.concurrent._
          val sqlArgs = prepareParams(r)
          NamedDB(dbName).autoCommit(dbSession => dbSession.execute(statement, sqlArgs: _*))
          Future.successful(r)
        }

        /**
          * Flow that executes the statement for each element and re-emits the element.
          *
          * @param session implicit session required by the signature (not used in the body)
          */
        def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] = {
          val elements = Flow[R]
          if (processInOrder) elements.mapAsync(parallelism)(perform)
          else elements.mapAsyncUnordered(parallelism)(perform)
        }

      }
      /** Factory leaving `parallelism` and `processInOrder` at their defaults. */
      object JDBCActionStream {
        def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = {
          new JDBCActionStream[R](
            dbName = _dbName,
            statement = _statement,
            prepareParams = params
          )
        }
      }
    
    
    }

    CassandraEngine.scala

    package cassandraengine
    import com.datastax.driver.core._
    
    import scala.concurrent._
    import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
    
    import scala.collection.JavaConverters._
    import scala.collection.generic.CanBuildFrom
    import scala.concurrent.duration.Duration
    import akka.NotUsed
    import akka.stream.alpakka.cassandra.scaladsl._
    import akka.stream.scaladsl._
    import filestreaming.FileStreaming._
    
    /**
     * Companion of [[CQLContext]]: integer codes for consistency levels plus the
     * translation of those codes into the driver's `ConsistencyLevel`.
     */
    object CQLContext {
      // Consistency Levels
      type CONSISTENCY_LEVEL = Int
      val ANY: CONSISTENCY_LEVEL          =                                        0x0000
      val ONE: CONSISTENCY_LEVEL          =                                        0x0001
      val TWO: CONSISTENCY_LEVEL          =                                        0x0002
      val THREE: CONSISTENCY_LEVEL        =                                        0x0003
      val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
      val ALL: CONSISTENCY_LEVEL          =                                        0x0005
      val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
      val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
      val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
      val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
      val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C

      /** An empty context with no statements. */
      def apply(): CQLContext = CQLContext(statements = Nil)

      /**
       * Translates one of the codes above into the driver's `ConsistencyLevel`.
       * Every constant declared in this object is covered (LOCAL_QUORUM used to be
       * missing and raised a MatchError); any other Int still raises a MatchError.
       */
      def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
        consistency match {
          case ALL => ConsistencyLevel.ALL
          case ONE => ConsistencyLevel.ONE
          case TWO => ConsistencyLevel.TWO
          case THREE => ConsistencyLevel.THREE
          case ANY => ConsistencyLevel.ANY
          case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
          case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
          case QUORUM => ConsistencyLevel.QUORUM
          case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
          case SERIAL => ConsistencyLevel.SERIAL
          case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
        }
      }

    }
    /**
     * Description of a CQL query.
     *
     * @param statement   CQL text
     * @param extractor   converts a result `Row` into `M`
     * @param parameter   bind values (default: none)
     * @param consistency optional consistency-level code from the CQLContext object
     * @param fetchSize   fetch size, default 100
     */
    case class CQLQueryContext[M](
                                   statement: String,
                                   extractor: Row => M,
                                   parameter: Seq[Object] = Nil,
                                   consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                                   fetchSize: Int = 100
                                 ) { self =>

      /** Copy of this query with the given consistency level. */
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = {
        val chosen = Some(_consistency)
        self.copy(consistency = chosen)
      }

      /** Copy of this query with the given fetch size. */
      def setFetchSize(pageSize: Int): CQLQueryContext[M] = self.copy(fetchSize = pageSize)
    }
    /** Factory that sets only the statement and the row converter. */
    object CQLQueryContext {
      def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] = {
        new CQLQueryContext[M](
          statement = stmt,
          extractor = converter
        )
      }
    }
    
    /**
     * A list of CQL statements together with their bind values.
     *
     * @param statements  CQL texts
     * @param parameters  one value list per statement
     * @param consistency optional consistency-level code from the companion object
     */
    case class CQLContext(
                           statements: Seq[String],
                           parameters: Seq[Seq[Object]] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                         ) { self =>

      /** Copy of this context with the given consistency level. */
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        self.copy(consistency = Some(_consistency))

      /** Copy holding exactly one statement and its values, replacing what was there. */
      def setCommand(_statement: String, _parameters: Object*): CQLContext =
        self.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters))

      /** Copy with one more statement and its values appended at the end. */
      def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
        val moreStatements = self.statements :+ _statement
        val moreParameters = self.parameters :+ _parameters
        self.copy(statements = moreStatements, parameters = moreParameters)
      }
    }
    
    object CQLEngine {
      import CQLContext._
      import CQLHelpers._
    
      /**
       * Prepares and executes the query in `ctx` and converts the resulting rows.
       *
       * @param ctx      query description (statement, bind values, extractor, consistency)
       * @param pageSize fetch size set on the bound statement
       * @return the driver's `ResultSet` together with the rows mapped through
       *         `ctx.extractor` and collected into `C`
       */
      def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
        implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

        val prepared = session.prepare(ctx.statement)

        // Bind the converted values when there are any, otherwise bind nothing.
        val bound =
          if (ctx.parameter != Nil) prepared.bind(processParameters(ctx.parameter): _*)
          else prepared.bind()

        ctx.consistency.foreach(level => bound.setConsistencyLevel(consistencyLevel(level)))

        val page = session.execute(bound.setFetchSize(pageSize))
        val rows = page.asScala.view.map(ctx.extractor).to[C]
        (page, rows)
      }
      /**
       * Fetches further results for `resultSet` unless it is already fully fetched.
       *
       * @param resultSet result set returned by an earlier fetch
       * @param timeOut   maximum time to wait for `fetchMoreResults()`
       * @param extractor converts a `Row` into `A`
       * @return `(resultSet, None)` when everything was already fetched or when the
       *         fetch failed / timed out (failures are deliberately swallowed, so the
       *         two cases look the same to the caller); otherwise the new result set
       *         with its rows mapped through `extractor`
       */
      def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
        extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = {
        import scala.util.control.NonFatal
        if (resultSet.isFullyFetched) {
          (resultSet, None)
        } else {
          try {
            val result = Await.result(resultSet.fetchMoreResults(), timeOut)
            (result, Some((result.asScala.view.map(extractor)).to[C]))
          } catch {
            // Only recoverable errors are swallowed; fatal ones (OutOfMemoryError,
            // InterruptedException, ...) must propagate.
            case NonFatal(_) => (resultSet, None)
          }
        }
      }
      /**
       * Executes the statements of `ctx`: exactly one statement goes through
       * `cqlSingleUpdate`, any other count through `cqlMultiUpdate`.
       */
      def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] =
        ctx.statements.size match {
          case 1 => cqlSingleUpdate(ctx)
          case _ => cqlMultiUpdate(ctx)
        }
      /**
       * Prepares, binds and asynchronously executes the first statement of `ctx`.
       *
       * @return a future holding the driver's `wasApplied()` result
       */
      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

        val prepared = session.prepare(ctx.statements.head)

        // Without any parameter lists the statement is bound without values.
        val bound =
          if (ctx.parameters == Nil) prepared.bind()
          else prepared.bind(processParameters(ctx.parameters.head): _*)

        ctx.consistency.foreach(level => bound.setConsistencyLevel(consistencyLevel(level)))
        session.executeAsync(bound).map(_.wasApplied())
      }
      /**
       * Executes all statements of `ctx` as one `BatchStatement`.
       *
       * Statements without a matching entry in `ctx.parameters` (for example when
       * the context was built with the default `parameters = Nil`) are bound without
       * values; previously the plain `zip` silently dropped them from the batch.
       *
       * @return a future holding the driver's `wasApplied()` result for the batch
       */
      def cqlMultiUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        // Pad the value lists so that every statement takes part in the zip.
        val paddedParams: Seq[Seq[Object]] = ctx.parameters.padTo(ctx.statements.size, Nil)
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip paddedParams
        val batch = new BatchStatement()
        commands.foreach { case (stm, params) =>
          val prepStmt = session.prepare(stm)
          if (params.isEmpty)
            batch.add(prepStmt.bind())
          else {
            val p = processParameters(params)
            batch.add(prepStmt.bind(p: _*))
          }
        }
        ctx.consistency.foreach {consistency =>
          batch.setConsistencyLevel(consistencyLevel(consistency))}
        session.executeAsync(batch).map(_.wasApplied())
      }
    
      /**
       * Builds a `CassandraSource` for the query in `ctx` and maps every row
       * through `ctx.extractor`.
       */
      def cassandraStream[A](ctx: CQLQueryContext[A])
                            (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {

        val bindValues = processParameters(ctx.parameter)
        val bound = session.prepare(ctx.statement).bind(bindValues: _*)
        ctx.consistency.foreach(level => bound.setConsistencyLevel(consistencyLevel(level)))

        val rows = CassandraSource(bound.setFetchSize(ctx.fetchSize))
        rows.map(ctx.extractor)
      }
    
      /**
       * Describes a CQL statement that is executed once per stream element.
       *
       * @param parallelism    parallelism passed to `mapAsync` / `mapAsyncUnordered`
       * @param processInOrder when true the flow uses `mapAsync`, otherwise `mapAsyncUnordered`
       * @param statement      CQL text, prepared again for every element
       * @param prepareParams  turns an element into the statement's bind values
       * @param consistency    optional consistency-level code from the CQLContext object
       */
      case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                          statement: String, prepareParams: R => Seq[Object],
                                          consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ self =>

        /** Copy of this description with a different parallelism level. */
        def setParallelism(parLevel: Int): CassandraActionStream[R] =
          self.copy(parallelism = parLevel)

        /** Copy of this description with ordered/unordered processing selected. */
        def setProcessOrder(ordered: Boolean): CassandraActionStream[R] =
          self.copy(processInOrder = ordered)

        /** Copy of this description with the given consistency level. */
        def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
          self.copy(consistency = Some(_consistency))

        // Binds the element's values, executes asynchronously and completes with the element.
        private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
          val bindValues = processParameters(prepareParams(r))
          val bound = session.prepare(statement).bind(bindValues: _*)
          consistency.foreach(code => bound.setConsistencyLevel(CQLContext.consistencyLevel(code)))
          session.executeAsync(bound).map(_ => r)
        }

        /** Flow that runs the statement for each element and re-emits the element. */
        def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
          val rows = Flow[R]
          if (processInOrder) rows.mapAsync(parallelism)(perform)
          else rows.mapAsyncUnordered(parallelism)(perform)
        }

      }
      /** Factory that sets only the statement and the parameter extractor. */
      object CassandraActionStream {
        def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = {
          new CassandraActionStream[R](
            statement = _statement,
            prepareParams = params
          )
        }
      }
    
    }
    object CQLHelpers {
      import java.nio.ByteBuffer
      import com.datastax.driver.core.LocalDate
      import java.time.Instant
      import akka.stream._
      import scala.concurrent.duration._
    
    
      /**
       * Implicitly adapts a Guava `ListenableFuture` to a Scala `Future` by
       * completing a `Promise` from a `FutureCallback`.
       */
      implicit def listenableFutureToFuture[T](
                                                listenableFuture: ListenableFuture[T]): Future[T] = {
        val completion = Promise[T]()
        val bridge = new FutureCallback[T] {
          def onFailure(error: Throwable): Unit = {
            completion.failure(error)
            ()
          }
          def onSuccess(result: T): Unit = {
            completion.success(result)
            ()
          }
        }
        Futures.addCallback(listenableFuture, bridge)
        completion.future
      }
    
      // A CQL blob value is carried as a java.nio.ByteBuffer.
      type CQLBlob = ByteBuffer
      // Year/month/day marker that processParameters passes to LocalDate.fromYearMonthDay.
      case class CQLDate(year: Int, month: Int, day: Int)
      // Marker that processParameters replaces with today's date.
      case object CQLTodayDate
      // Date-and-time marker that processParameters replaces with a java.time.Instant.
      // NOTE(review): the field name `Month` is capitalised, unlike its siblings; renaming
      // it would break callers that use named arguments.
      case class CQLDateTime(year: Int, Month: Int,
                             day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
      // Marker that processParameters replaces with Instant.now(...).
      case object CQLDateTimeNow
    
      /**
       * Replaces the marker values defined in this object with the objects that are
       * bound to a statement; every other value is passed through untouched.
       *
       *  - `CQLDate(y, m, d)`  -> driver `LocalDate.fromYearMonthDay(y, m, d)`
       *  - `CQLTodayDate`      -> driver `LocalDate` for `java.time.LocalDate.now()`
       *  - `CQLDateTimeNow`    -> `Instant.now` on a clock in the "EST" zone
       *  - `CQLDateTime(...)`  -> `Instant` for the given fields read as UTC
       *
       * The `CQLDateTime` case used to format a space-padded string without a
       * fraction separator or zone suffix and feed it to `Instant.parse`, which
       * rejects such text; the instant is now built from the fields directly.
       * UTC is used because `Instant.parse` only accepts UTC ("Z") text.
       *
       * @param params raw bind values, possibly containing markers
       * @return the values in the same order with markers converted
       * @throws java.time.DateTimeException if a `CQLDateTime` field is out of range
       */
      def processParameters(params: Seq[Object]): Seq[Object] = {
        import java.time.{Clock, LocalDateTime, ZoneId, ZoneOffset}
        params.map {
          case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
          case CQLTodayDate =>
            val today = java.time.LocalDate.now()
            LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
          case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
          case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
            // `ms` is the minute, `sc` the second, `mi` the millisecond (converted to nanos).
            LocalDateTime.of(yy, mm, dd, hr, ms, sc, mi * 1000000).toInstant(ZoneOffset.UTC)
          case p => p
        }
      }
    
      /** Loads the named file through `FileToByteBuffer`, waiting at most `timeOut`. */
      def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = {
        FileToByteBuffer(fileName, timeOut)
      }
    
      /** Writes the blob to the named file through `ByteBufferToFile`. */
      def cqlBlobToFile(blob: CQLBlob, fileName: String)(
        implicit mat: Materializer) = {
        ByteBufferToFile(blob, fileName)
      }
     }

    MongoEngine.scala

    import java.text.SimpleDateFormat
    
    import akka.NotUsed
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import org.mongodb.scala.MongoClient
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import org.mongodb.scala._
    import org.mongodb.scala.model._
    import java.util.Calendar
    
    import scala.collection.JavaConverters._
    import filestreaming.FileStreaming._
    import akka.stream.Materializer
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object MGOContext {
    
      /** Base type of every command that can be stored in an MGOContext. */
      trait MGOCommands

      /** Commands that read or modify the documents of a collection. */
      object MGOCommands {

        /** Document count with an optional filter and optional options. */
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

        /** Distinct values of `fieldName`, optionally restricted by `filter`. */
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

        /**
         * Find request.
         *
         * `andThen` receives the `FindObservable` and may refine it, e.g. with
         * `.maxScan(...)`, `.limit(...)`, `.sort(...)` or `.project(...)`.
         *
         * @param filter    optional query filter
         * @param andThen   optional refinement of the find observable
         * @param converter optional conversion of each document into `M`
         * @param firstOnly request only the first document
         */
        case class Find[M](
          filter: Option[Bson] = None,
          andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
          converter: Option[Document => M] = None,
          firstOnly: Boolean = false
        ) extends MGOCommands

        /** Find request whose documents are delivered as a stream. */
        case class DocumentStream(
          filter: Option[Bson] = None,
          andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
        ) extends MGOCommands

        /** Aggregation over the given pipeline stages. */
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

        /** Map-reduce with the given map and reduce function sources. */
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

        /** Insertion of one or more documents. */
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

        /** Deletion of the documents matching `filter`; `onlyOne` limits it to a single document. */
        case class Delete(
          filter: Bson,
          options: Option[Any] = None,
          onlyOne: Boolean = false
        ) extends MGOCommands

        /** Replacement of a document matching `filter`. */
        case class Replace(
          filter: Bson,
          replacement: Document,
          options: Option[Any] = None
        ) extends MGOCommands

        /** Update of the documents matching `filter`; `onlyOne` limits it to a single document. */
        case class Update(
          filter: Bson,
          update: Bson,
          options: Option[Any] = None,
          onlyOne: Boolean = false
        ) extends MGOCommands

        /** A list of write models executed as one bulk write. */
        case class BulkWrite(
          commands: List[WriteModel[Document]],
          options: Option[Any] = None
        ) extends MGOCommands

      }
    
      /** Administrative commands: collections, views and indexes. */
      object MGOAdmins {

        /** Drop the named collection. */
        case class DropCollection(collName: String) extends MGOCommands

        /** Create the named collection, optionally with options. */
        case class CreateCollection(
          collName: String,
          options: Option[Any] = None
        ) extends MGOCommands

        /** List the collections of the named database. */
        case class ListCollection(dbName: String) extends MGOCommands

        /** Create view `viewName` on `viewOn` using the given pipeline. */
        case class CreateView(
          viewName: String,
          viewOn: String,
          pipeline: Seq[Bson],
          options: Option[Any] = None
        ) extends MGOCommands

        /** Create an index for the given key specification. */
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drop the index with the given name. */
        case class DropIndexByName(
          indexName: String,
          options: Option[Any] = None
        ) extends MGOCommands

        /** Drop the index identified by its key specification. */
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drop all indexes. */
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

      }
    
      /**
       * Target of a Mongo operation plus the command to run against it.
       *
       * @param dbName   database name
       * @param collName collection name
       * @param action   command to execute; defaults to `null` until one is set
       */
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             action: MGOCommands = null
                           ) {
        self =>

        /** Copy of this context pointing at another database. */
        def setDbName(name: String): MGOContext =
          self.copy(dbName = name)

        /** Copy of this context pointing at another collection. */
        def setCollName(name: String): MGOContext =
          self.copy(collName = name)

        /** Copy of this context carrying the given command. */
        def setCommand(cmd: MGOCommands): MGOContext =
          self.copy(action = cmd)
      }
    
      /** Factories for a context without and with a command. */
      object MGOContext {
        def apply(db: String, coll: String): MGOContext =
          new MGOContext(db, coll)

        def apply(db: String, coll: String, command: MGOCommands): MGOContext =
          new MGOContext(db, coll, command)

      }
    
      /** Dates are carried as plain `java.util.Date` values. */
      type MGODate = java.util.Date

      /**
       * Builds a date at 00:00:00.000 in the default time zone.
       *
       * The calendar is cleared first: without that the result carried the time of
       * day of the call, so two calls with the same arguments returned different values.
       *
       * @param yyyy year
       * @param mm   month as understood by `java.util.Calendar.set`, i.e. 0-based (0 = January)
       * @param dd   day of month
       */
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.clear()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }

      /**
       * Builds a date-time with zero milliseconds in the default time zone.
       *
       * The calendar is cleared first so that the millisecond field no longer
       * depends on the moment of the call.
       *
       * @param mm month as understood by `java.util.Calendar.set`, i.e. 0-based (0 = January)
       */
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.clear()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }

      /** The current date and time. */
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }
    
    
      /** Formats `dt` with a `SimpleDateFormat` built from `formatString`. */
      def mgoDateToString(dt: MGODate, formatString: String): String =
        new SimpleDateFormat(formatString).format(dt)
    
      // Binary field values are represented by BsonBinary.
      type MGOBlob = BsonBinary
      // Array field values are represented by BsonArray.
      type MGOArray = BsonArray
    
      /** Loads the named file through `FileToByteArray`, waiting at most `timeOut`. */
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = {
        FileToByteArray(fileName, timeOut)
      }
    
      /** Writes the blob's bytes (`blob.getData`) to the named file through `ByteArrayToFile`. */
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) = {
        val bytes = blob.getData
        ByteArrayToFile(bytes, fileName)
      }
    
      /** `Some(doc.getString(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetStringOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getString(fieldName))

      /** `Some(doc.getInteger(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetIntOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getInteger(fieldName))

      /** `Some(doc.getLong(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetLonggOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getLong(fieldName))

      /** `Some(doc.getDouble(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getDouble(fieldName))

      /** `Some(doc.getBoolean(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetBoolOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getBoolean(fieldName))

      /** `Some(doc.getDate(fieldName))` when the document has that key, otherwise `None`. */
      def mgoGetDateOrNone(doc: Document, fieldName: String) =
        if (!doc.keySet.contains(fieldName)) None
        else Some(doc.getDate(fieldName))
      /**
       * The binary value stored under `fieldName`, if any.
       *
       * Uses the document's typed `get[T]` instead of casting the `Option` with
       * `asInstanceOf`: that cast is unchecked (erasure), so a field of another
       * BSON type used to surface as a ClassCastException at the use site.
       * NOTE(review): relies on `Document.get[T]` yielding `None` for a missing key
       * or a value of a different type - confirm against the driver version in use.
       */
      def mgoGetBlobOrNone(doc: Document, fieldName: String): Option[MGOBlob] =
        doc.get[MGOBlob](fieldName)

      /**
       * The array value stored under `fieldName`, if any; same typed lookup as
       * `mgoGetBlobOrNone`, replacing an unchecked `asInstanceOf` on the `Option`.
       */
      def mgoGetArrayOrNone(doc: Document, fieldName: String): Option[MGOArray] =
        doc.get[MGOArray](fieldName)
    
      /**
       * Returns the array's values as an immutable list typed as `BsonDocument`.
       * The element type is asserted with an unchecked cast, not verified.
       */
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        val values = arr.getValues.asScala.toList
        values.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      // Shape of the `andThen` refinement accepted by Find and DocumentStream.
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    object MGOEngine {
    
      import MGOContext._
      import MGOCommands._
      import MGOAdmins._
    
      /**
       * Executes the command stored in `ctx.action` against `ctx.dbName` /
       * `ctx.collName` and returns the driver's result as a `Future`.
       *
       * The result is cast to `Future[T]` without any check, so the caller must
       * choose a `T` that matches the command. `options` values are cast to the
       * options class of the corresponding driver call.
       *
       * Changes from the previous version:
       *  - `Count(None, Some(options))` is handled (it used to raise a MatchError);
       *    an empty document serves as the match-all filter.
       *  - a missing (`null`) or unsupported command yields a failed `Future`
       *    instead of a MatchError thrown at the call site.
       *
       * NOTE(review): when `Find.andThen` is set, `firstOnly` is ignored and the
       * whole result is returned - confirm whether that is intended.
       *
       * @param ctx    database, collection and command
       * @param client Mongo client used to resolve the database
       * @tparam T     result type expected by the caller (unchecked)
       */
      def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
          /* count */
          case Count(Some(filter), Some(opt)) =>
            coll.count(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) =>
            coll.count(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, Some(opt)) =>
            // Options without a filter: count everything using an empty filter document.
            coll.count(Document(), opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(None, None) =>
            coll.count().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) =>
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) =>
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, optConv, false) =>
            if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, optConv, true) =>
            if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, false) =>
            if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, true) =>
            if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(None, Some(next), optConv, _) =>
            if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), optConv, _) =>
            if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate */
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce */
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* insert */
          case Insert(docs, Some(opt)) =>
            if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs, None) =>
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter, Some(opt), onlyOne) =>
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
          /* replace */
          case Replace(filter, replacement, None) =>
            coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter, replacement, Some(opt)) =>
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>
            if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
          case Update(filter, update, Some(opt), onlyOne) =>
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands, Some(opt)) =>
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

          /* drop collection */
          case DropCollection(collName) =>
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
          /* create collection */
          case CreateCollection(collName, None) =>
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName, Some(opt)) =>
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
          /* create view */
          case CreateView(viewName, viewOn, pline, None) =>
            db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName, viewOn, pline, Some(opt)) =>
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
          /* create index */
          case CreateIndex(key, None) =>
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key, Some(opt)) =>
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
          /* drop index */
          case DropIndexByName(indexName, None) =>
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, None) =>
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, Some(opt)) =>
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          /* anything else, including the default null action */
          case other =>
            Future.failed(new IllegalArgumentException(s"mgoExecute: unsupported command: $other"))
        }
      }
    
      /**
       * Turns a `DocumentStream` command into a `MongoSource` of documents.
       *
       * The optional filter is applied to `find`, then the optional `andThen`
       * refinement is applied to the resulting observable. Any other command
       * (or no command) raises a MatchError.
       */
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
        val collection = client.getDatabase(ctx.dbName).getCollection(ctx.collName)
        ctx.action match {
          case DocumentStream(optFilter, optNext) =>
            val found: FindObservable[Document] = optFilter match {
              case Some(filter) => collection.find(filter)
              case None => collection.find()
            }
            val refined: FindObservable[Document] = optNext match {
              case Some(next) => next(found)
              case None => found
            }
            MongoSource(refined)
        }
      }
    
    }
    
      object MongoActionStream {
        import MGOContext._
    
        /**
         * Settings for inserting stream elements.
         *
         * @param converter   turns an element into the document to insert
         * @param parallelism parallelism of the asynchronous stage
         */
        case class StreamingInsert[A](
          dbName: String,
          collName: String,
          converter: A => Document,
          parallelism: Int = 1
        ) extends MGOCommands

        /**
         * Settings for deleting documents per stream element.
         *
         * @param toFilter    turns an element into the delete filter
         * @param parallelism parallelism of the asynchronous stage
         * @param justOne     delete a single matching document instead of all of them
         */
        case class StreamingDelete[A](
          dbName: String,
          collName: String,
          toFilter: A => Bson,
          parallelism: Int = 1,
          justOne: Boolean = false
        ) extends MGOCommands

        /**
         * Settings for updating documents per stream element.
         *
         * @param toFilter    turns an element into the update filter
         * @param toUpdate    turns an element into the update specification
         * @param parallelism parallelism of the asynchronous stage
         * @param justOne     update a single matching document instead of all of them
         */
        case class StreamingUpdate[A](
          dbName: String,
          collName: String,
          toFilter: A => Bson,
          toUpdate: A => Bson,
          parallelism: Int = 1,
          justOne: Boolean = false
        ) extends MGOCommands
    
    
        /** Inserts every stream element, converted with `ctx.converter`, into the target collection. */
        case class InsertAction[A](ctx: StreamingInsert[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /** Flow that converts each element, inserts the document and emits that document. */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] = {
            // Insert one document and complete with the document itself.
            def insert(doc: Document) =
              collection.insertOne(doc).toFuture().map(_ => doc)

            Flow[A]
              .map(ctx.converter)
              .mapAsync(ctx.parallelism)(insert)
          }
        }
    
        /** Runs an update against the target collection for every stream element. */
        case class UpdateAction[A](ctx: StreamingUpdate[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /**
           * Flow that updates one (`ctx.justOne`) or all documents matching
           * `ctx.toFilter(element)` and then re-emits the element.
           */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = {
            def updateSingle(a: A) =
              collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)

            def updateAll(a: A) =
              collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)

            if (ctx.justOne) Flow[A].mapAsync(ctx.parallelism)(updateSingle)
            else Flow[A].mapAsync(ctx.parallelism)(updateAll)
          }
        }
    
    
        /** Runs a delete against the target collection for every stream element. */
        case class DeleteAction[A](ctx: StreamingDelete[A])(
          implicit mongoClient: MongoClient) {

          val database = mongoClient.getDatabase(ctx.dbName)
          val collection = database.getCollection(ctx.collName)

          /**
           * Flow that deletes one (`ctx.justOne`) or all documents matching
           * `ctx.toFilter(element)` and then re-emits the element.
           */
          def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = {
            def deleteSingle(a: A) =
              collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a)

            def deleteAll(a: A) =
              collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a)

            if (ctx.justOne) Flow[A].mapAsync(ctx.parallelism)(deleteSingle)
            else Flow[A].mapAsync(ctx.parallelism)(deleteAll)
          }
        }
    
      }
    import org.mongodb.scala._
    import scala.concurrent._
    import scala.concurrent.duration._
    
    /** Blocking convenience helpers for observables and futures. */
    object MGOHelpers {

      /** Adds the helper methods to observables of documents; results print as JSON. */
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = document => document.toJson
      }

      /** Adds the helper methods to any observable; results print via `toString`. */
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = value => value.toString
      }

      /** Blocking accessors and printers shared by the implicit classes above. */
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String

        /** All results, waiting at most 10 seconds. */
        def results(): Seq[C] = {
          val all = observable.toFuture()
          Await.result(all, 10.seconds)
        }

        /** The first result, waiting at most 10 seconds. */
        def headResult() = {
          val first = observable.head()
          Await.result(first, 10.seconds)
        }

        /** Prints `initial` when it is non-empty, then every result on its own line. */
        def printResults(initial: String = ""): Unit = {
          if (initial.nonEmpty) print(initial)
          for (res <- results()) println(converter(res))
        }

        /** Prints `initial` followed by the first result. */
        def printHeadResult(initial: String = ""): Unit = {
          val rendered = converter(headResult())
          println(s"${initial}${rendered}")
        }
      }

      /** Blocks for the future's value; the default timeout is one second. */
      def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
        Await.result(fut, timeOut)

      /** Blocks for the future's collection; the default timeout is one second. */
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
        Await.result(fut, timeOut)

    }

    FileStreaming.scala

    package filestreaming
    import java.io.{InputStream, ByteArrayInputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.{Materializer}
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    
    import scala.concurrent.{Await}
    import akka.util._
    import scala.concurrent.duration._
    
    /**
     * Blocking helpers that move whole files into memory and back using
     * akka-stream `FileIO` / `StreamConverters`.
     *
     * The three readers share one private read routine and the writers share one
     * private write routine; an unused `ByteBuffer` local in `ByteArrayToFile`
     * was removed.
     */
    object FileStreaming {

      // Reads the complete file into one ByteString, waiting at most `timeOut`.
      private def readAll(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): ByteString = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString())(_ ++ _)
        Await.result(fut, timeOut)
      }

      // Streams `bytes` into the named file; the returned future completes with the IO result.
      private def writeAll(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }

      /** File content as a `ByteBuffer`; blocks for at most `timeOut`. */
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer):ByteBuffer =
        readAll(fileName, timeOut).toByteBuffer

      /** File content as a byte array; blocks for at most `timeOut`. */
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): Array[Byte] =
        readAll(fileName, timeOut).toArray

      /** File content as an in-memory `InputStream`; blocks for at most `timeOut`. */
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): InputStream =
        new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

      /**
       * Writes the remaining bytes of `byteBuf` to the named file.
       * Reading the bytes advances the buffer's position.
       */
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        writeAll(ba, fileName)
      }

      /** Writes `bytes` to the named file. */
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) =
        writeAll(bytes, fileName)

      /** Copies everything readable from `is` into the named file. */
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }

    }

    FileStreaming.scala

    package filestreaming
    import java.io.{InputStream, ByteArrayInputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.{Materializer}
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    
    import scala.concurrent.{Await}
    import akka.util._
    import scala.concurrent.duration._
    
    /** Blocking file <-> memory helpers built on Akka Streams `FileIO` and
      * `StreamConverters`.
      *
      * The `FileTo*` methods fold the whole file into memory and block the calling
      * thread (via `Await.result`) for at most `timeOut`. The `*ToFile` methods
      * start the write and return whatever running the stream into `FileIO.toPath`
      * materializes, without waiting for the write to finish.
      */
    object FileStreaming {

      /** Reads the entire file into a single `ByteString`, waiting at most `timeOut`. */
      private def readAll(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): ByteString = {
        val contents = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty)(_ ++ _)
        Await.result(contents, timeOut)
      }

      /** Returns the contents of `fileName` as a `ByteBuffer`.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to block while reading (60 seconds by default)
        */
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): ByteBuffer =
        readAll(fileName, timeOut).toByteBuffer

      /** Returns the contents of `fileName` as a byte array.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to block while reading (60 seconds by default)
        */
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): Array[Byte] =
        readAll(fileName, timeOut).toArray

      /** Returns an in-memory `InputStream` over the contents of `fileName`.
        *
        * The file is fully read before this method returns; the stream is a
        * `ByteArrayInputStream` over that copy.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to block while reading (60 seconds by default)
        */
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): InputStream =
        new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

      /** Writes the remaining bytes of `byteBuf` to `fileName`.
        *
        * The bytes are copied out with a relative bulk `get`, so the buffer's
        * position is advanced to its limit as a side effect.
        */
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val bytes = new Array[Byte](byteBuf.remaining())
        byteBuf.get(bytes, 0, bytes.length)
        ByteArrayToFile(bytes, fileName)
      }

      /** Writes `bytes` to `fileName` by streaming them from a `ByteArrayInputStream`. */
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) =
        InputStreamToFile(new ByteArrayInputStream(bytes), fileName)

      /** Streams everything readable from `is` into `fileName`. */
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) =
        StreamConverters.fromInputStream(() => is).runWith(FileIO.toPath(Paths.get(fileName)))

    }

    HikariCPool.scala

    package jdbcengine
    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.language.implicitConversions
    import com.typesafe.config._
    import java.util.concurrent.TimeUnit
    import java.util.Properties
    import scalikejdbc.config._
    import com.typesafe.config.Config
    import com.zaxxer.hikari._
    import scalikejdbc.ConnectionPoolFactoryRepository
    
    /** Defaulting and `Option`-returning accessors for a Typesafe `Config`.
      *
      * Each `*Or` method returns the value at `path` when `c.hasPath(path)` is true
      * and otherwise the `default`, which is passed by name and only evaluated in
      * that case. Each `*Opt` method yields `None` when no value is available.
      */
    class ConfigExtensionMethods(val c: Config) extends AnyVal {
      import scala.collection.JavaConverters._

      def getBooleanOr(path: String, default: => Boolean = false): Boolean =
        if (!c.hasPath(path)) default else c.getBoolean(path)

      def getIntOr(path: String, default: => Int = 0): Int =
        if (!c.hasPath(path)) default else c.getInt(path)

      def getStringOr(path: String, default: => String = null): String =
        if (!c.hasPath(path)) default else c.getString(path)

      def getConfigOr(path: String, default: => Config = ConfigFactory.empty()): Config =
        if (!c.hasPath(path)) default else c.getConfig(path)

      /** Duration at `path` expressed in milliseconds. */
      def getMillisecondsOr(path: String, default: => Long = 0L): Long =
        if (!c.hasPath(path)) default else c.getDuration(path, TimeUnit.MILLISECONDS)

      /** Duration at `path`, read with millisecond resolution. */
      def getDurationOr(path: String, default: => Duration = Duration.Zero): Duration =
        if (!c.hasPath(path)) default
        else Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)

      /** Sub-config at `path` converted with [[toProperties]]. */
      def getPropertiesOr(path: String, default: => Properties = null): Properties =
        if (!c.hasPath(path)) default
        else new ConfigExtensionMethods(c.getConfig(path)).toProperties

      /** Converts this config's root object into `java.util.Properties`.
        *
        * Nested config objects become nested `Properties` values, other values are
        * stored as their unwrapped `toString`, and null values are left out.
        */
      def toProperties: Properties = {
        def copyLevel(entries: mutable.Map[String, ConfigValue]): Properties = {
          val target = new Properties(null)
          entries.foreach { case (key, value) =>
            val converted: AnyRef = value.valueType() match {
              case ConfigValueType.OBJECT => copyLevel(value.asInstanceOf[ConfigObject].asScala)
              case _ => Option(value.unwrapped).map(_.toString).orNull
            }
            if (converted ne null) target.put(key, converted)
          }
          target
        }
        copyLevel(c.root.asScala)
      }

      def getBooleanOpt(path: String): Option[Boolean] =
        if (c.hasPath(path)) Option(c.getBoolean(path)) else Option.empty

      def getIntOpt(path: String): Option[Int] =
        if (c.hasPath(path)) Option(c.getInt(path)) else Option.empty

      def getStringOpt(path: String): Option[String] = Option(getStringOr(path))

      def getPropertiesOpt(path: String): Option[Properties] = Option(getPropertiesOr(path))
    }
    
    /** Companion holding the implicit conversion that makes the extension methods
      * available on any `Config` once it is imported.
      */
    object ConfigExtensionMethods {
      @inline
      implicit def configExtensionMethods(c: Config): ConfigExtensionMethods =
        new ConfigExtensionMethods(c)
    }
    
    /** Reads pool settings from the `db.<name>` section of the Typesafe config
      * (with `envPrefix` prepended to the path) and turns them into HikariCP
      * configuration.
      */
    trait HikariConfigReader extends TypesafeConfigReader {
      self: TypesafeConfig =>

      import ConfigExtensionMethods.configExtensionMethods

      /** Returns the `poolFactoryName` configured for `dbName`, falling back to
        * `ConnectionPoolFactoryRepository.COMMONS_DBCP` when the key is absent.
        */
      def getFactoryName(dbName: Symbol): String = {
        val section: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")
        section.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
      }

      /** Builds a `HikariConfig` for `dbName`.
        *
        * Keys that are absent fall back to the defaults written inline below; the
        * pool size defaults are derived from `numThreads` (20 unless configured).
        */
      def hikariCPConfig(dbName: Symbol): HikariConfig = {
        val hikari = new HikariConfig()
        val section: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")

        // Connection settings: a DataSource class, when configured, is used instead
        // of a driver class; "driverClassName" is tried before "driver".
        section.getStringOpt("dataSourceClass") match {
          case Some(dataSourceClass) =>
            hikari.setDataSourceClassName(dataSourceClass)
          case None =>
            section.getStringOpt("driverClassName")
              .orElse(section.getStringOpt("driver"))
              .foreach(hikari.setDriverClassName)
        }
        hikari.setJdbcUrl(section.getStringOr("url"))
        section.getStringOpt("user").foreach(hikari.setUsername)
        section.getStringOpt("password").foreach(hikari.setPassword)
        section.getPropertiesOpt("properties").foreach(hikari.setDataSourceProperties)

        // Pool configuration (timeouts are in milliseconds)
        hikari.setConnectionTimeout(section.getMillisecondsOr("connectionTimeout", 1000))
        hikari.setValidationTimeout(section.getMillisecondsOr("validationTimeout", 1000))
        hikari.setIdleTimeout(section.getMillisecondsOr("idleTimeout", 600000))
        hikari.setMaxLifetime(section.getMillisecondsOr("maxLifetime", 1800000))
        hikari.setLeakDetectionThreshold(section.getMillisecondsOr("leakDetectionThreshold", 0))
        hikari.setInitializationFailFast(section.getBooleanOr("initializationFailFast", false))
        section.getStringOpt("connectionTestQuery").foreach(hikari.setConnectionTestQuery)
        section.getStringOpt("connectionInitSql").foreach(hikari.setConnectionInitSql)
        val threadCount = section.getIntOr("numThreads", 20)
        hikari.setMaximumPoolSize(section.getIntOr("maxConnections", threadCount * 5))
        hikari.setMinimumIdle(section.getIntOr("minConnections", threadCount))
        hikari.setPoolName(section.getStringOr("poolName", dbName.name))
        hikari.setRegisterMbeans(section.getBooleanOr("registerMbeans", false))

        // Per-connection settings
        hikari.setReadOnly(section.getBooleanOr("readOnly", false))
        section.getStringOpt("isolation")
          .foreach(level => hikari.setTransactionIsolation("TRANSACTION_" + level))
        hikari.setCatalog(section.getStringOr("catalog"))

        hikari
      }
    }
    
    import scalikejdbc._
    /** Registers and closes scalikejdbc connection pools described in the config.
      *
      * A database whose factory name is `"hikaricp"` gets a pool backed by a
      * `HikariDataSource`; any other factory name goes through
      * `ConnectionPool.add` with the JDBC and pool settings read from the config.
      */
    trait ConfigDBs {
      self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

      /** Loads the JDBC driver class when a non-blank class name is given. */
      private def loadDriver(className: String): Unit =
        if (className != null && className.trim.nonEmpty) {
          Class.forName(className)
        }

      /** Creates and registers the connection pool for `dbName`.
        *
        * @param dbName name of the database section; defaults to
        *               `ConnectionPool.DEFAULT_NAME`
        */
      def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        getFactoryName(dbName) match {
          case "hikaricp" =>
            val hconf = hikariCPConfig(dbName)
            // Load the driver before the pool exists, as the other branch does, so
            // a missing driver class cannot leave behind a HikariDataSource that
            // was created but never registered or closed.
            loadDriver(hconf.getDriverClassName)
            val hikariCPSource = new HikariDataSource(hconf)
            ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
          case _ =>
            val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
            val cpSettings = readConnectionPoolSettings(dbName)
            loadDriver(driver)
            ConnectionPool.add(dbName, url, user, password, cpSettings)
        }
      }

      /** Loads the global settings, then sets up every database in `dbNames`. */
      def setupAll(): Unit = {
        loadGlobalSettings()
        dbNames.foreach { dbName => setup(Symbol(dbName)) }
      }

      /** Closes the pool registered under `dbName`. */
      def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        ConnectionPool.close(dbName)
      }

      /** Closes every registered pool. */
      def closeAll(): Unit = {
        ConnectionPool.closeAll()
      }

    }
    
    
    /** Ready-to-use `ConfigDBs` instance that mixes in `TypesafeConfigReader`,
      * `StandardTypesafeConfig` and `HikariConfigReader`.
      */
    object ConfigDBs extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
    
    /** `ConfigDBs` variant that mixes in `EnvPrefix` and takes its `env` from
      * `envValue` (a null `envValue` yields `None`).
      *
      * NOTE(review): presumably `EnvPrefix` derives `envPrefix` from `env`, so the
      * settings are read from `<env>.db.<name>` — confirm against scalikejdbc-config.
      */
    case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
      with EnvPrefix {

      override val env = Option(envValue)
    }

    MongoStreamDemo.scala

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    import org.mongodb.scala.connection._
    import scala.collection.JavaConverters._
    
    /** Demo program that exercises `mongoStream` and the `MongoActionStream` flows
      * against a MongoDB at `localhost:27017`, the `h2` JDBC database and a
      * Cassandra node at `127.0.0.1:9042`.
      *
      * Everything lives in the `App` body, so the statements below run top to
      * bottom when the object is initialised.
      * NOTE(review): the `run()` calls are not awaited, so the pipelines may
      * overlap in time — confirm that their relative order does not matter.
      */
    object MongoStream extends App {
      import MGOContext._
      import MGOEngine._
      import MGOCommands._
      import MGOHelpers._
    
    
      // Implicit MongoDB client; mongoStream takes one as an implicit parameter.
      val clusterSettings = ClusterSettings.builder()
        .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
      val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
      implicit val client = MongoClient(clientSettings)
    
      // Akka runtime used to materialize and run the streams below.
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
      implicit val ec = system.dispatcher
    
      /** Purchase-order header; `remarks` and the `podtl` detail array are optional. */
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray]
                    )
      /** Builds a [[PO]] from the document fields of the same names.
        * Presumably the `mgoGet*OrNone` helpers return `None` for a missing field —
        * verify in MGOHelpers.
        */
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl")
        )
      }
      /** One purchase-order detail line. */
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )
      /** Builds a [[PODTL]] from a detail document.
        * Note that `payTerm` is read from the lower-case document key "payterm".
        */
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }
    
      /** Prints the header fields of `po`, then one line per detail entry when the
        * `podtl` array is present.
        */
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        if (po.remarks != None)
          println(s"remarks: ${po.remarks.get}")
        po.podtl match {
          case Some(barr) =>
            mgoArrayToDocumentList(barr)
              .map { dc => toPODTL(dc)}
              .foreach { doc: PODTL =>
                print(s"==>Item: ${doc.item} ")
                print(s"price: ${doc.price} ")
                print(s"qty: ${doc.qty} ")
                doc.packing.foreach(pk => print(s"packing: ${pk} "))
                doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                println("")
              }
          case _ => // no detail lines to print
        }
      }
      import org.mongodb.scala.model.Projections._
      import MongoActionStream._
      import MGOEngine._
      import akka.stream.scaladsl.{Sink, Source}
    
      // Query testdb.po without a filter, excluding the "handler" and "_id" fields.
      val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
      val ctx = MGOContext("testdb","po").setCommand(
        DocumentStream(filter = None, andThen = Some(proj)))
    
    
      // Run the PO stream once, printing every purchase order.
      val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))
    
      // NOTE(review): this builds and runs the same pipeline a second time, so each
      // PO is printed again; getResult then blocks on this second run — confirm
      // that its default timeout is long enough for the collection.
      println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))
    
    
      import jdbcengine._
      import JDBCEngine._
      import scalikejdbc._
    
      /** Row shape shared by the JDBC, Cassandra and MongoDB examples below. */
      case class DataRow (
                           rowid: Long,
                           measureid: Long,
                           state: String,
                           county: String,
                           year: Int,
                           value: Int
                         )
      // Extracts a DataRow from a JDBC result-set row of AQMRPT.
      val toRow: WrappedResultSet => DataRow = rs => DataRow(
        rowid = rs.long("ROWID"),
        measureid = rs.long("MEASUREID"),
        state = rs.string("STATENAME"),
        county = rs.string("COUNTYNAME"),
        year = rs.int("REPORTYEAR"),
        value = rs.int("VALUE")
      )
    
      //construct the context
      val h2ctx = JDBCQueryContext[DataRow](
        dbName = 'h2,
        statement = "select * from AQMRPT",
        extractor = toRow
      )
    
      //source from h2 database
      val jdbcSource = jdbcAkkaStream(h2ctx)
    
      //document converter
      def rowToDoc: DataRow => Document = row => Document (
        "rowid" -> row.rowid,
        "measureid" ->  row.measureid,
        "state" ->  row.state,
        "county" ->  row.county,
        "year" ->  row.year,
        "value" ->  row.value
      )
      // Inverse of rowToDoc: reads the same six keys back into a DataRow.
      def docToRow: Document => DataRow = doc => DataRow (
        rowid = doc.getLong("rowid"),
        measureid = doc.getLong("measureid"),
        state = doc.getString("state"),
        county = doc.getString("county"),
        year = doc.getInteger("year"),
        value = doc.getInteger("value")
      )
      //setup context
      // Presumably performOnRow inserts each converted row into testdb.members and
      // emits the resulting document — verify in MongoActionStream.
      val mgoctx = StreamingInsert("testdb","members",rowToDoc)
      val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
      val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
      // Prints one line per row (measureid is not printed).
      val sink = Sink.foreach[DataRow]{ r =>
        println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
      }
    
      //config jdbc drivers
      ConfigDBsWithEnv("dev").setup('h2)
      ConfigDBsWithEnv("dev").loadGlobalSettings()
    
      // Push the first 100 JDBC rows through the insert flow and print them.
      val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()
    
      // Stream testdb.members back out, unfiltered, and print each row.
      val mgoCtxPrint = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None))
    
      mongoStream(mgoCtxPrint).map(docToRow).to(sink).run()
    
    
    
      import com.datastax.driver.core._
      import cassandraengine._
      import CQLEngine._
      import org.mongodb.scala.model.Filters._
    
      //data row converter
      val cqlToDataRow = (rs: Row) => DataRow(
        rowid = rs.getLong("ROWID"),
        measureid = rs.getLong("MEASUREID"),
        state = rs.getString("STATENAME"),
        county = rs.getString("COUNTYNAME"),
        year = rs.getInt("REPORTYEAR"),
        value = rs.getInt("VALUE")
      )
    
     import org.bson.conversions._
      import org.mongodb.scala.model.Updates._
    
      //#init-session
      implicit val session = Cluster.builder
        .addContactPoint("127.0.0.1")
        .withPort(9042)
        .build
        .connect()
    
    
      //setup context
      val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
      //construct source
      val cqlSource = cassandraStream(cqlCtx)
    
      // Selects documents with the row's rowid whose "value" is below 10.
      def toFilter: DataRow => Bson = row => {
        and(equal("rowid",row.rowid), lt("value",10))
      }
      // Sets "value" to ten times the row's value.
      def toUpdate: DataRow => Bson = row => {
        set("value" , row.value * 10)
      }
      // Feed every Cassandra row through the update flow on testdb.members and
      // print what the flow emits.
      val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
      val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
      cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()
    
    
      import org.bson.conversions._
      import org.mongodb.scala.model.Filters._
      // Selects documents with the row's rowid whose "value" equals 10.
      def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10))
    
      // Stream testdb.members through the delete flow, discarding the output.
      val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
      val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
      val mgoCtxSrc = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None))
      mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run()
    
      // Print testdb.members sorted by rowid, highest first.
      import org.mongodb.scala.model.Sorts._
      val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
      val mgoCtxShow = MGOContext("testdb","members").setCommand(
        DocumentStream(filter = None, andThen = Some(sortDsc)))
    
      mongoStream(mgoCtxShow).map(docToRow).to(sink).run()
    
      // Wait for a line on stdin, then shut down the actor system.
      // NOTE(review): `client` and `session` are not closed here — confirm whether
      // that is intentional for the demo.
      scala.io.StdIn.readLine()
    
      system.terminate()
    
    
    
    }
  • 相关阅读:
    kubernetes----资源控制器5
    kubernetes----二进制安装3
    kubernetes----自动化安装2
    kubernetes----基础1
    PC微信低版本限制登录怎么办?
    mysql排名次
    mysql中GROUP BY中报错
    scrapy框架操作
    python QQ与微信自动发消息
    python中调用js的编码问题
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8581280.html
Copyright © 2011-2022 走看看