zoukankan      html  css  js  c++  java
  • SDP(11):MongoDB-Engine功能实现

      根据上篇关于MongoDB-Engine的功能设计方案,我们将在这篇讨论里进行功能实现和测试。下面是具体的功能实现代码:基本上是直接调用mongo-scala-driver的对应函数,需要注意的是java类型和scala类型之间的相互转换:

    /** Runs the commands described by an `MGOContext` against MongoDB.
      *
      * Every command is translated into the matching mongo-scala-driver call and
      * its result is exposed as a `Future`.
      */
    object MGOEngine {
      import MGOContext._
      import MGOCommands._
      import MGOAdmins._

      /** Executes `ctx.action` on collection `ctx.collName` of database `ctx.dbName`.
        *
        * The driver result is cast to `Future[T]` without a runtime check, so the
        * caller has to pick a `T` that matches the command. `options` values are
        * cast to the driver's option class for the command (e.g. `CountOptions`,
        * `InsertManyOptions`); a value of another class makes that cast throw.
        *
        * @param ctx    database name, collection name and the command to run
        * @param client MongoDB client used to reach the server
        * @tparam T     result type expected by the caller
        * @return the pending result; a failed `Future` carrying an
        *         `IllegalArgumentException` when `ctx.action` is `null`, is not a
        *         supported command, or is an `Insert` without documents
        */
      def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
            /* count */
          case Count(Some(filter),Some(opt)) =>
            coll.count(filter,opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter),None) =>
            coll.count(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None,Some(opt)) =>
            // options without a filter: count every document (empty filter)
            coll.count(Document(),opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(None,None) =>
            coll.count().toFuture()
              .asInstanceOf[Future[T]]
            /* distinct */
          case Distict(field,Some(filter)) =>
            coll.distinct(field,filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field,None) =>
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
            /* find */
          case Find(optFilter,optNext,optConv,firstOnly) =>
            val base = optFilter match {
              case Some(filter) => coll.find[Document](filter)
              case None => coll.find[Document]()
            }
            // apply the caller's sort/projection/limit step, if any
            val query = optNext match {
              case Some(next) => next(base)
              case None => base
            }
            // firstOnly is honoured whether or not an andThen step was supplied
            optConv match {
              case None =>
                if (firstOnly) query.first().head().asInstanceOf[Future[T]]
                else query.toFuture().asInstanceOf[Future[T]]
              case Some(conv) =>
                if (firstOnly) query.first().map(conv).head().asInstanceOf[Future[T]]
                else query.map(conv).toFuture().asInstanceOf[Future[T]]
            }
            /* aggregate */
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
            /* mapReduce */
          case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
            /* insert */
          case Insert(docs,_) if docs.isEmpty =>
            // docs.head below would throw synchronously on an empty batch
            Future.failed[T](new IllegalArgumentException("Insert requires at least one document"))
          case Insert(docs,Some(opt)) =>
            if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs,None) =>
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
            /* delete */
          case Delete(filter,None,onlyOne) =>
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter,Some(opt),onlyOne) =>
            if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            /* replace */
          case Replace(filter,replacement,None) =>
            coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter,replacement,Some(opt)) =>
            coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            /* update */
          case Update(filter,update,None,onlyOne) =>
            if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
          case Update(filter,update,Some(opt),onlyOne) =>
            if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            /* bulkWrite */
          case BulkWrite(commands,None) =>
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands,Some(opt)) =>
            coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

            /* drop collection */
          case DropCollection(collName) =>
            // the command names its own collection; ctx.collName is not used here
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
            /* create collection */
          case CreateCollection(collName,None) =>
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName,Some(opt)) =>
            db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
            /* list collection */
          case ListCollection(dbName) =>
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            /* create view */
          case CreateView(viewName,viewOn,pline,None) =>
            db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName,viewOn,pline,Some(opt)) =>
            db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
            /* create index */
          case CreateIndex(key,None) =>
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key,Some(opt)) =>
            coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
            /* drop index */
          case DropIndexByName(indexName, None) =>
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>
            coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key,None) =>
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key,Some(opt)) =>
            coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
            /* null (the MGOContext default) or a command this engine does not know */
          case other =>
            Future.failed[T](new IllegalArgumentException(s"unsupported MongoDB command: $other"))
        }

      }

    }

    注意:以上所有函数都返回Future[T]结果。下面我们来试运行这些函数,不过先关注一些细节:关于MongoDB的Date,Blob,Array等类型在scala中的使用方法:

     /** Date values are stored as plain `java.util.Date`. */
     type MGODate = java.util.Date

      /** Builds a date at midnight (default time zone) of the given day.
        *
        * @param yyyy year
        * @param mm   month as understood by `java.util.Calendar.set`, i.e. zero-based (0 = January)
        * @param dd   day of month
        */
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        // getInstance() starts at "now"; clear it so the time-of-day fields do not leak into the result
        ca.clear()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }
      /** Builds a date-time (default time zone) with zero milliseconds.
        *
        * @param yyyy year
        * @param mm   month as understood by `java.util.Calendar.set`, i.e. zero-based (0 = January)
        * @param dd   day of month
        * @param hr   hour of day (0-23)
        * @param min  minute
        * @param sec  second
        */
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        // set(...) leaves MILLISECOND untouched; clear first so it is not taken from the current time
        ca.clear()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }
      /** Returns the current date and time. */
      def mgoDateTimeNow: MGODate = new java.util.Date()
    
    
      /** Formats `dt` with a `SimpleDateFormat` pattern such as `"yyyy-MM-dd"`.
        *
        * @param dt           date to format
        * @param formatString `java.text.SimpleDateFormat` pattern
        */
      def mgoDateToString(dt: MGODate, formatString: String): String =
        new SimpleDateFormat(formatString).format(dt)
    
      // Binary (blob) field values are represented by the driver's BsonBinary.
      type MGOBlob = BsonBinary
      // Array field values are represented by the driver's BsonArray.
      type MGOArray = BsonArray
    
      /** Loads a whole file into memory so it can be stored as a blob field.
        *
        * Blocks the calling thread until the file has been read or `timeOut` expires.
        * Note that the result is the raw byte array, not an `MGOBlob`.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read
        */
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): Array[Byte] = {
        FileToByteArray(fileName, timeOut)
      }
    
      /** Writes the bytes held by `blob` to `fileName`.
        *
        * Returns whatever `ByteArrayToFile` returns; the write is started by the
        * stream and is not awaited here.
        *
        * @param blob     binary value to write out
        * @param fileName destination path
        */
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) = {
        val bytes = blob.getData
        ByteArrayToFile(bytes, fileName)
      }

    然后就是MongoDB数据类型的读取帮助函数:

     /** Reads a string field.
       *
       * @return `None` when the field is absent or holds a null value
       */
     def mgoGetStringOrNone(doc: Document, fieldName: String): Option[String] = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getString(fieldName))
        else None
      }
      /** Reads an integer field.
        *
        * @return `None` when the field is absent or holds a null value
        */
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getInteger(fieldName))
        else None
      }
      /** Reads a long field (the misspelt name is kept for existing callers).
        *
        * @return `None` when the field is absent or holds a null value
        */
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getLong(fieldName))
        else None
      }
      /** Reads a double field.
        *
        * @return `None` when the field is absent or holds a null value
        */
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getDouble(fieldName))
        else None
      }
      /** Reads a boolean field.
        *
        * @return `None` when the field is absent or holds a null value
        */
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getBoolean(fieldName))
        else None
      }
      /** Reads a date field.
        *
        * @return `None` when the field is absent or holds a null value
        */
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        // Option(...) rather than Some(...): a field that is present but null must not become Some(null)
        if (doc.keySet.contains(fieldName))
          Option(doc.getDate(fieldName))
        else None
      }
      /** Reads a binary field.
        *
        * @return the value when the field exists and really is binary, otherwise `None`
        */
      def mgoGetBlobOrNone(doc: Document, fieldName: String): Option[MGOBlob] =
        // collect checks the runtime type; a cast on the Option itself is erased and checks nothing
        doc.get(fieldName).collect { case blob: MGOBlob => blob }
      /** Reads an array field.
        *
        * @return the value when the field exists and really is an array, otherwise `None`
        */
      def mgoGetArrayOrNone(doc: Document, fieldName: String): Option[MGOArray] =
        // collect checks the runtime type; a cast on the Option itself is erased and checks nothing
        doc.get(fieldName).collect { case arr: MGOArray => arr }
    
      /** Converts an array of embedded documents into a Scala list.
        *
        * Each element is converted with `BsonValue.asDocument()`, so an element that
        * is not a document fails here instead of surfacing later as a
        * `ClassCastException` at the point of use.
        *
        * @param arr array whose elements are all expected to be documents
        */
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        arr.getValues.asScala.toList.map(_.asDocument())
      }
    
      // A step applied to a FindObservable before the query runs (the examples use it for sort and projection).
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]

    下面我们就开始设置试运行环境,从一个全新的collection开始:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    import org.mongodb.scala.connection._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    
    import scala.util._
    
    object MongoEngineTest extends App {
    import MGOContext._
    import MGOEngine._
    import MGOHelpers._
    import MGOCommands._
    import MGOAdmins._
    
      // Client for the MongoDB server on localhost:27017.
      val clusterSettings = ClusterSettings.builder()
        .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
      val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
      implicit val client: MongoClient = MongoClient(clientSettings)

      // Akka is only needed for the file <-> blob streaming helpers and as execution context.
      implicit val system: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
      implicit val ec = system.dispatcher

      // Start from a clean slate: drop collection "po" of database "testdb".
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
      // Name the result type explicitly instead of leaving T to type inference.
      println(getResult(mgoExecute[Completed](ctx)))

    测试运行了DropCollection指令。下面我们试着insert两个document:

      // Raw bytes of the picture file, read fully into memory (blocks until loaded).
      val pic = fileToMGOBlob("/users/tiger/nobody.png")
      // First purchase order: optional "remarks" and "handler" (picture) fields are present.
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        // mgoDate hands the month to java.util.Calendar.set, which is zero-based: 5 is June
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        // line items; "packing" and "payterm" are optional per item
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      // Second purchase order: no "remarks" and no "handler" field, to exercise the optional readers.
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        // mgoDate hands the month to java.util.Calendar.set, which is zero-based: 11 is December
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
      // Two documents are inserted, so the engine calls insertMany and needs InsertManyOptions.
      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      // println must be a statement of its own; on the same line as ")" it is parsed as an infix call
      println(getResult(mgoExecute[Completed](ctxInsert)))

    注意InsertManyOptions的具体设定方式。为了配合更方便准确的强类型操作,我们需要进行Document类型到具体应用类型之间的对应转换:

      /** Strongly typed view of a purchase-order document.
        *
        * @param ponum   purchase-order number
        * @param podate  order date
        * @param vendor  vendor name
        * @param remarks optional free-text remarks
        * @param podtl   optional array of line-item documents
        * @param handler optional binary attachment (a picture in the examples)
        */
      case class PO(
        ponum: String,
        podate: MGODate,
        vendor: String,
        remarks: Option[String],
        podtl: Option[MGOArray],
        handler: Option[MGOBlob]
      )
      /** Converts a purchase-order `Document` into a `PO`.
        *
        * "ponum", "podate" and "vendor" are read directly; "remarks", "podtl" and
        * "handler" go through the `...OrNone` helpers and may be absent.
        */
      def toPO(doc: Document): PO = {
        val ponum = doc.getString("ponum")
        val podate = doc.getDate("podate")
        val vendor = doc.getString("vendor")
        val remarks = mgoGetStringOrNone(doc,"remarks")
        val podtl = mgoGetArrayOrNone(doc,"podtl")
        val handler = mgoGetBlobOrNone(doc,"handler")
        PO(ponum, podate, vendor, remarks, podtl, handler)
      }
    
      /** Strongly typed view of one purchase-order line item.
        *
        * @param item    item description
        * @param price   unit price
        * @param qty     ordered quantity
        * @param packing optional packing instruction
        * @param payTerm optional payment term (stored under the key "payterm")
        */
      case class PODTL(
        item: String,
        price: Double,
        qty: Int,
        packing: Option[String],
        payTerm: Option[String]
      )
      /** Converts a line-item `Document` into a `PODTL`.
        *
        * "item", "price" and "qty" are read directly; "packing" and "payterm" may be absent.
        */
      def toPODTL(podtl: Document): PODTL = {
        val item = podtl.getString("item")
        // the type annotations convert to the Scala primitive right here, as the constructor argument did
        val price: Double = podtl.getDouble("price")
        val qty: Int = podtl.getInteger("qty")
        val packing = mgoGetStringOrNone(podtl,"packing")
        val payTerm = mgoGetStringOrNone(podtl,"payterm")
        PODTL(item, price, qty, packing, payTerm)
      }
    
      /** Prints a purchase order to stdout and saves its picture, if any, next to the sample files.
        *
        * Header fields come first, then one line per line item, then the attachment status.
        */
      def showPO(po: PO) = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        po.remarks.foreach(rm => println(s"remarks: ${rm}"))

        po.podtl.foreach { barr =>
          // convert every line item first, then print them
          val items = mgoArrayToDocumentList(barr).map { dc => toPODTL(dc) }
          items.foreach { dtl: PODTL =>
            print(s"==>Item: ${dtl.item} ")
            print(s"price: ${dtl.price} ")
            print(s"qty: ${dtl.qty} ")
            dtl.packing.foreach(pk => print(s"packing: ${pk} "))
            dtl.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
            println("")
          }
        }

        po.handler.fold(println("no picture provided")) { blob =>
          val fileName = s"/users/tiger/${po.ponum}.png"
          // NOTE(review): the result of mgoBlobToFile is not awaited, so the message
          // below is printed whether or not the write has finished - confirm this is intended
          mgoBlobToFile(blob,fileName)
          println(s"picture saved to ${fileName}")
        }

      }

    在上面的代码里我们使用了前面提供的MongoDB数据类型读取帮助函数。下面我们测试对poCollection中的Document进行查询,示范包括projection,sort,filter等:

      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      // Optional query refinements, applied to the FindObservable before it runs.
      val sort: MGOFilterResult = find => find.sort(descending("ponum"))
      // Projections are combined with Projections.fields; `and` is a Filters combinator
      // and builds a query filter, not a projection.
      val proj: MGOFilterResult = find => find.projection(fields(include("ponum","podate"),include("vendor"),excludeId()))
      // all documents, projected to ponum/podate/vendor
      val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
      // first document only, converted to PO
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
      // documents having at least one line item with qty == 100, converted to PO
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
      )
      
      // Run the three queries one after another; each step starts only when the
      // previous Future has completed. andThen is used for its printing side effect.
      // The multi-document results are typed as Seq, which is what the driver's
      // toFuture() declares; the engine's cast does not verify a narrower type.
      for {
        _ <- mgoExecute[Seq[Document]](ctxFind).andThen {
          case Success(docs) => docs.map(toPO).foreach(showPO)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }

        _ <- mgoExecute[PO](ctxFindFirst).andThen {
          case Success(doc) => showPO(doc)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }

        _ <- mgoExecute[Seq[PO]](ctxFindArrayItem).andThen {
          case Success(docs) => docs.foreach(showPO)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()

    因为mgoExecute返回Future结果,所以我们可以用for-comprehension对几个运算进行串联运行。

    下面是这次示范的源代码:

    build.sbt

    name := "learn-mongo"

    version := "0.1"

    scalaVersion := "2.12.4"

    // ++= appends to the setting; := would replace dependencies contributed elsewhere (e.g. by plugins)
    libraryDependencies ++= Seq(
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
    )

    MGOHelpers.scala

    import org.mongodb.scala._
    import scala.concurrent._
    import scala.concurrent.duration._
    
    /** Blocking convenience helpers for observables and futures; intended for demos and tests. */
    object MGOHelpers {

      /** Adds the blocking helpers to observables of documents, rendering each document as JSON. */
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = _.toJson()
      }

      /** Adds the blocking helpers to any observable, rendering each element with `toString`. */
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = _.toString
      }

      /** Blocking accessors shared by the implicit wrappers above; every wait is capped at 10 seconds. */
      trait ImplicitObservable[C] {
        /** Observable being wrapped. */
        val observable: Observable[C]
        /** Renders one element for printing. */
        val converter: (C) => String

        /** Waits for all elements. */
        def results(): Seq[C] = Await.result(observable.toFuture(), 10.seconds)

        /** Waits for the first element. */
        def headResult() = Await.result(observable.head(), 10.seconds)

        /** Prints `initial` (without a newline) when non-empty, then every element on its own line. */
        def printResults(initial: String = ""): Unit = {
          if (initial.length > 0) print(initial)
          for (res <- results()) println(converter(res))
        }

        /** Prints `initial` followed by the first element on one line. */
        def printHeadResult(initial: String = ""): Unit = {
          val rendered = converter(headResult())
          println(s"${initial}${rendered}")
        }
      }

      /** Blocks until `fut` completes and returns its value.
        *
        * @param timeOut maximum time to wait (1 second unless overridden)
        */
      def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
        Await.result(fut, timeOut)

      /** Blocks until `fut` completes and returns its elements.
        *
        * @param timeOut maximum time to wait (1 second unless overridden)
        */
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
        Await.result(fut, timeOut)

    }

    FileStreaming.scala

    import java.io.{InputStream, ByteArrayInputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.{Materializer}
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    
    import scala.concurrent.{Await}
    import akka.util._
    import scala.concurrent.duration._
    
    /** Helpers that move whole files in and out of memory through akka-stream.
      *
      * The `FileTo...` functions block until the file has been read completely;
      * the `...ToFile` functions return the materialized value of the stream
      * without waiting for the write to finish.
      */
    object FileStreaming {

      /** Reads the complete file into one `ByteString`, waiting at most `timeOut`. */
      private def readAll(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): ByteString = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        Await.result(fut, timeOut)
      }

      /** Reads a file into a `ByteBuffer`.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read
        */
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer):ByteBuffer =
        readAll(fileName, timeOut).toByteBuffer

      /** Reads a file into a byte array.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read
        */
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): Array[Byte] =
        readAll(fileName, timeOut).toArray

      /** Reads a file into memory and exposes it as an `InputStream`.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read
        */
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): InputStream =
        new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

      /** Writes the remaining bytes of `byteBuf` to `fileName`.
        *
        * The bytes are read with `get`, so the buffer's position is advanced to its limit.
        */
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        InputStreamToFile(new ByteArrayInputStream(ba), fileName)
      }

      /** Writes `bytes` to `fileName`. */
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) =
        InputStreamToFile(new ByteArrayInputStream(bytes), fileName)

      /** Streams everything readable from `is` into `fileName`. */
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }

    }

    MongoEngine.scala

    import java.text.SimpleDateFormat
    
    import org.bson.conversions.Bson
    import org.mongodb.scala._
    import org.mongodb.scala.model._
    import java.util.Calendar
    import scala.collection.JavaConverters._
    import FileStreaming._
    import akka.stream.Materializer
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    
    import scala.concurrent._
    import scala.concurrent.duration._
    
    /** Command model, execution context and BSON helper functions for the MongoDB engine. */
    object MGOContext {

      /** Marker trait of every command that `MGOEngine.mgoExecute` can run. */
      trait MGOCommands

      /** Data (query and write) commands.
        *
        * `options` fields are untyped; the engine casts them to the driver's option
        * class for the command, so pass an instance of exactly that class.
        */
      object MGOCommands {

        /** Counts documents, optionally restricted by `filter`; `options` is cast to `CountOptions`. */
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

        /** Distinct values of `fieldName`, optionally restricted by `filter` (the misspelt name is kept for callers). */
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

        /** Query.
          *
          * @param filter    optional query filter
          * @param andThen   optional step applied to the `FindObservable` before it runs
          *                  (sort, projection, limit, ...)
          * @param converter optional conversion applied to every resulting document
          * @param firstOnly return only the first document instead of all of them
          */
        case class Find[M](filter: Option[Bson] = None,
                        andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                        converter: Option[Document => M] = None,
                        firstOnly: Boolean = false) extends MGOCommands

        /** Aggregation pipeline. */
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

        /** Map-reduce with the given map and reduce function sources. */
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

        /** Inserts `newdocs`; the engine uses insertMany for more than one document, insertOne otherwise. */
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

        /** Deletes the documents matching `filter`; only the first one when `onlyOne` is set. */
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

        /** Replaces one document matching `filter` with `replacement`. */
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

        /** Applies `update` to the documents matching `filter`; only the first one when `onlyOne` is set. */
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

        /** Executes a batch of write models. */
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

      }

      /** Administrative commands (collections, views, indexes). */
      object MGOAdmins {

        /** Drops collection `collName` of the context's database. */
        case class DropCollection(collName: String) extends MGOCommands

        /** Creates collection `collName` in the context's database. */
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

        /** Lists the collections of database `dbName`. */
        case class ListCollection(dbName: String) extends MGOCommands

        /** Creates view `viewName` over `viewOn` using `pipeline`. */
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

        /** Creates an index on `key`. */
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drops the index called `indexName`. */
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

        /** Drops the index defined on `key`. */
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drops all indexes of the collection. */
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

      }

      /** Target of a command: database, collection and the command itself.
        *
        * @param dbName   database name
        * @param collName collection name
        * @param action   command to execute; `null` until one is set with `setCommand`
        */
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             action: MGOCommands = null
                           ) {
        ctx =>
        /** Copy of this context pointing at another database. */
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

        /** Copy of this context pointing at another collection. */
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)

        /** Copy of this context carrying `cmd` as the command to run. */
        def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)
      }

      object MGOContext {
        /** Context without a command yet. */
        def apply(db: String, coll: String) = new MGOContext(db, coll)

        /** Context with its command. */
        def apply(db: String, coll: String, command: MGOCommands) =
          new MGOContext(db, coll, command)

      }

      /** Date values are stored as plain `java.util.Date`. */
      type MGODate = java.util.Date

      /** Builds a date at midnight (default time zone) of the given day.
        *
        * @param mm month as understood by `java.util.Calendar.set`, i.e. zero-based (0 = January)
        */
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        // getInstance() starts at "now"; clear it so the time-of-day fields do not leak into the result
        ca.clear()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }

      /** Builds a date-time (default time zone) with zero milliseconds.
        *
        * @param mm month as understood by `java.util.Calendar.set`, i.e. zero-based (0 = January)
        */
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        // set(...) leaves MILLISECOND untouched; clear first so it is not taken from the current time
        ca.clear()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }

      /** Returns the current date and time. */
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }


      /** Formats `dt` with a `SimpleDateFormat` pattern such as `"yyyy-MM-dd"`. */
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }

      /** Binary (blob) field values. */
      type MGOBlob = BsonBinary
      /** Array field values. */
      type MGOArray = BsonArray

      /** Loads a whole file into memory (blocking, at most `timeOut`); returns the raw bytes. */
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

      /** Writes the bytes held by `blob` to `fileName`; the write is not awaited here. */
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)

      // The scalar readers below use Option(...) rather than Some(...): a field that
      // is present but null must yield None, not Some(null).

      /** Reads a string field; `None` when absent or null. */
      def mgoGetStringOrNone(doc: Document, fieldName: String): Option[String] = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getString(fieldName))
        else None
      }
      /** Reads an integer field; `None` when absent or null. */
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getInteger(fieldName))
        else None
      }
      /** Reads a long field; `None` when absent or null (the misspelt name is kept for callers). */
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getLong(fieldName))
        else None
      }
      /** Reads a double field; `None` when absent or null. */
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getDouble(fieldName))
        else None
      }
      /** Reads a boolean field; `None` when absent or null. */
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getBoolean(fieldName))
        else None
      }
      /** Reads a date field; `None` when absent or null. */
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getDate(fieldName))
        else None
      }
      /** Reads a binary field; `None` when absent or not binary. */
      def mgoGetBlobOrNone(doc: Document, fieldName: String): Option[MGOBlob] =
        // collect checks the runtime type; a cast on the Option itself is erased and checks nothing
        doc.get(fieldName).collect { case blob: MGOBlob => blob }

      /** Reads an array field; `None` when absent or not an array. */
      def mgoGetArrayOrNone(doc: Document, fieldName: String): Option[MGOArray] =
        doc.get(fieldName).collect { case arr: MGOArray => arr }

      /** Converts an array of embedded documents into a Scala list.
        *
        * Each element is converted with `BsonValue.asDocument()`, so an element that
        * is not a document fails here instead of surfacing later at the point of use.
        */
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        arr.getValues.asScala.toList.map(_.asDocument())
      }

      /** A step applied to a `FindObservable` before the query runs (sort, projection, ...). */
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    /** Runs the commands described by an `MGOContext` against MongoDB.
      *
      * Every command is translated into the matching mongo-scala-driver call and
      * its result is exposed as a `Future`.
      */
    object MGOEngine {
      import MGOContext._
      import MGOCommands._
      import MGOAdmins._

      /** Executes `ctx.action` on collection `ctx.collName` of database `ctx.dbName`.
        *
        * The driver result is cast to `Future[T]` without a runtime check, so the
        * caller has to pick a `T` that matches the command. `options` values are
        * cast to the driver's option class for the command (e.g. `CountOptions`,
        * `InsertManyOptions`); a value of another class makes that cast throw.
        *
        * @param ctx    database name, collection name and the command to run
        * @param client MongoDB client used to reach the server
        * @tparam T     result type expected by the caller
        * @return the pending result; a failed `Future` carrying an
        *         `IllegalArgumentException` when `ctx.action` is `null`, is not a
        *         supported command, or is an `Insert` without documents
        */
      def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
            /* count */
          case Count(Some(filter),Some(opt)) =>
            coll.count(filter,opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter),None) =>
            coll.count(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None,Some(opt)) =>
            // options without a filter: count every document (empty filter)
            coll.count(Document(),opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(None,None) =>
            coll.count().toFuture()
              .asInstanceOf[Future[T]]
            /* distinct */
          case Distict(field,Some(filter)) =>
            coll.distinct(field,filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field,None) =>
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
            /* find */
          case Find(optFilter,optNext,optConv,firstOnly) =>
            val base = optFilter match {
              case Some(filter) => coll.find[Document](filter)
              case None => coll.find[Document]()
            }
            // apply the caller's sort/projection/limit step, if any
            val query = optNext match {
              case Some(next) => next(base)
              case None => base
            }
            // firstOnly is honoured whether or not an andThen step was supplied
            optConv match {
              case None =>
                if (firstOnly) query.first().head().asInstanceOf[Future[T]]
                else query.toFuture().asInstanceOf[Future[T]]
              case Some(conv) =>
                if (firstOnly) query.first().map(conv).head().asInstanceOf[Future[T]]
                else query.map(conv).toFuture().asInstanceOf[Future[T]]
            }
            /* aggregate */
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
            /* mapReduce */
          case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
            /* insert */
          case Insert(docs,_) if docs.isEmpty =>
            // docs.head below would throw synchronously on an empty batch
            Future.failed[T](new IllegalArgumentException("Insert requires at least one document"))
          case Insert(docs,Some(opt)) =>
            if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs,None) =>
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
            /* delete */
          case Delete(filter,None,onlyOne) =>
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter,Some(opt),onlyOne) =>
            if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            /* replace */
          case Replace(filter,replacement,None) =>
            coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter,replacement,Some(opt)) =>
            coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            /* update */
          case Update(filter,update,None,onlyOne) =>
            if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
          case Update(filter,update,Some(opt),onlyOne) =>
            if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            /* bulkWrite */
          case BulkWrite(commands,None) =>
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands,Some(opt)) =>
            coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

            /* drop collection */
          case DropCollection(collName) =>
            // the command names its own collection; ctx.collName is not used here
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
            /* create collection */
          case CreateCollection(collName,None) =>
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName,Some(opt)) =>
            db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
            /* list collection */
          case ListCollection(dbName) =>
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            /* create view */
          case CreateView(viewName,viewOn,pline,None) =>
            db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName,viewOn,pline,Some(opt)) =>
            db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
            /* create index */
          case CreateIndex(key,None) =>
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key,Some(opt)) =>
            coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
            /* drop index */
          case DropIndexByName(indexName, None) =>
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>
            coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key,None) =>
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key,Some(opt)) =>
            coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
            /* null (the MGOContext default) or a command this engine does not know */
          case other =>
            Future.failed[T](new IllegalArgumentException(s"unsupported MongoDB command: $other"))
        }

      }

    }

    MongoEngineTest.scala

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    import org.mongodb.scala.connection._
    
    import scala.collection.JavaConverters._
    import com.mongodb.client.model._
    
    import scala.util._
    
    /** Manual smoke test for `MGOEngine`.
      *
      * Connects to a MongoDB server at localhost:27017, drops the "po" collection of
      * database "testdb", then runs three `Find` commands one after another and prints
      * the purchase orders they return. The program waits for Enter on stdin before
      * releasing the client and the actor system.
      */
    object MongoEngineTest extends App {
      import MGOContext._
      import MGOEngine._
      import MGOHelpers._
      import MGOCommands._
      import MGOAdmins._

      val clusterSettings = ClusterSettings.builder()
        .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
      val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
      // Implicit values carry explicit types so that implicit resolution does not
      // depend on type inference order inside the App body.
      implicit val client: MongoClient = MongoClient(clientSettings)

      implicit val system: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
      implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

      // Base context for collection "po" in "testdb"; its first command drops the collection.
      val ctx = MGOContext("testdb","po").setCommand(
        DropCollection("po"))
      println(getResult(mgoExecute(ctx)))


      // Sample purchase orders. po1 carries a picture and remarks, po2 has neither.
      val pic = fileToMGOBlob("/users/tiger/nobody.png")
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> mgoDate(2017,5,13),
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )

      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> mgoDate(2015,11,6),
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )

      val optInsert = new InsertManyOptions().ordered(true)
      val ctxInsert = ctx.setCommand(
        Insert(Seq(po1,po2),Some(optInsert))
      )
      // NOTE(review): the insert is disabled although the collection is dropped above,
      // so the queries below presumably run against an empty collection - confirm intent.
     // println(getResult(mgoExecute(ctxInsert)))

      /** Purchase order header as read back from a document; optional fields may be absent. */
      case class PO (
                      ponum: String,
                      podate: MGODate,
                      vendor: String,
                      remarks: Option[String],
                      podtl: Option[MGOArray],
                      handler: Option[MGOBlob]
                    )

      /** Builds a [[PO]] from a document, reading the optional fields through the `mgoGet...OrNone` helpers. */
      def toPO(doc: Document): PO = {
        PO(
          ponum = doc.getString("ponum"),
          podate = doc.getDate("podate"),
          vendor = doc.getString("vendor"),
          remarks = mgoGetStringOrNone(doc,"remarks"),
          podtl = mgoGetArrayOrNone(doc,"podtl"),
          handler = mgoGetBlobOrNone(doc,"handler")
        )
      }

      /** One purchase order detail line; `packing` and `payTerm` are optional. */
      case class PODTL(
                        item: String,
                        price: Double,
                        qty: Int,
                        packing: Option[String],
                        payTerm: Option[String]
                      )

      /** Builds a [[PODTL]] from a detail document. Note the stored key is "payterm" (lower case). */
      def toPODTL(podtl: Document): PODTL = {
        PODTL(
          item = podtl.getString("item"),
          price = podtl.getDouble("price"),
          qty = podtl.getInteger("qty"),
          packing = mgoGetStringOrNone(podtl,"packing"),
          payTerm = mgoGetStringOrNone(podtl,"payterm")
        )
      }

      /** Prints a purchase order to stdout and, when a picture is attached,
        * writes it to `/users/tiger/<ponum>.png`.
        *
        * @param po the purchase order to display
        */
      def showPO(po: PO): Unit = {
        println(s"po number: ${po.ponum}")
        println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
        println(s"vendor: ${po.vendor}")
        // Optional parts are printed only when present; nothing is emitted otherwise.
        po.remarks.foreach(rm => println(s"remarks: ${rm}"))
        po.podtl.foreach { barr =>
          mgoArrayToDocumentList(barr)
            .map { dc => toPODTL(dc) }
            .foreach { dtl: PODTL =>
              print(s"==>Item: ${dtl.item} ")
              print(s"price: ${dtl.price} ")
              print(s"qty: ${dtl.qty} ")
              dtl.packing.foreach(pk => print(s"packing: ${pk} "))
              dtl.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
              println("")
            }
        }

        po.handler match {
          case Some(blob) =>
            val fileName = s"/users/tiger/${po.ponum}.png"
            mgoBlobToFile(blob,fileName)
            println(s"picture saved to ${fileName}")
          case None => println("no picture provided")
        }

      }

      import org.mongodb.scala.model.Projections._
      import org.mongodb.scala.model.Filters._
      import org.mongodb.scala.model.Sorts._
      // Descending sort on "ponum"; defined here but not attached to any context below.
      val sort: MGOFilterResult = find => find.sort(descending("ponum"))
      // Projection keeping ponum, podate and vendor and excluding the _id field.
      val proj: MGOFilterResult = find => find.projection(and(include("ponum","podate"),include("vendor"),excludeId()))
      val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
      val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
      // Filter on the dotted path "podtl.qty" being equal to 100.
      val ctxFindArrayItem = ctx.setCommand(
        Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
      )

      // The queries are chained so that each one starts only after the previous
      // future has completed; a failed step prints its message and stops the chain.
      for {
        _ <- mgoExecute[List[Document]](ctxFind).andThen {
          case Success(docs) => docs.map(toPO).foreach(showPO)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }

        _ <- mgoExecute[PO](ctxFindFirst).andThen {
          case Success(doc) => showPO(doc)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }

        _ <- mgoExecute[List[PO]](ctxFindArrayItem).andThen {
          case Success(docs) => docs.foreach(showPO)
            println("-------------------------------")
          case Failure(e) => println(e.getMessage)
        }
      } yield()


      // Block the main thread until Enter is pressed so the asynchronous output can appear.
      scala.io.StdIn.readLine()


      // Release the MongoDB client before stopping the actor system;
      // the original left the client open.
      client.close()
      system.terminate()
    }

     

  • 相关阅读:
    函数
    关联子查询
    子查询
    视图(VIEW)
    顺时针打印矩阵
    二叉树的镜像
    树的子结构
    将两个有序链表合并
    反转链表
    输出链表中倒数第k个结点
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8548860.html
Copyright © 2011-2022 走看看