zoukankan      html  css  js  c++  java
  • PICE(4):MongoDBStreaming

       前两篇我们介绍了JDBC和Cassandra的gRPC streaming实现。与MongoDB不同,JDBC和Cassandra支持字符类型的query语句(SQL、CQL),所以把query指令转换成protobuf structures是简单直接的。而MongoDB没有提供字符类的query,所以我们必须进行MongoDB query涉及的所有类型与protobuf类型的相互转换,实现gRPC功能会复杂得多。我们在这篇讨论里先介绍MongoDB query的protobuf转换。

    在前面的MongoDB-Engine讨论里我们设计了个MGOContext作为JVM内部传递MongoDB query的数据结构:

     /**
      * Immutable description of one MongoDB request: target database and collection,
      * the kind of action, and the command to run.
      * Each setter returns a modified copy; the receiver itself is never changed.
      */
     case class MGOContext(
       dbName: String,
       collName: String,
       actionType: MGO_ACTION_TYPE = MGO_QUERY,
       action: Option[MGOCommands] = None
     ) {
       /** Copy of this context pointing at database `name`. */
       def setDbName(name: String): MGOContext = copy(dbName = name)

       /** Copy of this context pointing at collection `name`. */
       def setCollName(name: String): MGOContext = copy(collName = name)

       /** Copy of this context with action type `at`. */
       def setActionType(at: MGO_ACTION_TYPE): MGOContext = copy(actionType = at)

       /** Copy of this context carrying `cmd` as its command. */
       def setCommand(cmd: MGOCommands): MGOContext = copy(action = Some(cmd))
     }

    下面是这个结构支持的action清单:

     /** Data-level commands that can be carried in MGOContext.action. */
     object MGOCommands {

        /** Count documents, optionally restricted by `filter`; `options` is passed untyped (Option[Any]). */
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

        /** Distinct values of `fieldName`, optionally restricted by `filter`. */
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

        /* Query command.
           `andThen` post-processes the driver's FindObservable[Document], for example
             find => find.limit(...).sort(...).projection(...)
           `firstOnly` asks for only the first matching document. */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands

        /** Same filter/andThen shape as Find. NOTE(review): presumably consumed as a stream; no handler is visible here. */
        case class DocumentStream(filter: Option[Bson] = None,
                                  andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                                 ) extends MGOCommands

        /** Aggregation described by the stages in `pipeLine`. */
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

        /** Map-reduce; both functions are given as strings. */
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

        /** Insert one or more documents. */
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

        /** Delete documents matching `filter`; `onlyOne` limits the delete to a single document. */
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

        /** Replace a document matching `filter` with `replacement`. */
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

        /** Apply `update` to documents matching `filter`; `onlyOne` limits the update to a single document. */
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands


        /** A batch of driver write models submitted as one bulk write. */
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

      }
    
      /** Administrative commands; like the data commands they extend MGOCommands. */
      object MGOAdmins {

        /** Drop the collection named `collName`. */
        case class DropCollection(collName: String) extends MGOCommands

        /** Create the collection `collName`; `options` is passed untyped (Option[Any]). */
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

        /** List the collections of database `dbName`. */
        case class ListCollection(dbName: String) extends MGOCommands

        /** Create view `viewName` over `viewOn` using the aggregation `pipeline`. */
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

        /** Create an index described by `key`. */
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drop the index called `indexName`. */
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

        /** Drop the index described by `key`. */
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** NOTE(review): presumably drops every index of the collection; no handler is visible here. */
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

      }

    可以看到,我们必须把Bson、Document、FindObservable这几个类型对应到protobuf格式。下面是.proto文件里的部分内容:

    // Wire form of a Document. NOTE(review): presumably the bytes produced by marshal(document) — confirm.
    message MGODocument {
      bytes document = 1;
    }
    
     // Wire form of a Bson value: the marshalled bytes of its BsonDocument rendering.
     message MGOBson {
      bytes bson = 1;
    }
    
    // One FindObservable option (limit, sort, projection, ...).
    message ResultTransformer {
       int32 optType = 1;      // which option: one of the FOD_* codes
       MGOBson bsonParam = 2;  // Bson argument, for options that take one
       int32 valueParam = 3;   // Int argument; for boolean options non-zero means true
    }
    
    // Extra arguments for administrative commands.
    // NOTE(review): field meanings are not visible in this excerpt; presumably tarName/objName
    // carry collection, view or index names — confirm against the conversion code.
    message MGOAdminOptons {
      string tarName = 1;
      repeated MGOBson bsonParam  = 2;
      OptionAny options = 3;
      string objName = 4;
    }
    
    // Wire form of an MGOContext.
    message MGOOperations {
      string dbName = 1;                              // target database
      string collName = 2;                            // target collection
      int32 commandType = 3;                          // command code, e.g. MGO_COMMAND_FIND
      repeated MGOBson bsonParam = 4;                 // Bson arguments; for a find the first entry is the filter
      repeated ResultTransformer resultOptions = 5;   // FindObservable options for a find
      OptionAny options = 6;
      repeated MGODocument documents = 7;
      google.protobuf.BoolValue only = 8;             // optional flag; for a find it becomes Find.firstOnly
      MGOAdminOptons adminOptions = 9;
    }

    首先,Document是个serializable类,可以直接进行序列/反序列化:

      // Round trip: build a purchase-order Document, marshal it to bytes, and read it back.
      val sonyLine = Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard")
      val ericsonLine = Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
      val po = Document(
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "remarks" -> "urgent, rush order",
        "podtl" -> Seq(sonyLine, ericsonLine)
      )
      println(po)

      // Serialize ...
      val pobytes = marshal(po)
      println(s"po bytes: ${pobytes}")

      // ... and deserialize back into a Document.
      val po1 = unmarshal[Document](pobytes)
      println(s"back to po document: $po1")

    下一个是Bson,它是个java interface:

    /**
     * An interface for types that are able to render themselves into a {@code BsonDocument}.
     *
     * <p>The interface declares a single method and does not extend {@code Serializable}.</p>
     *
     * @since 3.0
     */
    public interface Bson {
        /**
         * Render this object into a BsonDocument.
         *
         * @param documentClass the document class in scope for the collection.  This parameter may be ignored, but it may be used to alter
         *                      the structure of the returned {@code BsonDocument} based on some knowledge of the document class.
         * @param codecRegistry the codec registry.  This parameter may be ignored, but it may be used to look up {@code Codec} instances for
         *                      the document class or any other related class.
         * @param <TDocument> the type of the document class
         * @return the BsonDocument
         */
        <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
    }

    Bson只是一个interface,本身不是Serializable,不过它的实现类BsonDocument是:

    /**
     * A type-safe container for a BSON document.  This class should NOT be sub-classed by third parties.
     *
     * @since 3.0
     */
    public class BsonDocument extends BsonValue implements Map<String, BsonValue>, Cloneable, Bson, Serializable {...}

    所以我们可以用BsonDocument来进行序列/反序列化,然后再用它来构建一个新的Bson对象:

      /**
       * Converts a Bson into its protobuf message: the Bson is first rendered to a
       * BsonDocument, and that document is marshalled into the message's bytes.
       */
      def bsonToProto(bson: Bson) = {
        val rendered = bson.toBsonDocument(
          classOf[org.mongodb.scala.bson.collection.immutable.Document], DEFAULT_CODEC_REGISTRY)
        MGOBson(marshal(rendered))
      }
    
      /**
       * Rebuilds a Bson from its protobuf message. The bytes are unmarshalled once into a
       * BsonDocument; the returned Bson hands back that document whatever arguments
       * toBsonDocument receives.
       */
      def protoToBson(proto: MGOBson): Bson = {
        val decoded = unmarshal[BsonDocument](proto.bson)
        new Bson {
          override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument =
            decoded
        }
      }

    最后是这个FindObservable:这个类型的应用场景是这样的:

        /* Query command.
           `andThen` post-processes the driver's FindObservable[Document], for example
             find => find.limit(...).sort(...).projection(...)
           `firstOnly` asks for only the first matching document. */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands

    FindObservable类型的效果可以是一连串施用的结果。因为每一步都是FindObservable[A] => FindObservable[A]这样的款式,所以我们可以把每一步转换(如limit、sort、projection)分别进行序列/反序列化处理,然后再重新串连施用来获得最终的FindObservable。每一步转换对应的protobuf结构如下:

    // One FindObservable option (limit, sort, projection, ...).
    message ResultTransformer {
       int32 optType = 1;      // which option: one of the FOD_* codes
       MGOBson bsonParam = 2;  // Bson argument, for options that take one
       int32 valueParam = 3;   // Int argument; for boolean options non-zero means true
    }
    
      // Integer codes naming the FindObservable option a ResultOptions value stands for.
      // Each comment shows the FindObservable method the code corresponds to.
      type FOD_TYPE       = Int
      val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
                               //(ResultOptions.toFindObservable leaves the observable unchanged for this code)
      val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
      val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
      val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
      val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
                               //Sets a document describing the fields to return for all matching documents
      val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
      val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
                               //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
      val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                               //Sets the cursor type
                               //(ResultOptions.toFindObservable leaves the observable unchanged for this code)
      val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
                               //Sets the hint for which index to use. A null value means no hint is set
      val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
                               //Sets the exclusive upper bound for a specific index. A null value means no max is set
      val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
                               //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
      val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
                               //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
      val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                               //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
    
      /**
       * One FindObservable option in a form that can cross the wire.
       *
       * @param optType    which option this is (one of the FOD_* codes)
       * @param bsonParam  Bson argument for options that take one (filter, projection, sort, hint, max, min)
       * @param valueParam Int argument for limit/skip; for the boolean options
       *                   (partial, returnKey, showRecordId) any non-zero value means true
       */
      case class ResultOptions(
                              optType: FOD_TYPE,
                              bsonParam: Option[Bson] = None,
                              valueParam: Int = 0 ){
        /**
         * Protobuf form of this option.
         *
         * The Bson argument is rendered to a BsonDocument before it is marshalled: Bson is
         * only an interface and its implementations need not be Serializable, whereas
         * BsonDocument is.
         */
        def toProto = new sdp.grpc.services.ResultTransformer(
          optType = this.optType,
          bsonParam = this.bsonParam.map { b =>
            val rendered = b.toBsonDocument(
              classOf[org.mongodb.scala.bson.collection.immutable.Document],
              org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY)
            sdp.grpc.services.MGOBson(marshal(rendered))
          },
          valueParam = this.valueParam
        )

        /**
         * The option as a function that applies it to a FindObservable.
         *
         * FOD_FIRST and FOD_CURSORTYPE return the observable unchanged. Options that need a
         * Bson argument call `bsonParam.get`, so they throw NoSuchElementException when the
         * argument is missing; an optType outside the FOD_* codes raises a MatchError.
         */
        def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bsonParam.get)
            case  FOD_LIMIT        => find.limit(valueParam)
            case  FOD_SKIP         => find.skip(valueParam)
            case  FOD_PROJECTION   => find.projection(bsonParam.get)
            case  FOD_SORT         => find.sort(bsonParam.get)
            case  FOD_PARTIAL      => find.partial(valueParam != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bsonParam.get)
            case  FOD_MAX          => find.max(bsonParam.get)
            case  FOD_MIN          => find.min(bsonParam.get)
            case  FOD_RETURNKEY    => find.returnKey(valueParam != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)

          }
        }
      }
      object ResultOptions {
        /**
         * Rebuilds a ResultOptions from its protobuf message; the optional Bson argument
         * is unmarshalled from the message bytes.
         */
        def fromProto(msg: sdp.grpc.services.ResultTransformer) = {
          val decodedBson = for (wire <- msg.bsonParam) yield unmarshal[Bson](wire.bson)
          new ResultOptions(
            optType = msg.optType,
            bsonParam = decodedBson,
            valueParam = msg.valueParam
          )
        }

      }

    我们可以用这个ResultOptions类型的toProto、fromProto来进行protobuf的转换处理,然后用fold把这些转换逐个串连施用:

         /**
          * Combines wire-level options into a single FindObservable transformation.
          * Options are applied starting from the LAST element of `rts` and ending with the first.
          */
         def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = { findObj =>
           rts.reverse.foldLeft(findObj) { (acc, rt) =>
             ResultOptions.fromProto(rt).toFindObservable(acc)
           }
         }

    下面这个函数示范了Find Context的反序列化:

      /**
       * Rebuilds an MGOContext from its protobuf form.
       *
       * Only MGO_COMMAND_FIND is handled here; any other commandType raises a MatchError.
       *
       * For a find:
       *  - the first entry of `bsonParam`, when present, becomes the filter;
       *  - `resultOptions`, when non-empty, become the `andThen` transformation;
       *  - `only`, when set, becomes `firstOnly` (false otherwise).
       *
       * The three parts are decoded independently, so result options without a filter are
       * accepted instead of failing on the head of an empty sequence.
       */
      def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
        case MGO_COMMAND_FIND => {
          // Applies the options starting from the last element, ending with the first.
          def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
            rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))

          val filter: Option[Bson] = proto.bsonParam.headOption.map(b => protoToBson(b))
          val andThen: Option[FindObservable[Document] => FindObservable[Document]] =
            if (proto.resultOptions.isEmpty) None
            else Some(toResultTransformer(proto.resultOptions))
          val firstOnly: Boolean = proto.only.getOrElse(false)

          new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Find(filter, andThen, firstOnly))
          )
        }
      }

    具体的应用示范如下:

     // Filter and projection for the query.
     val eqState = equal("state","California")
     val proj = exclude("rowid","_id")

     // Result options: at most 3 documents, with the projection applied.
     val limitOpt = ResultOptions(optType = FOD_LIMIT, valueParam = 3)
     val projOpt = ResultOptions(optType = FOD_PROJECTION, bsonParam = Some(proj))
     val rtxfmr = Seq(limitOpt, projOpt)

     // Build the wire message ...
     val protoCtx = MGOProtoMsg(
       dbName = "testdb",
       collName = "aqmrpt",
       commandType = MGO_COMMAND_FIND,
       bsonParam = Seq(eqState),
       resultOptions = rtxfmr
     ).toProto

     // ... turn it back into a context and run it.
     val findCtx = CtxFromProto(protoCtx)
     val futFind = mgoQuery[Seq[Document]](findCtx)
     futFind.onComplete { outcome =>
       outcome match {
         case Failure(e) => println(e.getMessage)
         case Success(docs) =>
           val found = docs.asInstanceOf[Seq[Document]]
           found.foreach { doc => println(doc.toJson()) }
       }
     }

    下面是本次讨论的部分源代码:

    MongoDBEngine.scala

    package sdp.mongo.engine
    import java.text.SimpleDateFormat
    
    import akka.NotUsed
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import org.mongodb.scala.MongoClient
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import org.mongodb.scala._
    import org.mongodb.scala.model._
    import java.util.Calendar
    
    import scala.collection.JavaConverters._
    import sdp.file.Streaming._
    import akka.stream.Materializer
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    
    import scala.concurrent._
    import scala.concurrent.duration._
    
    import sdp.logging.LogSupport
    
    object MGOContext {
      // Integer codes for MGOContext.actionType (MGO_QUERY is the default there).
      // NOTE(review): presumably query / update / admin map to mgoQuery / mgoUpdate / mgoAdmin;
      // the visible engine functions do not read actionType — confirm where it is consumed.
      type MGO_ACTION_TYPE = Int
      val MGO_QUERY        = 0
      val MGO_UPDATE       = 1
      val MGO_ADMIN        = 2
    
    
      /** Marker trait for every command (data or administrative) that an MGOContext can carry. */
      trait MGOCommands
    
      /** Data-level commands that can be carried in MGOContext.action. */
      object MGOCommands {

        /** Count documents, optionally restricted by `filter`; `options` is passed untyped (Option[Any]). */
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

        /** Distinct values of `fieldName`, optionally restricted by `filter`. */
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

        /* Query command.
           `andThen` post-processes the driver's FindObservable[Document], for example
             find => find.limit(...).sort(...).projection(...)
           `firstOnly` asks for only the first matching document. */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands

        /** Same filter/andThen shape as Find. NOTE(review): presumably consumed as a stream; no handler is visible here. */
        case class DocumentStream(filter: Option[Bson] = None,
                                  andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                                 ) extends MGOCommands

        /** Aggregation described by the stages in `pipeLine`. */
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

        /** Map-reduce; both functions are given as strings. */
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

        /** Insert one or more documents. */
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

        /** Delete documents matching `filter`; `onlyOne` limits the delete to a single document. */
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

        /** Replace a document matching `filter` with `replacement`. */
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

        /** Apply `update` to documents matching `filter`; `onlyOne` limits the update to a single document. */
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands


        /** A batch of driver write models submitted as one bulk write. */
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

      }
    
      /** Administrative commands; like the data commands they extend MGOCommands. */
      object MGOAdmins {

        /** Drop the collection named `collName`. */
        case class DropCollection(collName: String) extends MGOCommands

        /** Create the collection `collName`; `options` is passed untyped (Option[Any]). */
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

        /** List the collections of database `dbName`. */
        case class ListCollection(dbName: String) extends MGOCommands

        /** Create view `viewName` over `viewOn` using the aggregation `pipeline`. */
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

        /** Create an index described by `key`. */
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** Drop the index called `indexName`. */
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

        /** Drop the index described by `key`. */
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

        /** NOTE(review): presumably drops every index of the collection; no handler is visible here. */
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

      }
    
      /**
       * Immutable description of one MongoDB request: target database and collection,
       * the kind of action, and the command to run.
       * Each setter returns a modified copy; the receiver itself is never changed.
       */
      case class MGOContext(
        dbName: String,
        collName: String,
        actionType: MGO_ACTION_TYPE = MGO_QUERY,
        action: Option[MGOCommands] = None
      ) {
        /** Copy of this context pointing at database `name`. */
        def setDbName(name: String): MGOContext = copy(dbName = name)

        /** Copy of this context pointing at collection `name`. */
        def setCollName(name: String): MGOContext = copy(collName = name)

        /** Copy of this context with action type `at`. */
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = copy(actionType = at)

        /** Copy of this context carrying `cmd` as its command. */
        def setCommand(cmd: MGOCommands): MGOContext = copy(action = Some(cmd))
      }
    
      object MGOContext {
        /** Context for database `db` and collection `coll`, with default action type and no command. */
        def apply(db: String, coll: String) =
          new MGOContext(dbName = db, collName = coll)
      }
    
      /**
       * A batch of contexts to run together; `tx` requests transactional execution.
       * Both modifiers return a new batch and leave the receiver untouched.
       */
      case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
        /** Copy of this batch with the transaction flag set to `txopt`. */
        def setTx(txopt: Boolean): MGOBatContext = copy(tx = txopt)

        /** Copy of this batch with `ctx` added at the end. */
        def appendContext(ctx: MGOContext): MGOBatContext = {
          val extended = contexts :+ ctx
          copy(contexts = extended)
        }
      }
    
      object MGOBatContext {
        /** Batch over `ctxs`; non-transactional unless `tx` is true. */
        def apply(ctxs: Seq[MGOContext], tx: Boolean = false) =
          new MGOBatContext(contexts = ctxs, tx = tx)
      }
    
      /** Date type used for MongoDB date fields. */
      type MGODate = java.util.Date

      /**
       * Date at 00:00:00.000 (default time zone) of the given day.
       *
       * The calendar is cleared first so the result does not inherit the time of day of the
       * moment the function happens to be called; two calls with the same arguments return
       * equal dates.
       *
       * @param yyyy year
       * @param mm   month as java.util.Calendar counts it, i.e. ZERO-based (0 = January)
       * @param dd   day of month
       */
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.clear()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }

      /**
       * Date-time with second precision (milliseconds are always 0) in the default time zone.
       *
       * @param mm month as java.util.Calendar counts it, i.e. ZERO-based (0 = January)
       */
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.clear() // otherwise the millisecond field keeps the value of "now"
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }

      /** The current date and time. */
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }


      /**
       * Formats `dt` with a java.text.SimpleDateFormat pattern.
       *
       * @param formatString pattern such as "yyyy-MM-dd HH:mm:ss"
       */
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }
    
      // Short names for the BSON binary and array value types used by the helpers in this object.
      type MGOBlob = BsonBinary
      type MGOArray = BsonArray
    
      /**
       * Reads the file `fileName` by delegating to FileToByteArray.
       *
       * @param timeOut passed through to FileToByteArray; 60 seconds by default
       */
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = {
        FileToByteArray(fileName, timeOut)
      }
    
      /** Hands the bytes held by `blob` to ByteArrayToFile together with the target `fileName`. */
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) = {
        val bytes = blob.getData
        ByteArrayToFile(bytes, fileName)
      }
    
      /**
       * Reads `fieldName` from `doc` with `getString`.
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getString` throw —
       *         confirm against the driver.
       */
      def mgoGetStringOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getString(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` with `getInteger`.
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getInteger` throw —
       *         confirm against the driver.
       */
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getInteger(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` with `getLong`.
       * (The doubled "g" in the name is kept because callers depend on it.)
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getLong` throw —
       *         confirm against the driver.
       */
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getLong(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` with `getDouble`.
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getDouble` throw —
       *         confirm against the driver.
       */
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getDouble(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` with `getBoolean`.
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getBoolean` throw —
       *         confirm against the driver.
       */
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getBoolean(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` with `getDate`.
       *
       * @return None when the field is absent or its value is null, otherwise Some(value).
       *         NOTE(review): a field of a different type presumably makes `getDate` throw —
       *         confirm against the driver.
       */
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Option(doc.getDate(fieldName)) // Option(...) turns a null value into None, never Some(null)
        else None
      }
      /**
       * Reads `fieldName` from `doc` as a binary value.
       *
       * The value's runtime type is checked here: a field that exists but is not an MGOBlob
       * yields None, instead of an Option that only fails with a ClassCastException when
       * the caller later touches its content.
       *
       * @return Some(blob) when the field exists and holds an MGOBlob, None otherwise
       */
      def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).collect { case blob: MGOBlob => blob }
        else None
      }
      /**
       * Reads `fieldName` from `doc` as an array value.
       *
       * The value's runtime type is checked here: a field that exists but is not an MGOArray
       * yields None, instead of an Option that only fails with a ClassCastException when
       * the caller later touches its content.
       *
       * @return Some(array) when the field exists and holds an MGOArray, None otherwise
       */
      def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).collect { case arr: MGOArray => arr }
        else None
      }
    
      /**
       * Views the elements of `arr` as a list of BsonDocument.
       * The cast is unchecked: an element that is not a BsonDocument is only detected when
       * that element is used.
       */
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        val elements = arr.getValues.asScala.toList
        elements.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      /** A transformation applied to a FindObservable[Document] (same shape as Find.andThen). */
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    object MGOEngine extends LogSupport {
    
      import MGOContext._
      import MGOCommands._
      import MGOAdmins._
    
    
      /** Runs an MGOBatContext inside a client-session transaction, with retry on labelled errors. */
      object TxUpdateMode {
        /**
         * For the session produced by `observable`: starts a transaction (SNAPSHOT read
         * concern, MAJORITY write concern), runs every context of the batch, and passes the
         * session on.
         *
         * NOTE(review): the session is not handed to mgoUpdateObservable, so the updates are
         * presumably not bound to the transaction that was just started — confirm.
         */
        private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
                  implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
          log.info(s"mgoTxUpdate> calling ...")
          observable.map(clientSession => {

            val transactionOptions =
              TransactionOptions.builder()
                .readConcern(ReadConcern.SNAPSHOT)
                .writeConcern(WriteConcern.MAJORITY).build()

            clientSession.startTransaction(transactionOptions)

            // Start all updates of the batch and gather their results in one Future.
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }

            // Blocks for at most 3 seconds. Await.ready only waits for completion: it throws on
            // timeout but does not rethrow a failure of `fut`, so a failed update goes unnoticed here.
            Await.ready(fut, 3 seconds)

            clientSession
          })
        }

        /**
         * Wraps `observable` so that a MongoException labelled
         * UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL is logged and answered by recursing on the
         * same observable; any other Exception is logged and rethrown.
         */
        private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"commitAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
              log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
              commitAndRetry(observable)
            }
            case e: Exception => {
              log.error(s"commitAndRetry> Exception during commit ...: $e")
              throw e
            }
          })
        }

        /**
         * Wraps `observable` so that a MongoException labelled
         * TRANSIENT_TRANSACTION_ERROR_LABEL is logged and answered by recursing on the same
         * observable. Other errors are not handled here.
         *
         * NOTE(review): the log text mentions aborting the transaction, but no abort call is
         * made in this method.
         */
        private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"runTransactionAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
              log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
              runTransactionAndRetry(observable)
            }
          })
        }

        /**
         * Entry point: opens a session, runs the batch via mgoTxUpdate, commits the
         * transaction, and layers both retry wrappers on top.
         *
         * @return Future completed with the outcome of the wrapped commit observable
         */
        def mgoTxBatch(ctxs: MGOBatContext)(
                implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {

          log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")

          val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
          val commitTransactionObservable: SingleObservable[Completed] =
            updateObservable.flatMap(clientSession => clientSession.commitTransaction())
          val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

          runTransactionAndRetry(commitAndRetryObservable)
        }.toFuture() // the braces above form a block expression; toFuture() converts its resulting observable
      }
    
    
      /**
       * Runs every context of the batch as an update.
       *
       * With `ctxs.tx` set the batch is delegated to TxUpdateMode.mgoTxBatch. Otherwise all
       * updates are started and the returned Future completes once all of them have
       * finished. The calling thread is never blocked, and a failed update fails the
       * returned Future instead of being reported as success.
       *
       * @param ctxs   the batch to run
       * @param client implicit MongoClient
       * @param ec     implicit ExecutionContext used to combine the individual results
       * @return Future completed with Completed when every update succeeded, or failed with
       *         the error of a failed update
       */
      def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
        log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
        if (ctxs.tx) {
          TxUpdateMode.mgoTxBatch(ctxs)
        } else {
          val allUpdates = Future.traverse(ctxs.contexts) { ctx =>
            mgoUpdate[Completed](ctx)
          }
          // Individual results are discarded; only overall success or failure is reported.
          allUpdates.map(_ => new Completed)
        }
      }
    
      /**
       * Runs the read-side command held in `ctx.action` against `ctx.dbName` / `ctx.collName`.
       *
       * Supported commands: Count, Distict, Find, Aggregate, MapReduce and ListCollection.
       *
       * @tparam T        result type expected by the caller, e.g. Seq[Document] for a multi-document
       *                  find. Every branch is cast to Future[T] unchecked, so a wrong T only
       *                  surfaces when the result is used.
       * @param ctx       database, collection and command
       * @param Converter optional mapping applied to each Document returned by a Find
       * @param client    implicit MongoClient
       * @return the driver result as a Future
       * @throws IllegalArgumentException when `ctx.action` is empty or is not one of the
       *                                  supported commands
       */
      def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
        log.info(s"mgoQuery>  MGOContext: ${ctx}")

        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        val command = ctx.action.getOrElse {
          log.error(s"mgoQuery> query action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        command match {
          /* count */
          case Count(Some(filter), Some(opt)) => //SingleObservable
            coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) => //SingleObservable
            coll.countDocuments(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, Some(opt)) => //SingleObservable
            // Options without a filter: count with an empty filter document, which matches everything.
            coll.countDocuments(Document(), opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(None, None) => //SingleObservable
            coll.countDocuments().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) => //DistinctObservable
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) => //DistinctObservable
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, false) => //FindObservable
            if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, true) => //FindObservable
            if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, false) => //FindObservable
            if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, true) => //FindObservable
            if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
          // NOTE(review): when `andThen` is given, `firstOnly` is ignored and all matching
          // documents are returned — confirm that this is intended.
          case Find(None, Some(next), _) => //FindObservable
            if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), _) => //FindObservable
            if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate AggregateObservable*/
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce MapReduceObservable*/
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
          /* anything else is not a query command */
          case other =>
            log.error(s"mgoQuery> unsupported query action: ${other}")
            throw new IllegalArgumentException(s"unsupported query action: ${other}")
        }
      }
    
      // T is chosen by the caller: Completed, result.UpdateResult or result.DeleteResult.
      // Builds the update observable for `ctx` and converts it into a Future.
      def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val pending = mgoUpdateObservable[T](ctx)
        pending.toFuture()
      }
    
      /**
       * Builds the driver observable for the write command held in `ctx.action`.
       *
       * Supported commands: Insert, Delete, Replace, Update and BulkWrite. The `options` of
       * a command are cast to the driver's option class for that operation; a value of
       * another class fails with a ClassCastException.
       *
       * @tparam T     result type expected by the caller (Completed, result.UpdateResult,
       *               result.DeleteResult, ...); the cast to SingleObservable[T] is unchecked
       * @param ctx    database, collection and command
       * @param client implicit MongoClient
       * @return the observable produced by the driver call
       * @throws IllegalArgumentException when `ctx.action` is empty, is not one of the
       *                                  supported commands, or is an Insert without documents
       */
      def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
        log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")

        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        val command = ctx.action.getOrElse {
          log.error(s"mgoUpdateObservable> query action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }

        // An Insert needs at least one document; fail with a clear message rather than on `head`.
        def requireDocs(docs: Seq[Document]): Unit =
          if (docs.isEmpty) {
            log.error(s"mgoUpdateObservable> insert requires at least one document!")
            throw new IllegalArgumentException("insert requires at least one document!")
          }

        command match {
          /* insert */
          case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]
            requireDocs(docs)
            if (docs.size > 1)
              coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
          case Insert(docs, None) =>                       //SingleObservable
            requireDocs(docs)
            if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
            else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>            //SingleObservable
            if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
            else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
          case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
          /* replace */
          case Replace(filter, replacement, None) =>        //SingleObservable
            coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
          case Replace(filter, replacement, Some(opt)) =>    //SingleObservable
            // NOTE(review): newer driver versions presumably expect ReplaceOptions here — confirm.
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>      //SingleObservable
            if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
            else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
          case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>                  //SingleObservable
            coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
          case BulkWrite(commands, Some(opt)) =>             //SingleObservable
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
          /* anything else is not an update command */
          case other =>
            log.error(s"mgoUpdateObservable> unsupported update action: ${other}")
            throw new IllegalArgumentException(s"unsupported update action: ${other}")
        }
      }
    
      /**
        * Executes the administrative command stored in `ctx.action`.
        *
        * Collection, view and "create collection" commands run against the database
        * named by `ctx.dbName`; index commands run against the collection named by
        * `ctx.collName`. `DropCollection` carries its own collection name and ignores
        * `ctx.collName`.
        *
        * @param ctx    context holding database name, collection name and the command
        * @param client MongoDB client used to resolve the database and collection
        * @return the driver observable for the command; nothing is sent until it is subscribed
        * @throws IllegalArgumentException if `ctx.action` is empty or is not one of the
        *                                  admin commands handled here
        */
      def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {
        log.info(s"mgoAdmin>  MGOContext: ${ctx}")

        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)

        // Options are carried as Option[Any] by the command classes, hence the casts below.
        ctx.action match {
          case None =>
            log.error(s"mgoAdmin> query action cannot be null!")
            throw new IllegalArgumentException("query action cannot be null!")

          /* drop collection */
          case Some(DropCollection(collName)) =>
            db.getCollection(collName).drop()

          /* create collection */
          case Some(CreateCollection(collName, None)) =>
            db.createCollection(collName)
          case Some(CreateCollection(collName, Some(opt))) =>
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])

          /* create view */
          case Some(CreateView(viewName, viewOn, pline, None)) =>
            db.createView(viewName, viewOn, pline)
          case Some(CreateView(viewName, viewOn, pline, Some(opt))) =>
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])

          /* create index */
          // NOTE(review): the result is force-cast to SingleObservable[Completed];
          // presumably the driver's element type differs here - confirm before relying on the value.
          case Some(CreateIndex(key, None)) =>
            coll.createIndex(key).asInstanceOf[SingleObservable[Completed]]
          case Some(CreateIndex(key, Some(opt))) =>
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]

          /* drop index */
          case Some(DropIndexByName(indexName, None)) =>
            coll.dropIndex(indexName)
          case Some(DropIndexByName(indexName, Some(opt))) =>
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])
          case Some(DropIndexByKey(key, None)) =>
            coll.dropIndex(key)
          case Some(DropIndexByKey(key, Some(opt))) =>
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])
          case Some(DropAllIndexes(None)) =>
            coll.dropIndexes()
          case Some(DropAllIndexes(Some(opt))) =>
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])

          // Anything else (query/update commands, the disabled ListCollection, ...) is
          // rejected with a descriptive error instead of an anonymous MatchError.
          case Some(other) =>
            log.error(s"mgoAdmin> unsupported admin action: ${other}")
            throw new IllegalArgumentException(s"mgoAdmin> unsupported admin action: ${other}")
        }
      }
    
    /*
        def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
          /* count */
          case Count(Some(filter), Some(opt)) =>   //SingleObservable
            coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) =>        //SingleObservable
            coll.countDocuments(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, None) =>                //SingleObservable
            coll.countDocuments().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) =>     //DistinctObservable
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) =>             //DistinctObservable
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, optConv, false) =>  //FindObservable
            if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, false) =>   //FindObservable
            if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(None, Some(next), optConv, _) =>   //FindObservable
            if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), optConv, _) =>  //FindObservable
            if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate AggregateObservable*/
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce MapReduceObservable*/
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* insert */
          case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]
            if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs, None) =>                       //SingleObservable
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>            //SingleObservable
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
          /* replace */
          case Replace(filter, replacement, None) =>        //SingleObservable
            coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter, replacement, Some(opt)) =>    //SingleObservable
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>      //SingleObservable
            if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
          case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>                  //SingleObservable
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands, Some(opt)) =>             //SingleObservable
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
    
          /* drop collection */
          case DropCollection(collName) =>                   //SingleObservable
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
          /* create collection */
          case CreateCollection(collName, None) =>           //SingleObservable
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName, Some(opt)) =>      //SingleObservable
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
          /* create view */
          case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable
            db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName, viewOn, pline, Some(opt)) =>  //SingleObservable
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
          /* create index */
          case CreateIndex(key, None) =>                     //SingleObservable
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key, Some(opt)) =>                //SingleObservable
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
          /* drop index */
          case DropIndexByName(indexName, None) =>           //SingleObservable
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>      //SingleObservable
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, None) =>                  //SingleObservable
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, Some(opt)) =>             //SingleObservable
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>                       //SingleObservable
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>                  //SingleObservable
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        }
      }
    */
    
    
      /**
        * Builds an Akka Streams source over the documents selected by a
        * `DocumentStream` command.
        *
        * The optional filter narrows the `find`, and the optional `andThen` function
        * is applied to the resulting `FindObservable` before it is wrapped in a
        * `MongoSource`.
        *
        * @param ctx    context holding database name, collection name and the command
        * @param client MongoDB client used to resolve the collection
        * @param ec     execution context (kept for interface compatibility; not used here)
        * @return a source emitting the matching documents
        * @throws IllegalArgumentException if `ctx.action` is empty or is not a `DocumentStream`
        */
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
        log.info(s"mongoStream>  MGOContext: ${ctx}")

        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)

        ctx.action match {
          case None =>
            log.error(s"mongoStream> query action cannot be null!")
            throw new IllegalArgumentException("query action cannot be null!")

          case Some(DocumentStream(filter, andThen)) =>
            // Start from a filtered or unfiltered find ...
            val found = filter match {
              case Some(f) => coll.find[Document](f)
              case None    => coll.find[Document]()
            }
            // ... then let the caller-supplied function decorate it (limit, sort, ...).
            val shaped = andThen match {
              case Some(next) => next(found)
              case None       => found
            }
            MongoSource(shaped)

          // Only DocumentStream can be streamed; fail with a clear message otherwise.
          case Some(other) =>
            log.error(s"mongoStream> unsupported stream action: ${other}")
            throw new IllegalArgumentException(s"mongoStream> unsupported stream action: ${other}")
        }
      }
    
    }
    
    
    /**
      * Command descriptions for row-by-row streaming writes, plus the action classes
      * that turn each description into an Akka Streams `Flow`.
      */
    object MongoActionStream {

      import MGOContext._

      /** Streaming insert: each incoming `A` is converted to a document and inserted. */
      case class StreamingInsert[A](
        dbName: String,
        collName: String,
        converter: A => Document,
        parallelism: Int = 1
      ) extends MGOCommands

      /** Streaming delete: each incoming `A` yields the filter of the document(s) to delete. */
      case class StreamingDelete[A](
        dbName: String,
        collName: String,
        toFilter: A => Bson,
        parallelism: Int = 1,
        justOne: Boolean = false
      ) extends MGOCommands

      /** Streaming update: each incoming `A` yields both the filter and the update document. */
      case class StreamingUpdate[A](
        dbName: String,
        collName: String,
        toFilter: A => Bson,
        toUpdate: A => Bson,
        parallelism: Int = 1,
        justOne: Boolean = false
      ) extends MGOCommands


      /** Runs a [[StreamingInsert]] against the collection it names. */
      case class InsertAction[A](ctx: StreamingInsert[A])(implicit mongoClient: MongoClient) {

        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)

        /** Converts every element to a document, inserts it and emits the inserted document. */
        def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          Flow[A]
            .map(ctx.converter)
            .mapAsync(ctx.parallelism) { doc =>
              collection.insertOne(doc).toFuture().map(_ => doc)
            }
      }

      /** Runs a [[StreamingUpdate]] against the collection it names. */
      case class UpdateAction[A](ctx: StreamingUpdate[A])(implicit mongoClient: MongoClient) {

        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)

        /** Updates one or many documents per element (per `ctx.justOne`) and re-emits the element. */
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          Flow[A].mapAsync(ctx.parallelism) { row =>
            val pending =
              if (ctx.justOne) collection.updateOne(ctx.toFilter(row), ctx.toUpdate(row))
              else collection.updateMany(ctx.toFilter(row), ctx.toUpdate(row))
            pending.toFuture().map(_ => row)
          }
      }


      /** Runs a [[StreamingDelete]] against the collection it names. */
      case class DeleteAction[A](ctx: StreamingDelete[A])(implicit mongoClient: MongoClient) {

        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)

        /** Deletes one or many documents per element (per `ctx.justOne`) and re-emits the element. */
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          Flow[A].mapAsync(ctx.parallelism) { row =>
            val pending =
              if (ctx.justOne) collection.deleteOne(ctx.toFilter(row))
              else collection.deleteMany(ctx.toFilter(row))
            pending.toFuture().map(_ => row)
          }
      }

    }
    
    /**
      * Convenience helpers for blocking on driver observables / futures and for
      * bridging between `Future` and Monix `Task`.
      */
    object MGOHelpers {

      /** Adds the blocking helpers to observables of documents; elements print as JSON. */
      implicit class DocumentObservable[C](val observable: Observable[Document])
        extends ImplicitObservable[Document] {
        override val converter: (Document) => String = _.toJson
      }

      /** Adds the blocking helpers to any observable; elements print via `toString`. */
      implicit class GenericObservable[C](val observable: Observable[C])
        extends ImplicitObservable[C] {
        override val converter: (C) => String = _.toString
      }

      /** Blocking accessors shared by the two implicit wrappers above. */
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String

        /** Waits up to 10 seconds for all elements. */
        def results(): Seq[C] = Await.result(observable.toFuture(), 10.seconds)

        /** Waits up to 10 seconds for the first element. */
        def headResult(): C = Await.result(observable.head(), 10.seconds)

        /** Prints `initial` (when non-empty) followed by one converted element per line. */
        def printResults(initial: String = ""): Unit = {
          if (initial.nonEmpty) print(initial)
          for (item <- results()) println(converter(item))
        }

        /** Prints `initial` immediately followed by the converted first element. */
        def printHeadResult(initial: String = ""): Unit =
          println(s"${initial}${converter(headResult())}")
      }

      /** Blocks until `fut` completes or `timeOut` (1 second by default) elapses. */
      def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
        Await.result(fut, timeOut)

      /** Same as [[getResult]], for futures of collections. */
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
        Await.result(fut, timeOut)

      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global

      /** Wraps a by-name future so that it is only evaluated when the task runs. */
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A](x)
      }

      /** Runs a task on the global Monix scheduler imported above. */
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runAsync
      }

    }

    MgoProtoConversion.scala

    package sdp.mongo.engine
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import sdp.grpc.services._
    import protobuf.bytes.Converter._
    import com.google.protobuf.ByteString
    import MGOContext._
    import MGOAdmins._
    import MGOCommands._
    import org.bson.BsonDocument
    import org.bson.codecs.configuration.CodecRegistry
    import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
    import org.mongodb.scala.FindObservable
    
    object MgoProtoConvertion {
      /*  org.mongodb.scala.FindObservable
      import com.mongodb.async.client.FindIterable
      val resultDocType = FindIterable[Document]
      val resultOption = FindObservable(resultDocType)
        .maxScan(...)
      .limit(...)
      .sort(...)
      .project(...) */
    
      // Integer codes identifying which FindObservable modifier a ResultOptions entry
      // stands for. The trailing comments quote the corresponding FindObservable method.
      // ResultOptions.toFindObservable currently leaves the query unchanged for
      // FOD_FIRST and FOD_CURSORTYPE.
      type FOD_TYPE       = Int
      val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
      val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
      val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
      val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
      val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
                               //Sets a document describing the fields to return for all matching documents
      val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
      val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
                               //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
      val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                               //Sets the cursor type
      val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
                               //Sets the hint for which index to use. A null value means no hint is set
      val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
                               //Sets the exclusive upper bound for a specific index. A null value means no max is set
      val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
                               //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
      val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
                               //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
      val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                               //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
    
      /**
        * One serializable modifier for a `FindObservable` (limit, sort, projection, ...).
        *
        * @param optType    one of the `FOD_*` codes selecting the modifier
        * @param bsonParam  Bson argument for modifiers that take one
        *                   (filter, projection, sort, hint, max, min)
        * @param valueParam Int argument for limit/skip; for the Boolean modifiers
        *                   (partial, returnKey, showRecordId) any non-zero value means `true`
        */
      case class ResultOptions(
          optType: FOD_TYPE,
          bsonParam: Option[Bson] = None,
          valueParam: Int = 0) {

        /**
          * Converts this option to its protobuf message.
          *
          * The Bson argument goes through `bsonToProto`, which renders it to a
          * `BsonDocument` before Java-serializing it. Serializing the raw `Bson`
          * (as this method used to) only works when the concrete value happens to be
          * `java.io.Serializable`, which the `Bson` interface does not promise.
          */
        def toProto: sdp.grpc.services.ResultTransformer = new sdp.grpc.services.ResultTransformer(
          optType = this.optType,
          bsonParam = this.bsonParam.map(bsonToProto),
          valueParam = this.valueParam
        )

        /**
          * Returns the function that applies this modifier to a `FindObservable`.
          *
          * @throws IllegalArgumentException (when the returned function is applied) if the
          *                                  modifier needs a Bson argument that is missing,
          *                                  or if `optType` is not a known `FOD_*` code
          */
        def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          // Evaluated only by the branches that need it.
          def bson: Bson = bsonParam.getOrElse(
            throw new IllegalArgumentException(
              s"ResultOptions> option type ${optType} requires a bsonParam"))

          optType match {
            case FOD_FIRST        => find   // first-only is handled by the caller, not here
            case FOD_FILTER       => find.filter(bson)
            case FOD_LIMIT        => find.limit(valueParam)
            case FOD_SKIP         => find.skip(valueParam)
            case FOD_PROJECTION   => find.projection(bson)
            case FOD_SORT         => find.sort(bson)
            case FOD_PARTIAL      => find.partial(valueParam != 0)
            case FOD_CURSORTYPE   => find   // cursor type is not carried by this class
            case FOD_HINT         => find.hint(bson)
            case FOD_MAX          => find.max(bson)
            case FOD_MIN          => find.min(bson)
            case FOD_RETURNKEY    => find.returnKey(valueParam != 0)
            case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
            // optType arrives over the wire; reject unknown codes explicitly.
            case other =>
              throw new IllegalArgumentException(s"ResultOptions> unknown option type: ${other}")
          }
        }
      }

      object ResultOptions {
        /**
          * Rebuilds a [[ResultOptions]] from its protobuf message.
          *
          * The payload is deserialized as a generic `Bson`, so both `BsonDocument`
          * payloads and any other serializable `Bson` are accepted.
          */
        def fromProto(msg: sdp.grpc.services.ResultTransformer): ResultOptions = new ResultOptions(
          optType = msg.optType,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          valueParam = msg.valueParam
        )

      }
    
    
      // Integer codes carried in the `commandType` field of MGOProtoMsg to tell the
      // receiver which MongoDB command the message describes.
      // MGO_COMMAND_* codes cover data commands; MGO_ADMIN_* codes cover admin commands.
      // NOTE(review): the values are not contiguous (7 is unused, COUNT/DISTICT are 20/21);
      // presumably they mirror the protobuf definition - confirm before renumbering.
      type MGO_COMMAND_TYPE = Int
      val MGO_COMMAND_FIND            = 0
      val MGO_COMMAND_COUNT           = 20
      val MGO_COMMAND_DISTICT         = 21
      val MGO_COMMAND_DOCUMENTSTREAM  = 1
      val MGO_COMMAND_AGGREGATE       = 2
      val MGO_COMMAND_INSERT          = 3
      val MGO_COMMAND_DELETE          = 4
      val MGO_COMMAND_REPLACE         = 5
      val MGO_COMMAND_UPDATE          = 6


      val MGO_ADMIN_DROPCOLLECTION    = 8
      val MGO_ADMIN_CREATECOLLECTION  = 9
      val MGO_ADMIN_LISTCOLLECTION    = 10
      val MGO_ADMIN_CREATEVIEW        = 11
      val MGO_ADMIN_CREATEINDEX       = 12
      val MGO_ADMIN_DROPINDEXBYNAME   = 13
      val MGO_ADMIN_DROPINDEXBYKEY    = 14
      val MGO_ADMIN_DROPALLINDEXES    = 15
    
    
      /**
        * Parameters of an admin command in a form that can be sent over protobuf.
        *
        * @param tarName   target name (sent as `tarName`)
        * @param bsonParam Bson arguments of the command
        * @param options   driver options object, if any; it is Java-serialized by `marshal`,
        *                  so it must be `java.io.Serializable` for `toProto` to succeed
        * @param objName   object name (sent as `objName`)
        */
      case class MGOAdminCtx(
          tarName: String = "",
          bsonParam: Seq[Bson] = Nil,
          options: Option[Any] = None,
          objName: String = "") {

        /**
          * Converts this context to its protobuf message.
          *
          * Bson arguments go through `bsonToProto` (render to `BsonDocument`, then
          * serialize), the same path MGOProtoMsg uses, because the `Bson` interface
          * does not promise `java.io.Serializable`.
          */
        def toProto: sdp.grpc.services.MGOAdminOptons = sdp.grpc.services.MGOAdminOptons(
          tarName = this.tarName,
          bsonParam = this.bsonParam.map(bsonToProto),
          objName = this.objName,
          options = this.options.map(b => OptionAny(marshal(b)))
        )
      }

      object MGOAdminCtx {
        /**
          * Rebuilds an [[MGOAdminCtx]] from its protobuf message.
          *
          * `objName` is now restored (it used to be dropped, so a round trip lost it).
          * NOTE(review): `options` is still not decoded here - the payload accessor of
          * OptionAny is not visible in this file; add the unmarshal once confirmed.
          */
        def fromProto(msg: sdp.grpc.services.MGOAdminOptons): MGOAdminCtx = new MGOAdminCtx(
          tarName = msg.tarName,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          objName = msg.objName
        )
      }
    
      /**
        * JVM-side description of a MongoDB operation that can be converted to and
        * from the protobuf message `MGOOperations`.
        *
        * @param dbName        database name
        * @param collName      collection name
        * @param commandType   one of the `MGO_COMMAND_*` / `MGO_ADMIN_*` codes
        * @param bsonParam     Bson arguments (for a find, the first one is the filter)
        * @param resultOptions modifiers to apply to a find result
        * @param options       pre-serialized options
        * @param documents     documents for commands that carry them
        * @param only          "first / one only" flag
        * @param adminOptions  parameters for admin commands
        */
      case class MGOProtoMsg(
          dbName: String = "",
          collName: String = "",
          commandType: MGO_COMMAND_TYPE,
          bsonParam: Seq[Bson] = Nil,
          resultOptions: Seq[ResultOptions] = Nil,
          options: ByteString = com.google.protobuf.ByteString.EMPTY,
          documents: Seq[Document] = Nil,
          only: Boolean = false,
          adminOptions: Option[MGOAdminCtx] = None) {

        /**
          * Converts this message to protobuf.
          *
          * NOTE(review): `options` is not written to the protobuf message here;
          * presumably MGOOperations has no matching field yet - confirm.
          */
        def toProto: sdp.grpc.services.MGOOperations = new sdp.grpc.services.MGOOperations(
          dbName = this.dbName,
          collName = this.collName,
          commandType = this.commandType,
          bsonParam = this.bsonParam.map(bsonToProto),
          resultOptions = this.resultOptions.map(_.toProto),
          documents = this.documents.map(d => sdp.grpc.services.MGODocument(marshal(d))),
          only = Some(this.only),
          adminOptions = this.adminOptions.map(_.toProto)
        )

      }

      object MGOProtoMsg {
        /**
          * Rebuilds an [[MGOProtoMsg]] from protobuf.
          *
          * `only` and `adminOptions` are now restored; they were written by `toProto`
          * but silently dropped here. An absent `only` is read as `false`.
          * NOTE(review): `documents` is still not decoded - the payload accessor of
          * MGODocument is not visible in this file; add the unmarshal once confirmed.
          */
        def fromProto(msg: sdp.grpc.services.MGOOperations): MGOProtoMsg = new MGOProtoMsg(
          dbName = msg.dbName,
          collName = msg.collName,
          commandType = msg.commandType,
          bsonParam = msg.bsonParam.map(protoToBson),
          resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
          only = msg.only.getOrElse(false),
          adminOptions = msg.adminOptions.map(a => MGOAdminCtx.fromProto(a))
        )
      }
    
      /**
        * Converts a `Bson` to its protobuf wrapper: the value is first rendered to a
        * `BsonDocument` with the default codec registry, and that document is then
        * serialized with `marshal`.
        */
      def bsonToProto(bson: Bson): MGOBson = {
        val rendered = bson.toBsonDocument(
          classOf[org.mongodb.scala.bson.collection.immutable.Document],
          DEFAULT_CODEC_REGISTRY)
        MGOBson(marshal(rendered))
      }
    
      /**
        * Converts a protobuf Bson wrapper back to a `Bson`. The payload is deserialized
        * once, up front, as a `BsonDocument`; the returned `Bson` hands out that same
        * document regardless of the requested document class or codec registry.
        */
      def protoToBson(proto: MGOBson): Bson = {
        val decoded: BsonDocument = unmarshal[BsonDocument](proto.bson)
        new Bson {
          override def toBsonDocument[TDocument](
              documentClass: Class[TDocument],
              codecRegistry: CodecRegistry): BsonDocument = decoded
        }
      }
    
      /**
        * Builds the engine-side `MGOContext` described by a protobuf `MGOOperations`.
        *
        * Only `MGO_COMMAND_FIND` is supported at present. For a find:
        *  - the first entry of `bsonParam`, if any, becomes the filter;
        *  - `resultOptions`, if non-empty, become the `andThen` transformation;
        *  - `only` (default `false`) becomes the `firstOnly` flag.
        *
        * A message with result options but no filter is now accepted (it previously
        * failed on `head` of an empty list).
        *
        * @throws IllegalArgumentException if the command type is not supported
        */
      def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
        case MGO_COMMAND_FIND =>
          // Chains all transformers into one function. foldRight means the LAST
          // transformer in the list is applied to the query first.
          def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] =
            findObj =>
              rts.foldRight(findObj)((rt, acc) => ResultOptions.fromProto(rt).toFindObservable(acc))

          val filter = proto.bsonParam.headOption.map(protoToBson)
          val andThen =
            if (proto.resultOptions.isEmpty) None
            else Some(toResultTransformer(proto.resultOptions))
          val firstOnly = proto.only.getOrElse(false)

          new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Find(filter, andThen, firstOnly))
          )

        // Fail with a descriptive message rather than an anonymous MatchError.
        case other =>
          throw new IllegalArgumentException(s"CtxFromProto> unsupported command type: ${other}")
      }
    
    }

    BytesConverter.scala

    package protobuf.bytes
    import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
    import com.google.protobuf.ByteString
    /** Java-serialization based conversion between arbitrary objects and protobuf `ByteString`. */
    object Converter {

      /**
        * Serializes `value` with Java serialization.
        *
        * @param value object to serialize; must be accepted by `ObjectOutputStream.writeObject`
        *              (i.e. serializable), otherwise the underlying I/O exception propagates
        * @return the serialized bytes
        */
      def marshal(value: Any): ByteString = {
        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(stream)
        // Close even when writeObject throws; closing also flushes buffered bytes,
        // so it must happen before the byte array is read.
        try oos.writeObject(value)
        finally oos.close()
        ByteString.copyFrom(stream.toByteArray())
      }

      /**
        * Deserializes an object previously produced by [[marshal]].
        *
        * SECURITY NOTE: this uses plain Java deserialization. Feeding it bytes from an
        * untrusted peer can instantiate arbitrary classes on the classpath; only use it
        * on trusted channels or add a class filter.
        *
        * The cast to `A` is unchecked (erased); a wrong `A` surfaces as a
        * `ClassCastException` at the first typed use of the result.
        *
        * @param bytes serialized object
        * @tparam A expected type of the object
        */
      def unmarshal[A](bytes: ByteString): A = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        try ois.readObject().asInstanceOf[A]
        finally ois.close()
      }

    }

    FindDemo.scala

    package demo.sdp.mgo.localapp
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.util._
    import scala.collection.JavaConverters._
    import sdp.mongo.engine._
    import org.mongodb.scala.model._
    import akka.stream.scaladsl.{Sink, Source}
    import org.bson.codecs.configuration.CodecRegistry
    import org.mongodb.scala.bson.{BsonDocument, BsonValue}
    import scalikejdbc._
    import sdp.jdbc.engine._
    import sdp.jdbc.config._
    /**
      * Demo: describes a find as an MGOProtoMsg, converts it to protobuf, converts it
      * back to an MGOContext with CtxFromProto and runs it against a local MongoDB.
      *
      * Statements run in order as part of `App` initialization.
      */
    object ProtoTests extends App {
      import MGOContext._
      import MGOEngine._
      import MGOCommands._
      import MongoActionStream._
      import MgoProtoConvertion._
      import org.mongodb.scala.model._
      import Projections._
      import Filters._

      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
      implicit val ec = system.dispatcher

      // Connect to a MongoDB server on localhost (driver default port).
      val clientSettings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings {b =>
          b.hosts(List(new ServerAddress("localhost")).asJava)
        }.build()

      implicit val client: MongoClient = MongoClient(clientSettings)


      // Query: state == "California", at most 3 documents, without rowid and _id.
      val eqState = equal("state","California")
      val proj = exclude("rowid","_id")
      val rtxfmr = Seq(
        ResultOptions(
        optType = FOD_LIMIT,
        valueParam = 3)
        ,ResultOptions(
        optType = FOD_PROJECTION,
        bsonParam = Some(proj))
      )

      // Round trip: JVM description -> protobuf -> MGOContext.
      val protoCtx = MGOProtoMsg(
        dbName = "testdb",
        collName = "aqmrpt",
        commandType = MGO_COMMAND_FIND,
        bsonParam = Seq(eqState),
        resultOptions = rtxfmr
      ).toProto

      val findCtx = CtxFromProto(protoCtx)
      val futFind = mgoQuery[Seq[Document]](findCtx)
      futFind.onComplete {
        case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
        case Failure(e) => println(e.getMessage)
      }

      // Keep the JVM alive until the user presses Enter, then release resources.
      scala.io.StdIn.readLine()
      client.close()   // previously the MongoDB client was never closed
      system.terminate()

    }

     

  • 相关阅读:
    POJ 1141 括号匹配 DP
    881. Boats to Save People
    870. Advantage Shuffle
    874. Walking Robot Simulation
    文件操作
    861. Score After Flipping Matrix
    860. Lemonade Change
    842. Split Array into Fibonacci Sequence
    765. Couples Holding Hands
    763. Partition Labels
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/9345558.html
Copyright © 2011-2022 走看看