  • restapi(3) - MongoDBEngine: a MongoDB programming toolkit for Scala

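    A colleague has recently been learning MongoDB, and we discussed placing MongoDB on the server side and exposing data upload and download to clients through a web service. The restapi framework from the previous post can be used to implement CRUD operations against MongoDB. Before getting to restapi, this post first introduces Scala programming for MongoDB database operations, because it differs considerably from programming against a traditional SQL database.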

    One of the earlier posts in the sdp (streaming-data-processor) series covered MongoDBEngine. This is a good opportunity to describe how that toolkit is used.

    MongoDBEngine is a Scala toolkit for MongoDB CRUD programming built on top of mongo-scala-driver. Its main capabilities are reflected in the following three functions:

     def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] 
    
    
      // T => FindIterable  e.g List[Document]
     def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T]
    
    
     def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] 

    Of these, mgoUpdate covers: insert, update, delete, replace ...

                mgoQuery: find,count,distinct ...

                mgoAdmin: dropCollection, createCollection ...

    The first thing to note is their return type, DBOResult[T], which is essentially Future[Either[Throwable,Option[T]]]:

     type DBOError[A] = EitherT[Task,Throwable,A]
     type DBOResult[A] = OptionT[DBOError,A]

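    It looks complicated but is easy to explain. The type was designed specifically for database operations, so:

    1. The operation is asynchronous, hence Future (a monix Task converts to a Future, e.g. via Task.runToFuture)

    2. The result may be empty, hence Option

    3. An error also yields an empty result, but we need to know that the emptiness was caused by an error, hence Either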
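
    The three layers can be seen in a small plain-Scala model. This is only a sketch: the Result alias and the helper names are hypothetical, and it uses scala.concurrent.Future directly in place of the monix Task and cats transformers that DBOResult is actually built from.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DBOResultShape {
  // Hypothetical stand-in for the unwrapped shape of DBOResult[A].
  type Result[A] = Future[Either[Throwable, Option[A]]]

  def found[A](a: A): Result[A]         = Future.successful(Right(Some(a)))
  def empty[A]: Result[A]               = Future.successful(Right(None))
  def failed[A](msg: String): Result[A] = Future.successful(Left(new RuntimeException(msg)))

  // Blocks on the Future only to keep the demo short.
  def describe[A](r: Result[A]): String =
    Await.result(r, 5.seconds) match {
      case Right(Some(a)) => s"value: $a"
      case Right(None)    => "empty"
      case Left(err)      => s"error: ${err.getMessage}"
    }
}
```

    Each of the three outcomes (a value, nothing, an error) lands in its own branch, which is the distinction the nested type is meant to preserve.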

    Unifying all return types as DBOResult[T] makes function composition convenient, for example:

    for {
        a <- mgoQuery(...)
        _ <- mgoUpdate(a, ...)
        b <- mgoQuery(...)
    } yield b
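
    Why a single result type composes in a for-comprehension can be illustrated with a plain-Scala stand-in. The Res wrapper and the query/update functions here are hypothetical; the real DBOResult gets map and flatMap from cats OptionT and EitherT rather than from hand-written code like this.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeSketch {
  // Hypothetical stand-in for DBOResult[A]: one wrapper type with map and flatMap.
  final case class Res[A](run: Future[Either[Throwable, Option[A]]]) {
    def map[B](f: A => B): Res[B] =
      Res(run.map(_.map(_.map(f))))

    // An error or an empty result short-circuits the remaining steps.
    def flatMap[B](f: A => Res[B]): Res[B] =
      Res(run.flatMap[Either[Throwable, Option[B]]] {
        case Right(Some(a)) => f(a).run
        case Right(None)    => Future.successful(Right(None))
        case Left(e)        => Future.successful(Left(e))
      })
  }

  def pure[A](a: A): Res[A] = Res(Future.successful(Right(Some(a))))
  def none[A]: Res[A]       = Res(Future.successful(Right(None)))

  // Hypothetical operations standing in for mgoQuery and mgoUpdate.
  def query(key: String): Res[Int] =
    if (key == "missing") none[Int] else pure(key.length)
  def update(n: Int): Res[Int] = pure(n * 2)

  def program(key: String): Res[Int] =
    for {
      a <- query(key)
      b <- update(a)
    } yield b + 1

  def runSync[A](r: Res[A]): Either[Throwable, Option[A]] =
    Await.result(r.run, 5.seconds)
}
```

    With this model, program("missing") never calls update: the empty result from query is carried straight through to the end.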

    On the other hand, it makes some code more tedious to write, such as extracting the computed value from the structure:

     mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
          case Failure(e) => println(e.getMessage)
     }
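
    The nested matches can also be collapsed into one flat match. The report helper below is a hypothetical convenience, not part of MongoDBEngine; it uses only the standard library.

```scala
import scala.util.{Failure, Success, Try}

object ResultReport {
  // Hypothetical helper: one flat match instead of three nested ones.
  def report[A](t: Try[Either[Throwable, Option[A]]])(show: A => String): String =
    t match {
      case Success(Right(Some(a))) => show(a)
      case Success(Right(None))    => "Empty document found!"
      case Success(Left(err))      => s"Error: ${err.getMessage}"
      case Failure(e)              => s"Failure: ${e.getMessage}"
    }
}
```

    Since Future.andThen hands its callback a Try, such a helper could be used as fut.andThen { case t => println(ResultReport.report(t)(_.toString)) }.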

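    It is somewhat tedious, but it gives a more detailed view of how a command executed, and it is a uniform, standard pattern (Ctrl-C, Ctrl-V is all it takes).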

    All three functions take the same input parameter of type MGOContext, which is a command type:

      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_QUERY,
                             action: Option[MGOCommands] = None,
                             actionOptions: Option[Any] = None,
                             actionTargets: Seq[String] = Nil
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))
    
        def toSomeProto = MGOProtoConversion.ctxToProto(this)
    
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
        def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
          MGOProtoConversion.ctxFromProto(proto)
      }
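
    The setters on MGOContext all return a modified copy, so a base context can be reused for several commands. A trimmed-down sketch of that pattern follows; the Ctx and Command types are hypothetical stand-ins, not the real classes.

```scala
object ContextSketch {
  // Hypothetical, trimmed-down stand-ins for MGOCommands and MGOContext.
  sealed trait Command
  final case class DropCollection(collName: String) extends Command

  final case class Ctx(dbName: String, collName: String, action: Option[Command] = None) {
    def setCollName(name: String): Ctx = copy(collName = name)
    def setCommand(cmd: Command): Ctx  = copy(action = Some(cmd))
  }

  // base is never mutated; drop is a new value that carries the command.
  val base: Ctx = Ctx("testdb", "po")
  val drop: Ctx = base.setCommand(DropCollection("po"))
}
```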

    As you can see, MGOContext.action is the concrete operation command. Here is an example of an mgoAdmin command:

      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))

    An mgoUpdate example:

      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

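    We chose MongoDB mainly for its distributed nature, which suits big-data workloads. At the same time MongoDB offers powerful query capabilities close to those of a traditional SQL database, and it even supports indexes. Although MongoDB does not support relations between data, for veterans of traditional SQL like us it is still the natural choice. MongoDB supports composing queries through dot-chained calls, for example: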

        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
          .limit(...)
          .sort(...)
          .projection(...)

    For example, sorting the query result while also selecting only a few fields to return can be written as FindObservable(...).sort(...).projection(...). MongoDBEngine provides a ResultOptions type:

     case class ResultOptions(
                                optType: FOD_TYPE,
                                bson: Option[Bson] = None,
                                value: Int = 0 ){
         def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bson.get)
            case  FOD_LIMIT        => find.limit(value)
            case  FOD_SKIP         => find.skip(value)
            case  FOD_PROJECTION   => find.projection(bson.get)
            case  FOD_SORT         => find.sort(bson.get)
            case  FOD_PARTIAL      => find.partial(value != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bson.get)
            case  FOD_MAX          => find.max(bson.get)
            case  FOD_MIN          => find.min(bson.get)
            case  FOD_RETURNKEY    => find.returnKey(value != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(value != 0)
          }
        }
     }
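
    How the engine applies a sequence of ResultOptions to a query is not shown in this section. One plausible approach, sketched here purely as an assumption, is a left fold that feeds each option's transformation the result of the previous one. The Query and Opt types below are hypothetical stand-ins for FindObservable and ResultOptions.

```scala
object FoldOptionsSketch {
  // Hypothetical stand-in for FindObservable: it only records the calls made on it.
  final case class Query(calls: Vector[String] = Vector.empty) {
    def limit(n: Int): Query              = copy(calls = calls :+ s"limit($n)")
    def sort(key: String): Query          = copy(calls = calls :+ s"sort($key)")
    def projection(fields: String): Query = copy(calls = calls :+ s"projection($fields)")
  }

  // Hypothetical stand-in for ResultOptions: each option is a Query => Query step.
  sealed trait Opt { def toQuery: Query => Query }
  final case class Limit(n: Int) extends Opt {
    def toQuery: Query => Query = _.limit(n)
  }
  final case class Sort(key: String) extends Opt {
    def toQuery: Query => Query = _.sort(key)
  }
  final case class Projection(fields: String) extends Opt {
    def toQuery: Query => Query = _.projection(fields)
  }

  // Apply the options in order, threading the query through each step.
  def applyAll(q: Query, opts: Seq[Opt]): Query =
    opts.foldLeft(q)((acc, opt) => opt.toQuery(acc))
}
```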

    This type is also carried inside MGOContext, through the andThen parameter of the Find command. Here are some usage examples:

      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )

    Here is a complete example:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    import scala.util._
    
    object TestMongoEngine extends App {
      import MGOEngine._
      import MGOHelpers._
      import MGOCommands._
      import MGOAdmins._
    
      // or provide custom MongoClientSettings
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
     // implicit val ec = system.dispatcher
    
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray],
                      handler: Option[MGOBlob]
                    )
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl"),
          handler = mgoGetBlobOrNone(doc,"handler")
        )
      }
    
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }
    
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        if (po.remarks != None)
          println(s"remarks: ${po.remarks.get}")
        po.podtl match {
          case Some(barr) =>
            mgoArrayToDocumentList(barr)
              .map { dc => toPODTL(dc)}
              .foreach { doc: PODTL =>
                print(s"==>Item: ${doc.item} ")
                print(s"price: ${doc.price} ")
                print(s"qty: ${doc.qty} ")
                doc.packing.foreach(pk => print(s"packing: ${pk} "))
                doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                println("")
              }
          case _ =>
        }
    
        po.handler match {
          case Some(blob) =>
            val fileName = s"/users/tiger/${po.ponum}.png"
            mgoBlobToFile(blob,fileName)
            println(s"picture saved to ${fileName}")
          case None => println("no picture provided")
        }
    
      }
    
      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      import org.mongodb.scala.bson.conversions._
      import org.mongodb.scala.bson.Document
    
    
      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )
    
      for {
        _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eop) => eop match {
            case Right(op) => op match {
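              // The original showPO(_) only built a function value and printed nothing,
              // which is why the recorded output shows an empty second block.
              case Some(p) => showPO(p)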
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eops) => eops match {
            case Right(ops) => ops match {
              case Some(lp) => lp.foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()
    
    
      scala.io.StdIn.readLine()
    
    
      system.terminate()
    }

    Running the program produces the following output:

    Right(Some(The operation completed successfully))
    
    Right(Some(The operation completed successfully))
    
    po number: po18022002
    po date: 2015-12-06
    vendor: The Samsung compay
    no picture provided
    po number: po18012301
    po date: 2017-06-13
    vendor: The smartphone compay
    no picture provided
    -------------------------------
    -------------------------------
    po number: po18022002
    po date: 2015-12-06
    vendor: The Samsung compay
    ==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard 
    ==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days 
    ==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury 
    no picture provided
    -------------------------------
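
    The dates in this output are one month later than the arguments passed to mgoDate (mgoDate(2015,11,6) prints as 2015-12-06) because java.util.Calendar uses zero-based months. The sketch below reproduces the effect with the same Calendar.set call that mgoDate uses; the object and method names are hypothetical, and Locale.ROOT is an added assumption to keep the formatting independent of the machine's locale.

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

object CalendarMonthSketch {
  // Same call pattern as mgoDate: Calendar.set(year, month, day) with a zero-based month.
  def format(yyyy: Int, mm: Int, dd: Int): String = {
    val ca = Calendar.getInstance(Locale.ROOT)
    ca.set(yyyy, mm, dd)
    new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT).format(ca.getTime)
  }
}
```

    To store November 6, the month argument would have to be 10 (or Calendar.NOVEMBER).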

    Below is the full source code covered in this discussion:

    build.sbt

    name := "dt-dal"
    
    version := "0.2"
    
    scalaVersion := "2.12.8"
    
    scalacOptions += "-Ypartial-unification"
    
    val akkaVersion = "2.5.23"
    val akkaHttpVersion = "10.1.8"
    
    libraryDependencies := Seq(
      // for scalikejdbc
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2" % "1.4.199",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.3.2",
      //for cassandra 3.6.0
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
      //for mongodb 4.0
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
      "io.monix" %% "monix" % "3.0.0-RC3",
      "org.typelevel" %% "cats-core" % "2.0.0-M4"
    )

    converters/DBOResultType.scala

    package com.datatech.sdp.result
    
    import cats._
    import cats.data.EitherT
    import cats.data.OptionT
    import monix.eval.Task
    import cats.implicits._
    
    import scala.concurrent._
    
    import scala.collection.TraversableOnce
    
    object DBOResult {
    
    
      type DBOError[A] = EitherT[Task,Throwable,A]
      type DBOResult[A] = OptionT[DBOError,A]
    
      implicit def valueToDBOResult[A](a: A): DBOResult[A] =
             Applicative[DBOResult].pure(a)
      implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
             OptionT((o: Option[A]).pure[DBOError])
      implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
     //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
             OptionT.liftF(EitherT.fromEither[Task](e))
      }
      implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
           val task = Task.fromFuture[A](fut)
           val et = EitherT.liftF[Task,Throwable,A](task)
           OptionT.liftF(et)
      }
    
      implicit class DBOResultToTask[A](r: DBOResult[A]) {
        def toTask = r.value.value
      }
    
      implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
        def someValue: Option[A] = r match {
          case Left(err) => (None: Option[A])
          case Right(oa) => oa
        }
      }
    
      def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
        if (coll.isEmpty)
          optionToDBOResult(None: Option[C[A]])
        else
          optionToDBOResult(Some(coll): Option[C[A]])
    }
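
    A note on someValue: it collapses Either[Throwable,Option[A]] into Option[A], so an error and an empty result become indistinguishable. The standalone restatement below (hypothetical object name, standard library only) shows that behaviour.

```scala
object SomeValueSketch {
  // Same logic as DBOResultToOption.someValue: a Left is silently turned into None.
  def someValue[A](r: Either[Throwable, Option[A]]): Option[A] =
    r.toOption.flatten

  val ok: Either[Throwable, Option[Int]]      = Right(Some(1))
  val nothing: Either[Throwable, Option[Int]] = Right(None)
  val broken: Either[Throwable, Option[Int]]  = Left(new RuntimeException("db down"))
}
```

    It is convenient when only the value matters, but callers that need to report failures should match on the Either instead.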

    filestream/FileStreaming.scala

    package com.datatech.sdp.file
    
    import java.io.{ByteArrayInputStream, InputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.Materializer
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    import akka.util._
    
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    object Streaming {
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer):ByteBuffer = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toByteBuffer
      }
    
    
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): Array[Byte] = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toArray
      }
    
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): InputStream = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        val buf = (Await.result(fut, timeOut)).toArray
        new ByteArrayInputStream(buf)
      }
    
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        val baInput = new ByteArrayInputStream(ba)
        val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val bb = ByteBuffer.wrap(bytes)
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
    }

    logging/Log.scala

    package com.datatech.sdp.logging
    
    import org.slf4j.Logger
    
    /**
      * Logger which just wraps org.slf4j.Logger internally.
      *
      * @param logger logger
      */
    class Log(logger: Logger) {
    
      // use var consciously to enable squeezing later
      var isDebugEnabled: Boolean = logger.isDebugEnabled
      var isInfoEnabled: Boolean = logger.isInfoEnabled
      var isWarnEnabled: Boolean = logger.isWarnEnabled
      var isErrorEnabled: Boolean = logger.isErrorEnabled
      var isTraceEnabled: Boolean = logger.isTraceEnabled
    
      def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
        level match {
          case 'debug | 'DEBUG => debug(msg)
          case 'info | 'INFO => info(msg)
          case 'warn | 'WARN => warn(msg)
          case 'error | 'ERROR => error(msg)
          case 'trace | 'TRACE => trace(msg)
          case _ => // nothing to do
        }
      }
    
      var stepOn: Boolean = false
    
      def step(msg: => String): Unit = {
        if(stepOn)
          logger.info("\n****** {} ******\n",msg)
      }
    
      def trace(msg: => String): Unit = {
        if (isTraceEnabled && logger.isTraceEnabled) {
          logger.trace(msg)
        }
      }
    
      def trace(msg: => String, e: Throwable): Unit = {
        if (isTraceEnabled && logger.isTraceEnabled) {
          logger.trace(msg, e)
        }
      }
    
      def debug(msg: => String): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg)
        }
      }
    
      def debug(msg: => String, e: Throwable): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg, e)
        }
      }
    
      def info(msg: => String): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg)
        }
      }
    
      def info(msg: => String, e: Throwable): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg, e)
        }
      }
    
      def warn(msg: => String): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg)
        }
      }
    
      def warn(msg: => String, e: Throwable): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg, e)
        }
      }
    
      def error(msg: => String): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg)
        }
      }
    
      def error(msg: => String, e: Throwable): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg, e)
        }
      }
    
    }
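
    The Log methods take their message as a by-name parameter (msg: => String), so an expensive message is only built when the level is enabled. A miniature illustration, with hypothetical names and a StringBuilder standing in for the slf4j logger:

```scala
object ByNameLogSketch {
  var evaluations = 0
  def expensive(): String = { evaluations += 1; "payload" }

  // Hypothetical miniature of Log.debug: msg is evaluated only inside the if.
  final class MiniLog(enabled: Boolean) {
    val out = new StringBuilder
    def debug(msg: => String): Unit =
      if (enabled) { out.append(msg); () }
  }
}
```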

    logging/LogSupport.scala

    package com.datatech.sdp.logging
    
    import org.slf4j.LoggerFactory
    
    trait LogSupport {
    
      /**
        * Logger
        */
      protected val log = new Log(LoggerFactory.getLogger(this.getClass))
    
    }

    mgo/engine/ObservableToPublisher.scala

    package com.datatech.sdp.mongo.engine
    
    import java.util.concurrent.atomic.AtomicBoolean
    
    import org.mongodb.{scala => mongoDB}
    import org.{reactivestreams => rxStreams}
    
    final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])
      extends rxStreams.Publisher[T] {
      def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit =
        observable.subscribe(new mongoDB.Observer[T]() {
          override def onSubscribe(subscription: mongoDB.Subscription): Unit =
            subscriber.onSubscribe(new rxStreams.Subscription() {
              private final val cancelled: AtomicBoolean = new AtomicBoolean
    
              override def request(n: Long): Unit =
                if (!subscription.isUnsubscribed && !cancelled.get() && n < 1) {
                  subscriber.onError(
                    new IllegalArgumentException(
                      s"Demand from publisher should be a positive amount. Current amount is:$n"
                    )
                  )
                } else {
                  subscription.request(n)
                }
    
              override def cancel(): Unit =
                if (!cancelled.getAndSet(true)) subscription.unsubscribe()
            })
    
          def onNext(result: T): Unit = subscriber.onNext(result)
    
          def onError(e: Throwable): Unit = subscriber.onError(e)
    
          def onComplete(): Unit = subscriber.onComplete()
        })
    }

    mgo/engine/MongoDBEngine.scala

    package com.datatech.sdp.mongo.engine
    
    import java.text.SimpleDateFormat
    import java.util.Calendar
    
    import akka.NotUsed
    import akka.stream.Materializer
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Source}
    import org.bson.conversions.Bson
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    import org.mongodb.scala.model._
    import org.mongodb.scala.{MongoClient, _}
    import com.datatech.sdp
    import sdp.file.Streaming._
    import sdp.logging.LogSupport
    
    import scala.collection.JavaConverters._
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object MGOClasses {
      type MGO_ACTION_TYPE = Int
      object MGO_ACTION_TYPE {
        val MGO_QUERY = 0
        val MGO_UPDATE = 1
        val MGO_ADMIN = 2
      }
    
      /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
    
      type FOD_TYPE = Int
      object FOD_TYPE {
        val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
        val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
        val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
        val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
        val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
        //Sets a document describing the fields to return for all matching documents
        val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
        val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
        //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
        val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
        //Sets the cursor type
        val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
        //Sets the hint for which index to use. A null value means no hint is set
        val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
        //Sets the exclusive upper bound for a specific index. A null value means no max is set
        val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
        //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
        val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
        //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
        val FOD_SHOWRECORDID = 12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
        //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
      }
      case class ResultOptions(
                                optType: FOD_TYPE,
                                bson: Option[Bson] = None,
                                value: Int = 0 ){
        import FOD_TYPE._
         def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bson.get)
            case  FOD_LIMIT        => find.limit(value)
            case  FOD_SKIP         => find.skip(value)
            case  FOD_PROJECTION   => find.projection(bson.get)
            case  FOD_SORT         => find.sort(bson.get)
            case  FOD_PARTIAL      => find.partial(value != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bson.get)
            case  FOD_MAX          => find.max(bson.get)
            case  FOD_MIN          => find.min(bson.get)
            case  FOD_RETURNKEY    => find.returnKey(value != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(value != 0)
    
          }
        }
      }
    
      trait MGOCommands
    
      object MGOCommands {
    
        case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
    
        case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
    
        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                           firstOnly: Boolean = false) extends MGOCommands
    
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
    
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
    
      }
    
      object MGOAdmins {
    
        case class DropCollection(collName: String) extends MGOCommands
    
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    
        case class ListCollection(dbName: String) extends MGOCommands
    
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
    
      }
    
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_ACTION_TYPE.MGO_QUERY,
                             action: Option[MGOCommands] = None,
                             actionOptions: Option[Any] = None,
                             actionTargets: Seq[String] = Nil
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))
    
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
      }
    
      case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
        ctxs =>
        def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
        def appendContext(ctx: MGOContext): MGOBatContext =
          ctxs.copy(contexts = contexts :+ ctx)
      }
    
      object MGOBatContext {
        def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
      }
    
      type MGODate = java.util.Date
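      // Note: java.util.Calendar months are zero-based (0 = January), so mgoDate(2017,5,13) is 13 June 2017.
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {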
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }
    
    
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }
    
      type MGOBlob = BsonBinary
      type MGOArray = BsonArray
    
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
    
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)
    
      def mgoGetStringOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getString(fieldName))
        else None
      }
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getInteger(fieldName))
        else None
      }
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getLong(fieldName))
        else None
      }
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDouble(fieldName))
        else None
      }
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getBoolean(fieldName))
        else None
      }
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDate(fieldName))
        else None
      }
      def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
        else None
      }
      def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOArray]]
        else None
      }
    
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        (arr.getValues.asScala.toList)
          .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    
    
    object MGOEngine extends LogSupport {
    
      import MGOClasses._
      import MGOAdmins._
      import MGOCommands._
      import sdp.result.DBOResult._
      import com.mongodb.reactivestreams.client.MongoClients
    
      object TxUpdateMode {
        private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
                  implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
          log.info(s"mgoTxUpdate> calling ...")
          observable.map(clientSession => {
    
            val transactionOptions =
              TransactionOptions.builder()
                .readConcern(ReadConcern.SNAPSHOT)
                .writeConcern(WriteConcern.MAJORITY).build()
    
            clientSession.startTransaction(transactionOptions)
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
            Await.ready(fut, 3 seconds) */
    
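            // NOTE: each update is started without waiting for its result, and
            // mgoUpdateObservable does not receive clientSession, so these writes
            // run outside the transaction started above. To make them transactional
            // the session has to be passed to every collection operation.
            ctxs.contexts.foreach { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }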
            clientSession
          })
        }
    
        private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"commitAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
              log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
              commitAndRetry(observable)
            }
            case e: Exception => {
              log.error(s"commitAndRetry> Exception during commit ...: $e")
              throw e
            }
          })
        }
    
        private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"runTransactionAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
              log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
              runTransactionAndRetry(observable)
            }
          })
        }
    
        def mgoTxBatch(ctxs: MGOBatContext)(
                implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
    
          log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")
    
          val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
          val commitTransactionObservable: SingleObservable[Completed] =
            updateObservable.flatMap(clientSession => clientSession.commitTransaction())
          val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
    
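          // Observables are lazy: nothing runs until the chain is subscribed, so convert it
          // to a Future (which subscribes) and return that instead of a constant Completed().
          runTransactionAndRetry(commitAndRetryObservable).toFuture()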
    
        }
      }
    
    
      def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
        log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
        if (ctxs.tx) {
            TxUpdateMode.mgoTxBatch(ctxs)
          } else {
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
            Await.ready(fut, 3 seconds)
            FastFastFuture.successful(new Completed) */
            ctxs.contexts.foreach { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
             valueToDBOResult(Completed())
          }
    
      }
    
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    
        log.info(s"mongoStream>  MGOContext: ${ctx}")
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mongoStream> query action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        try {
          ctx.action.get match {
            case Find(None, Nil, false) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find()))
            case Find(None, Nil, true) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find().first()))
            case Find(Some(filter), Nil, false) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find(filter)))
            case Find(Some(filter), Nil, true) => //FindObservable
              MongoSource(ObservableToPublisher(coll.find(filter).first()))
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(ObservableToPublisher(next(coll.find[Document]())))
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))
            case _ =>
              log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
              throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mongoStream> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
        }
    
      }
    
    
      // T => FindIterable  e.g List[Document]
      def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
        log.info(s"mgoQuery>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
    
        // Return the error instead of discarding it and falling through to ctx.action.get
        if ( ctx.action == None) {
          log.error(s"mgoQuery> query action cannot be null!")
          Left(new IllegalArgumentException("query action cannot be null!"))
        }
        else try {
          ctx.action.get match {
            /* count */
            case Count(Some(filter), Some(opt)) => //SingleObservable
              coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
                .toFuture().asInstanceOf[Future[T]]
            case Count(Some(filter), None) => //SingleObservable
              coll.countDocuments(filter).toFuture()
                .asInstanceOf[Future[T]]
            case Count(None, None) => //SingleObservable
              coll.countDocuments().toFuture()
                .asInstanceOf[Future[T]]
            /* distinct */
            case Distict(field, Some(filter)) => //DistinctObservable
              coll.distinct(field, filter).toFuture()
                .asInstanceOf[Future[T]]
            case Distict(field, None) => //DistinctObservable
              coll.distinct((field)).toFuture()
                .asInstanceOf[Future[T]]
            /* find */
            case Find(None, Nil, false) => //FindObservable
              if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
              else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(None, Nil, true) => //FindObservable
              if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
              else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, false) => //FindObservable
              if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
              else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, true) => //FindObservable
              if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
              else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            /* aggregate AggregateObservable*/
            case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
            /* mapReduce MapReduceObservable*/
            case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
            /* list collection */
            case ListCollection(dbName) => //ListCollectionsObservable
              client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoQuery> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
        }
      }
      //T => Completed, result.UpdateResult, result.DeleteResult
      def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
        try {
          mgoUpdateObservable[T](ctx).toFuture()
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
        }
    
      def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
        log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoUpdateObservable> query action cannot be null!")
          throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
        }
        try {
          ctx.action.get match {
            /* insert */
            case Insert(docs, Some(opt)) => //SingleObservable[Completed]
              if (docs.size > 1)
                coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
            case Insert(docs, None) => //SingleObservable
              if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
            /* delete */
            case Delete(filter, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
            case Delete(filter, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
            /* replace */
            case Replace(filter, replacement, None) => //SingleObservable
              coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
            case Replace(filter, replacement, Some(opt)) => //SingleObservable
              coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
            /* update */
            case Update(filter, update, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
            case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
            /* bulkWrite */
            case BulkWrite(commands, None) => //SingleObservable
              coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
            case BulkWrite(commands, Some(opt)) => //SingleObservable
              coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
        }
      }
    
      def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
        log.info(s"mgoAdmin>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        // Return the error instead of discarding it and falling through to ctx.action.get
        if ( ctx.action == None) {
          log.error(s"mgoAdmin> query action cannot be null!")
          Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
        }
        else try {
          ctx.action.get match {
            /* drop collection */
            case DropCollection(collName) => //SingleObservable
              val coll = db.getCollection(collName)
              coll.drop().toFuture()
            /* create collection */
            case CreateCollection(collName, None) => //SingleObservable
              db.createCollection(collName).toFuture()
            case CreateCollection(collName, Some(opt)) => //SingleObservable
              db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
            /* list collection
          case ListCollection(dbName) =>   //ListCollectionsObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            */
            /* create view */
            case CreateView(viewName, viewOn, pline, None) => //SingleObservable
              db.createView(viewName, viewOn, pline).toFuture()
            case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
              db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
            /* create index */
            case CreateIndex(key, None) => //SingleObservable
              coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]
            case CreateIndex(key, Some(opt)) => //SingleObservable
              coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
            /* drop index */
            case DropIndexByName(indexName, None) => //SingleObservable
              coll.dropIndex(indexName).toFuture()
            case DropIndexByName(indexName, Some(opt)) => //SingleObservable
              coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropIndexByKey(key, None) => //SingleObservable
              coll.dropIndex(key).toFuture()
            case DropIndexByKey(key, Some(opt)) => //SingleObservable
              coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropAllIndexes(None) => //SingleObservable
              coll.dropIndexes().toFuture()
            case DropAllIndexes(Some(opt)) => //SingleObservable
              coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
        }
    
      }
    
    }
    
    
    object MongoActionStream {
    
      import MGOClasses._
    
      case class StreamingInsert[A](dbName: String,
                                    collName: String,
                                    converter: A => Document,
                                    parallelism: Int = 1
                                   ) extends MGOCommands
    
      case class StreamingDelete[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
      case class StreamingUpdate[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    toUpdate: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
    
      case class InsertAction[A](ctx: StreamingInsert[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          Flow[A].map(ctx.converter)
            .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
      }
    
      case class UpdateAction[A](ctx: StreamingUpdate[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      }
    
    
      case class DeleteAction[A](ctx: StreamingDelete[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
      }
    
    }
    
    object MGOHelpers {
    
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = (doc) => doc.toJson
      }
    
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = (doc) => doc.toString
      }
    
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String
    
        def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    
        def headResult() = Await.result(observable.head(), 10 seconds)
    
        def printResults(initial: String = ""): Unit = {
          if (initial.length > 0) print(initial)
          results().foreach(res => println(converter(res)))
        }
    
        def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
      }
    
      def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
        Await.result(fut, timeOut)
      }
    
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
        Await.result(fut, timeOut)
      }
    
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
    
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A](x)
      }
    
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runToFuture
      }
    
    }
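
    The two retry helpers in TxUpdateMode above (commitAndRetry and runTransactionAndRetry) call themselves from inside recoverWith with no limit on the number of attempts. Below is a minimal sketch of the same pattern with an upper bound, written against plain scala.concurrent.Future so that it runs without the MongoDB driver. RetrySketch, retry, maxAttempts and retriable are hypothetical names used only for illustration; they are not part of MGOEngine.

    ```scala
    import scala.concurrent.{ExecutionContext, Future}

    object RetrySketch {
      // Hypothetical helper, not part of MGOEngine: run `op` again while it fails
      // with an error accepted by `retriable`, giving up after `maxAttempts`
      // attempts in total. Errors rejected by `retriable` are passed through.
      def retry[A](maxAttempts: Int)(op: () => Future[A])(retriable: Throwable => Boolean)(
        implicit ec: ExecutionContext): Future[A] =
        op().recoverWith {
          case e if maxAttempts > 1 && retriable(e) =>
            retry(maxAttempts - 1)(op)(retriable)
        }
    }
    ```

    With the driver, the retriable predicate would presumably be the same hasErrorLabel test used in the helpers above, and op would be a function that builds and subscribes to the commit observable again on every attempt.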

    TestMongoEngine.scala

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    import scala.util._
    
    object TestMongoEngine extends App {
      import MGOEngine._
      import MGOHelpers._
      import MGOCommands._
      import MGOAdmins._
    
      // or provide custom MongoClientSettings
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
     // implicit val ec = system.dispatcher
    
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
    
      import monix.execution.Scheduler.Implicits.global
      println(getResult(mgoAdmin(ctx).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))
    
      scala.io.StdIn.readLine()
    
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray],
                      handler: Option[MGOBlob]
                    )
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl"),
          handler = mgoGetBlobOrNone(doc,"handler")
        )
      }
    
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }
    
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        if (po.remarks != None)
          println(s"remarks: ${po.remarks.get}")
        po.podtl match {
          case Some(barr) =>
            mgoArrayToDocumentList(barr)
              .map { dc => toPODTL(dc)}
              .foreach { doc: PODTL =>
                print(s"==>Item: ${doc.item} ")
                print(s"price: ${doc.price} ")
                print(s"qty: ${doc.qty} ")
                doc.packing.foreach(pk => print(s"packing: ${pk} "))
                doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                println("")
              }
          case _ =>
        }
    
        po.handler match {
          case Some(blob) =>
            val fileName = s"/users/tiger/${po.ponum}.png"
            mgoBlobToFile(blob,fileName)
            println(s"picture saved to ${fileName}")
          case None => println("no picture provided")
        }
    
      }
    
      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      import org.mongodb.scala.bson.conversions._
      import org.mongodb.scala.bson.Document
    
    
      val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))
    
    
      val sort: Bson = (descending("ponum"))
      val proj: Bson = (and(include("ponum","podate")
                       ,include("vendor"),excludeId()))
      val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
      val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
      val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))
    
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)))
      )
    
      for {
        _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
          case Success(eold) => eold match {
            case Right(old) => old match {
              case Some(ld) => ld.map(toPO(_)).foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eop) => eop match {
            case Right(op) => op match {
              case Some(p) => showPO(p)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
    
        _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
          case Success(eops) => eops match {
            case Right(ops) => ops match {
              case Some(lp) => lp.foreach(showPO)
              case None => println(s"Empty document found!")
            }
            case Left(err) => println(s"Error: ${err.getMessage}")
          }
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()
    
    
      scala.io.StdIn.readLine()
    
    
      system.terminate()
    }
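
    The test program repeats the same nested match (Success / Right / Some) for every query. Here is a minimal sketch of how that unwrapping could be factored out, using only the standard library. It assumes the value has already been reduced to a Future[Either[Throwable, Option[A]]], which is what .value.value.runToFuture yields for a DBOResult[A]. UnwrapSketch, describe and report are hypothetical names, not part of MGOEngine.

    ```scala
    import scala.concurrent.{ExecutionContext, Future}

    object UnwrapSketch {
      // Hypothetical helper, not part of MGOEngine: turn the three possible
      // outcomes of an unwrapped DBOResult into a single message.
      def describe[A](res: Either[Throwable, Option[A]])(show: A => String): String =
        res match {
          case Right(Some(a)) => show(a)
          case Right(None)    => "Empty document found!"
          case Left(err)      => s"Error: ${err.getMessage}"
        }

      // Apply `describe` once the Future completes. A failed Future is reported
      // through its exception message, like the Failure(e) branches in the test.
      def report[A](fut: Future[Either[Throwable, Option[A]]])(show: A => String)(
        implicit ec: ExecutionContext): Future[String] =
        fut.map(res => describe(res)(show)).recover { case e => s"${e.getMessage}" }
    }
    ```

    In the test above this might be called as report(mgoQuery[List[Document]](ctxFind).value.value.runToFuture)(ld => s"${ld.size} documents"), with the Monix global scheduler serving as the implicit ExecutionContext.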
  • Original article: https://www.cnblogs.com/tiger-xc/p/11301362.html