  restapi(3) - MongoDBEngine: a Scala programming toolkit for MongoDB

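    A colleague happens to be learning MongoDB, and we discussed that MongoDB should sit on the server side and serve data upload and download to clients through a web service. The restapi framework discussed in the previous installment can be used to implement CRUD operations against MongoDB. Before getting to restapi, this post first introduces Scala programming for MongoDB database operations, because it differs considerably from programming against a traditional SQL database.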

    One of the earlier posts in the sdp (streaming-data-processor) series covered MongoDBEngine. This is a good opportunity to show how that toolkit is used.

    MongoDBEngine is a Scala toolkit for MongoDB CRUD programming built on top of mongo-scala-driver. Its main functionality is reflected in the following three functions:

     def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] 
    
    
      // T => FindIterable  e.g List[Document]
     def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T]
    
    
     def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] 

    Where: mgoUpdate covers insert, update, delete, replace ...

           mgoQuery covers find, count, distinct ...

           mgoAdmin covers dropCollection, createCollection ...

    The first thing to note is their return type, DBOResult[T], which is essentially Future[Either[Throwable,Option[T]]]:

     type DBOError[A] = EitherT[Task,Throwable,A]
     type DBOResult[A] = OptionT[DBOError,A]

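    It looks complicated but is easy to explain. The type was designed specifically for database operations, so:

    1. Operations are asynchronous, hence Future (Task plays the role of Future and converts to one, e.g. Task.runToFuture).

    2. The result may be empty, hence Option.

    3. An error also yields no result, but we need to know that the empty value was caused by an error, hence Either.

    Unifying all return types as DBOResult[T] makes function composition convenient, for example: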
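    The three layers can be seen side by side in a small standalone sketch. It uses only the Scala standard library rather than Task, EitherT and OptionT, and the names `Unwrapped`, `found`, `empty`, `failed` and `describe` are made up for this illustration; it only mirrors the shape that a DBOResult[T] unwraps to.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ResultShapeSketch {
  // Plain-stdlib stand-in for the shape DBOResult[T] unwraps to (assumption: illustration only).
  type Unwrapped[T] = Future[Either[Throwable, Option[T]]]

  // The three possible outcomes of a database operation.
  val found: Unwrapped[Int]  = Future.successful(Right(Some(42)))  // ran fine, value present
  val empty: Unwrapped[Int]  = Future.successful(Right(None))      // ran fine, nothing found
  val failed: Unwrapped[Int] =
    Future.successful(Left(new RuntimeException("connection refused")))  // failed, cause preserved

  // Blocks for the result (acceptable in a demo only) and names the outcome.
  def describe[T](r: Unwrapped[T]): String =
    Await.result(r, 5.seconds) match {
      case Right(Some(v)) => s"value: $v"
      case Right(None)    => "empty"
      case Left(err)      => s"error: ${err.getMessage}"
    }
}
```

    The point of keeping Option inside Either is visible in `describe`: an empty result and a failed operation both carry no value, yet they remain distinguishable.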

    for {
        a <- mgoQuery(...)
        _ <- mgoUpdate(a, ...)
        b <- mgoQuery(...)
    } yield b
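    What such a composition buys is short-circuiting: once a step comes back empty or fails, the later steps are skipped. The sketch below imitates that with a tiny synchronous wrapper written only for this illustration (`Result`, `ok`, `none` and `fail` are hypothetical names; the real DBOResult is built from cats and monix types and runs asynchronously).

```scala
object ComposeSketch {
  // Synchronous stand-in for DBOResult: Either carries errors, Option carries emptiness.
  final case class Result[A](run: Either[Throwable, Option[A]]) {
    def map[B](f: A => B): Result[B] = Result(run.map(_.map(f)))
    def flatMap[B](f: A => Result[B]): Result[B] = run match {
      case Right(Some(a)) => f(a)                  // value present: continue with the next step
      case Right(None)    => Result(Right(None))   // empty: skip the remaining steps
      case Left(err)      => Result(Left(err))     // error: skip the remaining steps, keep the cause
    }
  }

  def ok[A](a: A): Result[A] = Result(Right(Some(a)))
  def none[A]: Result[A] = Result(Right(None))
  def fail[A](msg: String): Result[A] = Result(Left(new RuntimeException(msg)))

  // All steps succeed, so the yield expression is evaluated.
  val allGood: Result[Int] = for {
    a <- ok(1)
    b <- ok(a + 1)
  } yield a + b

  // The second step is empty, so the third step never runs.
  val stopsOnEmpty: Result[Int] = for {
    a <- ok(1)
    b <- none[Int]
    c <- ok(99)
  } yield a + b + c

  // The second step fails, so the error is what comes out.
  val stopsOnError: Result[Int] = for {
    a <- ok(1)
    _ <- fail[Int]("boom")
    c <- ok(99)
  } yield a + c
}
```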

    On the other hand, it also makes some code more cumbersome, such as extracting the computed value out of the structure:

     mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
          case Failure(e) => println(e.getMessage)
     }

    It is a bit tedious, but it gives a more detailed view of how the command executed, and it is a uniform, standard pattern (Ctrl-C, Ctrl-V will do).
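    If the copy-and-paste becomes tiresome, the nested match can be folded into one helper. The following is only a sketch using the standard library; `fold` and its parameter names are not part of MongoDBEngine.

```scala
import scala.util.{Failure, Success, Try}

object UnwrapSketch {
  // Collapses the nested Try / Either / Option structure into a single value.
  // A failed Future (Failure) and a database error (Left) both go to onError.
  def fold[A, B](t: Try[Either[Throwable, Option[A]]])(
      onValue: A => B,
      onEmpty: => B,
      onError: Throwable => B): B =
    t match {
      case Success(Right(Some(a))) => onValue(a)
      case Success(Right(None))    => onEmpty
      case Success(Left(err))      => onError(err)
      case Failure(err)            => onError(err)
    }

  // Example: render each outcome as a line of text.
  def render(t: Try[Either[Throwable, Option[Int]]]): String =
    fold[Int, String](t)(v => s"got $v", "Empty document found!", e => s"Error: ${e.getMessage}")
}
```

    A helper like this could be passed the Try value that arrives in the callback registered on the Future, so each call site only supplies the three handlers.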

    All three functions take the same parameter of type MGOContext, which is a command type:

      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_QUERY,
                             action: Option[MGOCommands] = None,
                             actionOptions: Option[Any] = None,
                             actionTargets: Seq[String] = Nil
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))
    
        def toSomeProto = MGOProtoConversion.ctxToProto(this)
    
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
        def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
          MGOProtoConversion.ctxFromProto(proto)
      }
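    Every setter on the context returns a modified copy instead of mutating the receiver, which is why one base context can safely be reused to derive several commands. A reduced sketch of that pattern follows; `Ctx` and `Command` are simplified stand-ins invented for this illustration, not the engine's types.

```scala
object ContextSketch {
  sealed trait Command
  final case class DropCollection(collName: String) extends Command

  // Simplified stand-in for MGOContext: only three of its fields.
  final case class Ctx(dbName: String, collName: String, action: Option[Command] = None) {
    def setCollName(name: String): Ctx = copy(collName = name)
    def setCommand(cmd: Command): Ctx = copy(action = Some(cmd))
  }

  val base: Ctx  = Ctx("testdb", "po")
  val drop: Ctx  = base.setCommand(DropCollection("po"))  // new value, base is untouched
  val other: Ctx = base.setCollName("invoice")            // another independent copy
}
```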

    As you can see, MGOContext.action holds the concrete command. Here is an mgoAdmin example:

      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))

    An mgoUpdate example:

      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

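    We chose MongoDB mainly for its distributed nature, which suits big-data workloads. At the same time MongoDB has powerful query capabilities that come close to those of a traditional SQL database, and it even supports indexes. Although MongoDB does not support relations between data sets the way SQL does, it is still the natural choice for SQL veterans like us. MongoDB queries can be composed by chaining method calls with dots, for example: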

        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
          .limit(...)
          .sort(...)
          .projection(...)

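    For example, sorting the query result while also selecting only a few fields to return can be written as FindObservable(...).sort(...).projection(...). MongoDBEngine provides a ResultOptions type for this: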

     case class ResultOptions(
                                optType: FOD_TYPE,
                                bson: Option[Bson] = None,
                                value: Int = 0 ){
         def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bson.get)
            case  FOD_LIMIT        => find.limit(value)
            case  FOD_SKIP         => find.skip(value)
            case  FOD_PROJECTION   => find.projection(bson.get)
            case  FOD_SORT         => find.sort(bson.get)
            case  FOD_PARTIAL      => find.partial(value != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bson.get)
            case  FOD_MAX          => find.max(bson.get)
            case  FOD_MIN          => find.min(bson.get)
            case  FOD_RETURNKEY    => find.returnKey(value != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(value != 0)
    
          }
        }
     }
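    The engine code that consumes a sequence of ResultOptions is not shown in this post. One plausible way to apply such a sequence is a left fold, where each option transforms the query produced by the previous one. The sketch below demonstrates the idea on a made-up `Query` record instead of a real FindObservable; every name in it is hypothetical.

```scala
object ResultOptionsSketch {
  // Stand-in for FindObservable[Document]: it just records which modifiers were applied.
  final case class Query(
      sort: Option[String] = None,
      fields: Seq[String] = Nil,
      limit: Option[Int] = None)

  // Each option knows how to turn one query into the next.
  sealed trait Opt { def applyTo(q: Query): Query }
  final case class SortBy(field: String) extends Opt {
    def applyTo(q: Query): Query = q.copy(sort = Some(field))
  }
  final case class Project(names: Seq[String]) extends Opt {
    def applyTo(q: Query): Query = q.copy(fields = names)
  }
  final case class Limit(n: Int) extends Opt {
    def applyTo(q: Query): Query = q.copy(limit = Some(n))
  }

  // Apply the options left to right, threading the query through each one.
  def applyAll(start: Query, opts: Seq[Opt]): Query =
    opts.foldLeft(start)((q, opt) => opt.applyTo(q))

  val result: Query =
    applyAll(Query(), Seq(Project(Seq("ponum", "podate", "vendor")), SortBy("ponum")))
}
```

    Describing modifiers as data in this way lets a query be assembled, stored or sent elsewhere before any database call is made.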

    This type also travels inside MGOContext, as the andThen argument of the Find command. Here are some usage examples:

      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )
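    The filter equal("podtl.qty",100) uses dot notation to reach into the podtl array. MongoDB selects a document when any element of the array matches, and it returns the whole document, including the elements that did not match. An in-memory analogy, with case classes invented for this sketch and data abbreviated from the purchase orders used in this post:

```scala
object ArrayMatchSketch {
  final case class Item(name: String, qty: Int)
  final case class PO(ponum: String, podtl: Seq[Item])

  val pos: Seq[PO] = Seq(
    PO("po18012301", Seq(Item("sony smartphone", 1239), Item("ericson smartphone", 1000))),
    PO("po18022002", Seq(
      Item("samsung galaxy s8", 100),
      Item("samsung galaxy s7", 1000),
      Item("apple iphone7", 100)))
  )

  // Analogue of equal("podtl.qty", qty): keep a document if ANY array element matches.
  def matchQty(qty: Int): Seq[PO] = pos.filter(_.podtl.exists(_.qty == qty))
}
```

    Here matchQty(100) keeps only po18022002, and that purchase order still carries all three of its detail lines.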

    Here is a complete example:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    import scala.util._
    
    object TestMongoEngine extends App {
      import MGOEngine._
      import MGOHelpers._
      import MGOCommands._
      import MGOAdmins._
    
      // or provide custom MongoClientSettings
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
     // implicit val ec = system.dispatcher
    
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray],
                      handler: Option[MGOBlob]
                    )
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl"),
          handler = mgoGetBlobOrNone(doc,"handler")
        )
      }
    
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }
    
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        if (po.remarks != None)
          println(s"remarks: ${po.remarks.get}")
        po.podtl match {
          case Some(barr) =>
            mgoArrayToDocumentList(barr)
              .map { dc => toPODTL(dc)}
              .foreach { doc: PODTL =>
                print(s"==>Item: ${doc.item} ")
                print(s"price: ${doc.price} ")
                print(s"qty: ${doc.qty} ")
                doc.packing.foreach(pk => print(s"packing: ${pk} "))
                doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                println("")
              }
          case _ =>
        }
    
        po.handler match {
          case Some(blob) =>
            val fileName = s"/users/tiger/${po.ponum}.png"
            mgoBlobToFile(blob,fileName)
            println(s"picture saved to ${fileName}")
          case None => println("no picture provided")
        }
    
      }
    
      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      import org.mongodb.scala.bson.conversions._
      import org.mongodb.scala.bson.Document
    
    
      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )
    
      for {
        _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eop) => eop match {
            case Right(op) => op match {
              case Some(p) => showPO(p) // call showPO; showPO(_) would only build a function value
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eops) => eops match {
            case Right(ops) => ops match {
              case Some(lp) => lp.foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()
    
    
      scala.io.StdIn.readLine()
    
    
      system.terminate()
    }

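    Running the program produces the output below. Two details are worth noting. The dates are one month later than the arguments passed to mgoDate, because java.util.Calendar months are zero-based. And nothing is printed between the first two separator lines: the find-first result was evidently not shown in this captured run, which is the behavior of writing showPO(_) (a function value) instead of showPO(p) in that branch.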

    Right(Some(The operation completed successfully))
    
    Right(Some(The operation completed successfully))
    
    po number: po18022002
    po date: 2015-12-06
    vendor: The Samsung compay
    no picture provided
    po number: po18012301
    po date: 2017-06-13
    vendor: The smartphone compay
    no picture provided
    -------------------------------
    -------------------------------
    po number: po18022002
    po date: 2015-12-06
    vendor: The Samsung compay
    ==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard 
    ==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days 
    ==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury 
    no picture provided
    -------------------------------
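    The month shift in the printed dates can be reproduced in isolation. The sketch below builds a date the same way mgoDate does and adds a variant that accepts a calendar month from 1 to 12; `rawDate`, `oneBasedDate` and `fmt` are names chosen for this illustration, and the one-based variant is an assumption about what a caller would usually expect.

```scala
import java.text.SimpleDateFormat
import java.util.Calendar

object CalendarMonthSketch {
  // Same construction as mgoDate: the month is handed straight to Calendar.set,
  // which counts months from 0 (0 = January, 11 = December).
  def rawDate(yyyy: Int, mm: Int, dd: Int): java.util.Date = {
    val ca = Calendar.getInstance()
    ca.set(yyyy, mm, dd)
    ca.getTime
  }

  // Variant taking a calendar month 1..12 and shifting it for Calendar.
  def oneBasedDate(yyyy: Int, mm: Int, dd: Int): java.util.Date =
    rawDate(yyyy, mm - 1, dd)

  def fmt(d: java.util.Date): String =
    new SimpleDateFormat("yyyy-MM-dd").format(d)
}
```

    With the default Gregorian calendar, fmt(rawDate(2017, 5, 13)) gives 2017-06-13, matching the output above, while fmt(oneBasedDate(2017, 5, 13)) gives 2017-05-13.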

    The following is all the source code involved in this discussion:

    build.sbt

    name := "dt-dal"
    
    version := "0.2"
    
    scalaVersion := "2.12.8"
    
    scalacOptions += "-Ypartial-unification"
    
    val akkaVersion = "2.5.23"
    val akkaHttpVersion = "10.1.8"
    
    libraryDependencies := Seq(
      // for scalikejdbc
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2" % "1.4.199",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.3.2",
      //for cassandra 3.6.0
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
      //for mongodb 4.0
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
      "io.monix" %% "monix" % "3.0.0-RC3",
      "org.typelevel" %% "cats-core" % "2.0.0-M4"
    )

    converters/DBOResultType.scala

    package com.datatech.sdp.result
    
    import cats._
    import cats.data.EitherT
    import cats.data.OptionT
    import monix.eval.Task
    import cats.implicits._
    
    import scala.concurrent._
    
    import scala.collection.TraversableOnce
    
    object DBOResult {
    
    
      type DBOError[A] = EitherT[Task,Throwable,A]
      type DBOResult[A] = OptionT[DBOError,A]
    
      implicit def valueToDBOResult[A](a: A): DBOResult[A] =
             Applicative[DBOResult].pure(a)
      implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
             OptionT((o: Option[A]).pure[DBOError])
      implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
     //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
             OptionT.liftF(EitherT.fromEither[Task](e))
      }
      implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
           val task = Task.fromFuture[A](fut)
           val et = EitherT.liftF[Task,Throwable,A](task)
           OptionT.liftF(et)
      }
    
      implicit class DBOResultToTask[A](r: DBOResult[A]) {
        def toTask = r.value.value
      }
    
      implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
        def someValue: Option[A] = r match {
          case Left(err) => (None: Option[A])
          case Right(oa) => oa
        }
      }
    
      def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
        if (coll.isEmpty)
          optionToDBOResult(None: Option[C[A]])
        else
          optionToDBOResult(Some(coll): Option[C[A]])
    }

    filestream/FileStreaming.scala

    package com.datatech.sdp.file
    
    import java.io.{ByteArrayInputStream, InputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.Materializer
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    import akka.util._
    
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    object Streaming {
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer):ByteBuffer = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toByteBuffer
      }
    
    
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): Array[Byte] = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toArray
      }
    
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): InputStream = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        val buf = (Await.result(fut, timeOut)).toArray
        new ByteArrayInputStream(buf)
      }
    
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        val baInput = new ByteArrayInputStream(ba)
        val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val bb = ByteBuffer.wrap(bytes)
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
    }

    logging/Log.scala

    package com.datatech.sdp.logging
    
    import org.slf4j.Logger
    
    /**
      * Logger which just wraps org.slf4j.Logger internally.
      *
      * @param logger logger
      */
    class Log(logger: Logger) {
    
      // use var consciously to enable squeezing later
      var isDebugEnabled: Boolean = logger.isDebugEnabled
      var isInfoEnabled: Boolean = logger.isInfoEnabled
      var isWarnEnabled: Boolean = logger.isWarnEnabled
      var isErrorEnabled: Boolean = logger.isErrorEnabled
      var isTraceEnabled: Boolean = logger.isTraceEnabled
    
      def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
        level match {
          case 'debug | 'DEBUG => debug(msg)
          case 'info | 'INFO => info(msg)
          case 'warn | 'WARN => warn(msg)
          case 'error | 'ERROR => error(msg)
          case 'trace | 'TRACE => trace(msg)
          case _ => // nothing to do
        }
      }
    
      var stepOn: Boolean = false
    
      def step(msg: => String): Unit = {
        if(stepOn)
          logger.info("\n****** {} ******\n", msg)
      }
    
      def trace(msg: => String): Unit = {
        if (isTraceEnabled && logger.isTraceEnabled) {
          logger.trace(msg)
        }
      }
    
      def trace(msg: => String, e: Throwable): Unit = {
        if (isTraceEnabled && logger.isTraceEnabled) {
          logger.trace(msg, e)
        }
      }
    
      def debug(msg: => String): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg)
        }
      }
    
      def debug(msg: => String, e: Throwable): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg, e)
        }
      }
    
      def info(msg: => String): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg)
        }
      }
    
      def info(msg: => String, e: Throwable): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg, e)
        }
      }
    
      def warn(msg: => String): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg)
        }
      }
    
      def warn(msg: => String, e: Throwable): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg, e)
        }
      }
    
      def error(msg: => String): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg)
        }
      }
    
      def error(msg: => String, e: Throwable): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg, e)
        }
      }
    
    }

    logging/LogSupport.scala

    package com.datatech.sdp.logging
    
    import org.slf4j.LoggerFactory
    
    trait LogSupport {
    
      /**
        * Logger
        */
      protected val log = new Log(LoggerFactory.getLogger(this.getClass))
    
    }

    mgo/engine/ObservableToPublisher.scala

    package com.datatech.sdp.mongo.engine
    
    import java.util.concurrent.atomic.AtomicBoolean
    
    import org.mongodb.{scala => mongoDB}
    import org.{reactivestreams => rxStreams}
    
    final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])
      extends rxStreams.Publisher[T] {
      def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit =
        observable.subscribe(new mongoDB.Observer[T]() {
          override def onSubscribe(subscription: mongoDB.Subscription): Unit =
            subscriber.onSubscribe(new rxStreams.Subscription() {
              private final val cancelled: AtomicBoolean = new AtomicBoolean
    
              override def request(n: Long): Unit =
                if (!subscription.isUnsubscribed && !cancelled.get() && n < 1) {
                  subscriber.onError(
                    new IllegalArgumentException(
                      s"Demand from publisher should be a positive amount. Current amount is:$n"
                    )
                  )
                } else {
                  subscription.request(n)
                }
    
              override def cancel(): Unit =
                if (!cancelled.getAndSet(true)) subscription.unsubscribe()
            })
    
          def onNext(result: T): Unit = subscriber.onNext(result)
    
          def onError(e: Throwable): Unit = subscriber.onError(e)
    
          def onComplete(): Unit = subscriber.onComplete()
        })
    }

    mgo/engine/MongoDBEngine.scala

    package com.datatech.sdp.mongo.engine
    
    import java.text.SimpleDateFormat
    import java.util.Calendar
    
    import akka.NotUsed
    import akka.stream.Materializer
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Source}
    import org.bson.conversions.Bson
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    import org.mongodb.scala.model._
    import org.mongodb.scala.{MongoClient, _}
    import com.datatech.sdp
    import sdp.file.Streaming._
    import sdp.logging.LogSupport
    
    import scala.collection.JavaConverters._
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object MGOClasses {
      type MGO_ACTION_TYPE = Int
      object MGO_ACTION_TYPE {
        val MGO_QUERY = 0
        val MGO_UPDATE = 1
        val MGO_ADMIN = 2
      }
    
      /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
    
      type FOD_TYPE = Int
      object FOD_TYPE {
        val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
        val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
        val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
        val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
        val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
        //Sets a document describing the fields to return for all matching documents
        val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
        val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
        //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
        val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
        //Sets the cursor type
        val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
        //Sets the hint for which index to use. A null value means no hint is set
        val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
        //Sets the exclusive upper bound for a specific index. A null value means no max is set
        val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
        //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
        val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
        //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
        val FOD_SHOWRECORDID = 12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
        //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
      }
      case class ResultOptions(
                                optType: FOD_TYPE,
                                bson: Option[Bson] = None,
                                value: Int = 0 ){
        import FOD_TYPE._
         def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bson.get)
            case  FOD_LIMIT        => find.limit(value)
            case  FOD_SKIP         => find.skip(value)
            case  FOD_PROJECTION   => find.projection(bson.get)
            case  FOD_SORT         => find.sort(bson.get)
            case  FOD_PARTIAL      => find.partial(value != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bson.get)
            case  FOD_MAX          => find.max(bson.get)
            case  FOD_MIN          => find.min(bson.get)
            case  FOD_RETURNKEY    => find.returnKey(value != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(value != 0)
    
          }
        }
      }
    
      trait MGOCommands
    
      object MGOCommands {
    
        case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
    
        case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
    
        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                           firstOnly: Boolean = false) extends MGOCommands
    
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
    
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
    
      }
    
      object MGOAdmins {
    
        case class DropCollection(collName: String) extends MGOCommands
    
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    
        case class ListCollection(dbName: String) extends MGOCommands
    
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
    
      }
    
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_ACTION_TYPE.MGO_QUERY,
                             action: Option[MGOCommands] = None,
                             actionOptions: Option[Any] = None,
                             actionTargets: Seq[String] = Nil
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))
    
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
      }
    
      case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
        ctxs =>
        def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
        def appendContext(ctx: MGOContext): MGOBatContext =
          ctxs.copy(contexts = contexts :+ ctx)
      }
    
      object MGOBatContext {
        def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
      }
    
      type MGODate = java.util.Date
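      // Note: mm is passed straight to Calendar.set, whose months are 0-based (0 = January),
      // and the time-of-day fields keep the current clock time.
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {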
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }
    
    
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }
    
      type MGOBlob = BsonBinary
      type MGOArray = BsonArray
    
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
    
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)
    
      def mgoGetStringOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getString(fieldName))
        else None
      }
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getInteger(fieldName))
        else None
      }
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getLong(fieldName))
        else None
      }
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDouble(fieldName))
        else None
      }
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getBoolean(fieldName))
        else None
      }
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDate(fieldName))
        else None
      }
      def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
        else None
      }
      def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOArray]]
        else None
      }
    
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        (arr.getValues.asScala.toList)
          .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    
    
    object MGOEngine extends LogSupport {
    
      import MGOClasses._
      import MGOAdmins._
      import MGOCommands._
      import sdp.result.DBOResult._
      import com.mongodb.reactivestreams.client.MongoClients
    
      object TxUpdateMode {
        private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
                  implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
          log.info(s"mgoTxUpdate> calling ...")
          observable.map(clientSession => {
    
            val transactionOptions =
              TransactionOptions.builder()
                .readConcern(ReadConcern.SNAPSHOT)
                .writeConcern(WriteConcern.MAJORITY).build()
    
            clientSession.startTransaction(transactionOptions)
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
            Await.ready(fut, 3 seconds) */
    
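            // NOTE: each update is started with toFuture() but never awaited, and the
            // clientSession is not passed to mgoUpdateObservable, so these writes are not
            // guaranteed to finish before the commit or to run inside the transaction.
            ctxs.contexts.foreach { ctx =>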
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
            clientSession
          })
        }
    
        private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"commitAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
              log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
              commitAndRetry(observable)
            }
            case e: Exception => {
              log.error(s"commitAndRetry> Exception during commit ...: $e")
              throw e
            }
          })
        }
    
        private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"runTransactionAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
              log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
              runTransactionAndRetry(observable)
            }
          })
        }
    
        def mgoTxBatch(ctxs: MGOBatContext)(
                implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
    
          log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")
    
          val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
          val commitTransactionObservable: SingleObservable[Completed] =
            updateObservable.flatMap(clientSession => clientSession.commitTransaction())
          val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
    
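          // NOTE: the observable returned here is discarded and never subscribed to. Driver
          // observables are cold, so the commit only runs once something subscribes to it.
          runTransactionAndRetry(commitAndRetryObservable)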
    
          valueToDBOResult(Completed())
    
        }
      }
    
    
      def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
        log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
        if (ctxs.tx) {
            TxUpdateMode.mgoTxBatch(ctxs)
          } else {
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
            Await.ready(fut, 3 seconds)
            FastFuture.successful(new Completed) */
            ctxs.contexts.foreach { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
             valueToDBOResult(Completed())
          }
    
      }
    
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    
        log.info(s"mongoStream>  MGOContext: ${ctx}")
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mongoStream> query action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        try {
          ctx.action.get match {
            case Find(None, Nil, false) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find()))
            case Find(None, Nil, true) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find().first()))
            case Find(Some(filter), Nil, false) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find(filter)))
            case Find(Some(filter), Nil, true) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find(filter).first()))
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(ObservableToPublisher(next(coll.find[Document]())))
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))
            case _ =>
              log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
              throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mongoStream> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
        }
    
      }
    
    
      // T => FindIterable  e.g List[Document]
      def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
        log.info(s"mgoQuery>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
    
        // Return the Left directly; previously it was discarded and ctx.action.get threw instead.
        if ( ctx.action == None) {
          log.error(s"mgoQuery> query action cannot be null!")
          Left(new IllegalArgumentException("query action cannot be null!"))
        } else try {
          ctx.action.get match {
            /* count */
            case Count(Some(filter), Some(opt)) => //SingleObservable
              coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
                .toFuture().asInstanceOf[Future[T]]
            case Count(Some(filter), None) => //SingleObservable
              coll.countDocuments(filter).toFuture()
                .asInstanceOf[Future[T]]
            case Count(None, None) => //SingleObservable
              coll.countDocuments().toFuture()
                .asInstanceOf[Future[T]]
            /* distinct */
            case Distict(field, Some(filter)) => //DistinctObservable
              coll.distinct(field, filter).toFuture()
                .asInstanceOf[Future[T]]
            case Distict(field, None) => //DistinctObservable
              coll.distinct((field)).toFuture()
                .asInstanceOf[Future[T]]
            /* find */
            case Find(None, Nil, false) => //FindObservable
              if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
              else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(None, Nil, true) => //FindObservable
              if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
              else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, false) => //FindObservable
              if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
              else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, true) => //FindObservable
              if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
              else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            /* aggregate AggregateObservable*/
            case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
            /* mapReduce MapReduceObservable*/
            case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
            /* list collection */
            case ListCollection(dbName) => //ListCollectionObservable
              client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoQuery> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
        }
      }
      //T => Completed, result.UpdateResult, result.DeleteResult
      def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
        try {
          mgoUpdateObservable[T](ctx).toFuture()
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
        }
    
      def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
        log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoUpdateObservable> query action cannot be null!")
          throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
        }
        try {
          ctx.action.get match {
            /* insert */
            case Insert(docs, Some(opt)) => //SingleObservable[Completed]
              if (docs.size > 1)
                coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
            case Insert(docs, None) => //SingleObservable
              if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
            /* delete */
            case Delete(filter, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
            case Delete(filter, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
            /* replace */
            case Replace(filter, replacement, None) => //SingleObservable
              coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
            case Replace(filter, replacement, Some(opt)) => //SingleObservable
              coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
            /* update */
            case Update(filter, update, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
            case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
            /* bulkWrite */
            case BulkWrite(commands, None) => //SingleObservable
              coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
            case BulkWrite(commands, Some(opt)) => //SingleObservable
              coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
        }
      }
    
      def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
        log.info(s"mgoAdmin>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        // Return the Left directly; previously it was discarded and ctx.action.get threw instead.
        if ( ctx.action == None) {
          log.error(s"mgoAdmin> query action cannot be null!")
          Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
        } else try {
          ctx.action.get match {
            /* drop collection */
            case DropCollection(collName) => //SingleObservable
              val coll = db.getCollection(collName)
              coll.drop().toFuture()
            /* create collection */
            case CreateCollection(collName, None) => //SingleObservable
              db.createCollection(collName).toFuture()
            case CreateCollection(collName, Some(opt)) => //SingleObservable
              db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
            /* list collection
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            */
            /* create view */
            case CreateView(viewName, viewOn, pline, None) => //SingleObservable
              db.createView(viewName, viewOn, pline).toFuture()
            case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
              db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
            /* create index */
            case CreateIndex(key, None) => //SingleObservable
              coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]
            case CreateIndex(key, Some(opt)) => //SingleObservable
              coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
            /* drop index */
            case DropIndexByName(indexName, None) => //SingleObservable
              coll.dropIndex(indexName).toFuture()
            case DropIndexByName(indexName, Some(opt)) => //SingleObservable
              coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropIndexByKey(key, None) => //SingleObservable
              coll.dropIndex(key).toFuture()
            case DropIndexByKey(key, Some(opt)) => //SingleObservable
              coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropAllIndexes(None) => //SingleObservable
              coll.dropIndexes().toFuture()
            case DropAllIndexes(Some(opt)) => //SingleObservable
              coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
        }
    
      }
    
    }
    
    
    object MongoActionStream {
    
      import MGOClasses._
    
      case class StreamingInsert[A](dbName: String,
                                    collName: String,
                                    converter: A => Document,
                                    parallelism: Int = 1
                                   ) extends MGOCommands
    
      case class StreamingDelete[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
      case class StreamingUpdate[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    toUpdate: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
    
      case class InsertAction[A](ctx: StreamingInsert[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          Flow[A].map(ctx.converter)
            .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
      }
    
      case class UpdateAction[A](ctx: StreamingUpdate[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      }
    
    
      case class DeleteAction[A](ctx: StreamingDelete[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
      }
    
    }
    
    object MGOHelpers {
    
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = (doc) => doc.toJson
      }
    
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = (doc) => doc.toString
      }
    
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String
    
        def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    
        def headResult() = Await.result(observable.head(), 10 seconds)
    
        def printResults(initial: String = ""): Unit = {
          if (initial.length > 0) print(initial)
          results().foreach(res => println(converter(res)))
        }
    
        def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
      }
    
      def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
        Await.result(fut, timeOut)
      }
    
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
        Await.result(fut, timeOut)
      }
    
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
    
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A](x)
      }
    
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runToFuture
      }
    
    }
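
One detail of the date helpers above is worth checking before using them: `mgoDate` hands its month argument straight to `java.util.Calendar.set`, whose months are 0-based, so the `mgoDate(2017,5,13)` call in the test program below denotes June 13, not May 13. The following sketch reproduces that behaviour with the standard library only. `MgoDateDemo`, `mgoDateOneBased` and `fmt` are hypothetical names introduced here for illustration and are not part of MongoDBEngine.

```scala
import java.text.SimpleDateFormat
import java.util.Calendar

object MgoDateDemo {
  type MGODate = java.util.Date

  // Same body as mgoDate in the listing: mm goes to Calendar.set unchanged (0 = January).
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy, mm, dd)
    ca.getTime()
  }

  // Hypothetical variant (an assumption, not in the library): takes a 1-based month
  // and clears the time-of-day fields so the result is midnight local time.
  def mgoDateOneBased(yyyy: Int, month: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.clear()
    ca.set(yyyy, month - 1, dd)
    ca.getTime()
  }

  // Formats in the default time zone, the same zone Calendar.getInstance() uses.
  def fmt(d: MGODate): String = new SimpleDateFormat("yyyy-MM-dd").format(d)
}
```

With these definitions, `MgoDateDemo.fmt(MgoDateDemo.mgoDate(2017, 5, 13))` gives `2017-06-13`, while the 1-based variant gives `2017-05-13`. Whether to shift the month inside the helper or at the call site is a design choice; the point is only that callers need to know which convention is in effect.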

    TestMongoEngine.scala

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    import scala.util._
    
    object TestMongoEngine extends App {
      import MGOEngine._
      import MGOHelpers._
      import MGOCommands._
      import MGOAdmins._
    
      // or provide custom MongoClientSettings
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
     // implicit val ec = system.dispatcher
    
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone company",
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung company",
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray],
                      handler: Option[MGOBlob]
                    )
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl"),
          handler = mgoGetBlobOrNone(doc,"handler")
        )
      }
    
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }
    
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        if (po.remarks != None)
          println(s"remarks: ${po.remarks.get}")
        po.podtl match {
          case Some(barr) =>
            mgoArrayToDocumentList(barr)
              .map { dc => toPODTL(dc)}
              .foreach { doc: PODTL =>
                print(s"==>Item: ${doc.item} ")
                print(s"price: ${doc.price} ")
                print(s"qty: ${doc.qty} ")
                doc.packing.foreach(pk => print(s"packing: ${pk} "))
                doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                println("")
              }
          case _ =>
        }
    
        po.handler match {
          case Some(blob) =>
            val fileName = s"/users/tiger/${po.ponum}.png"
            mgoBlobToFile(blob,fileName)
            println(s"picture saved to ${fileName}")
          case None => println("no picture provided")
        }
    
      }
    
      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      import org.mongodb.scala.bson.conversions._
      import org.mongodb.scala.bson.Document
    
    
      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )
    
      for {
        _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eop) => eop match {
            case Right(op) => op match {
              case Some(p) => showPO(p)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eops) => eops match {
            case Right(ops) => ops match {
              case Some(lp) => lp.foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()
    
    
      scala.io.StdIn.readLine()
    
    
      system.terminate()
    }
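
The three queries in the test program repeat the same nested match over `Success` / `Right` / `Some`. As a rough sketch of how that boilerplate could be folded into one helper, the code below works on a plain `Future[Either[Throwable, Option[A]]]`, which is the shape that `.value.value.runToFuture` yields in the listing. It uses only the standard library; `ResultFoldDemo`, `Unwrapped`, `describe` and `await` are hypothetical names and not part of MongoDBEngine.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ResultFoldDemo {
  // Stand-in for the unwrapped DBOResult[A] used in the test program (an assumption).
  type Unwrapped[A] = Future[Either[Throwable, Option[A]]]

  // Collapses the three outcomes (value, empty, error) plus a failed Future
  // into a single message, mirroring the messages printed in the listing.
  def describe[A](res: Unwrapped[A])(show: A => String): Future[String] =
    res.map {
      case Right(Some(a)) => show(a)
      case Right(None)    => "Empty document found!"
      case Left(err)      => s"Error: ${err.getMessage}"
    }.recover { case e => s"Failure: ${e.getMessage}" }

  // Blocking helper for demos and tests only.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

A caller would then write something like `describe(result)(po => po.toString).foreach(println)` instead of spelling out the nested match each time. Returning a `String` rather than printing directly keeps the helper easy to test.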
  • Original article: https://www.cnblogs.com/tiger-xc/p/11301362.html