zoukankan      html  css  js  c++  java
  • PICE(4):MongoDBStreaming

       前两篇我们介绍了JDBC和Cassandra的gRPC streaming实现。相对MongoDB来说,JDBC和Cassandra支持字符类型的query语句SQL,CQL,所以把query指令转换成protobuf structures是简单直接的。而MongoDB没有提供字符类的query,所以我们必须进行MongoDB query涉及的所有类型与protobuf类型的相互转换,实现gRPC功能会复杂的多。我们在这篇讨论里先介绍MongoDB query的protobuf转换。

    在前面的MongoDB-Engine讨论里我们设计了个MGOContext作为JVM内部传递MongoDB query的数据结构:

     case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_QUERY,
                             action: Option[MGOCommands] = None
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
      }

    下面是这个结构支持的action清单:

     object MGOCommands {
    
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
    
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
    
        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands
    
        case class DocumentStream(filter: Option[Bson] = None,
                                  andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                                 ) extends MGOCommands
    
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
    
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
    
      }
    
      object MGOAdmins {
    
        case class DropCollection(collName: String) extends MGOCommands
    
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    
        case class ListCollection(dbName: String) extends MGOCommands
    
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
    
      }

    可以看到,我们必须把Bson、Document、FindObservable这几个类型对应到protobuf格式。下面是.proto文件里的部分内容:

    message MGODocument {
      bytes document = 1;
    }
    
     message MGOBson {
      bytes bson = 1;
    }
    
    message ResultTransformer { //FindObservable
       int32 optType = 1;
       MGOBson bsonParam = 2;
       int32 valueParam = 3;
    }
    
    message MGOAdminOptons {
      string tarName = 1;
      repeated MGOBson bsonParam  = 2;
      OptionAny options = 3;
      string objName = 4;
    }
    
    message MGOOperations {  //MGOContext
      string dbName = 1;
      string collName = 2;
      int32 commandType = 3;
      repeated MGOBson bsonParam = 4;
      repeated ResultTransformer resultOptions = 5;
      OptionAny options = 6;
      repeated MGODocument documents = 7;
      google.protobuf.BoolValue only = 8;
      MGOAdminOptons adminOptions = 9;
    }

    首先,Document是个serializable类,可以直接进行序列/反序列化:

      val po = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "remarks" -> "urgent, rush order",
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
      println(po)
      val pobytes = marshal(po)
      println(s"po bytes: ${pobytes}")
      val po1 = unmarshal[Document](pobytes)
      println(s"back to po document: $po1")

    下一个是Bson,它是个java interface:

    /**
     * An interface for types that are able to render themselves into a {@code BsonDocument}.
     *
     * @since 3.0
     */
    public interface Bson {
        /**
         * Render the filter into a BsonDocument.
         *
         * @param documentClass the document class in scope for the collection.  This parameter may be ignored, but it may be used to alter
         *                      the structure of the returned {@code BsonDocument} based on some knowledge of the document class.
         * @param codecRegistry the codec registry.  This parameter may be ignored, but it may be used to look up {@code Codec} instances for
         *                      the document class or any other related class.
         * @param <TDocument> the type of the document class
         * @return the BsonDocument
         */
        <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
    }

    Bson只是一个interface,不是serilizable,不过BsonDocument可以:

    /**
     * A type-safe container for a BSON document.  This class should NOT be sub-classed by third parties.
     *
     * @since 3.0
     */
    public class BsonDocument extends BsonValue implements Map<String, BsonValue>, Cloneable, Bson, Serializable {...}

    所以我们可以用BsonDocument来进行序列/反序列后在再用它来构建一个新的Bson对象:

      def bsonToProto(bson: Bson) =
        MGOBson(marshal(bson.toBsonDocument(
          classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
    
      def protoToBson(proto: MGOBson): Bson = new Bson {
        val bsdoc = unmarshal[BsonDocument](proto.bson)
        override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
      }

    最后是这个FindObservable:这个类型的应用场景是这样的:

        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands

    FindObservable类型的效果可以是一连串施用的结果,因为是FindObservable[A] => FindObservable[A]这样的款式,所以我们可以用一串FindObservable[Document]来进行序列/反序列化处理,然后再重新串连施用来获得最终的FindObservable。FindObservable对应的protobuf结构如下:

    message ResultTransformer { //FindObservable
       int32 optType = 1;
       MGOBson bsonParam = 2;
       int32 valueParam = 3;
    }
    
      type FOD_TYPE       = Int
      val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
      val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
      val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
      val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
      val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
                               //Sets a document describing the fields to return for all matching documents
      val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
      val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
                               //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
      val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                               //Sets the cursor type
      val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
                               //Sets the hint for which index to use. A null value means no hint is set
      val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
                               //Sets the exclusive upper bound for a specific index. A null value means no max is set
      val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
                               //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
      val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
                               //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
      val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                               //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
    
      case class ResultOptions(
                              optType: FOD_TYPE,
                              bsonParam: Option[Bson] = None,
                              valueParam: Int = 0 ){
        def toProto = new sdp.grpc.services.ResultTransformer(
          optType = this.optType,
          bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
          valueParam = this.valueParam
        )
        def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bsonParam.get)
            case  FOD_LIMIT        => find.limit(valueParam)
            case  FOD_SKIP         => find.skip(valueParam)
            case  FOD_PROJECTION   => find.projection(bsonParam.get)
            case  FOD_SORT         => find.sort(bsonParam.get)
            case  FOD_PARTIAL      => find.partial(valueParam != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bsonParam.get)
            case  FOD_MAX          => find.max(bsonParam.get)
            case  FOD_MIN          => find.min(bsonParam.get)
            case  FOD_RETURNKEY    => find.returnKey(valueParam != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
    
          }
        }
      }
      object ResultOptions {
        def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
          optType = msg.optType,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          valueParam = msg.valueParam
        )
    
      }

    我们可以用这个ResultOptions类型的toProto,fromProto来进行protobuf的转换处理。然后用aggregation实现连串施用:

         def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
            rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))

    下面这个函数示范了Find Context的反序列:

      def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
        case MGO_COMMAND_FIND => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Find())
          )
          def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
            rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
    
          (proto.bsonParam, proto.resultOptions, proto.only) match {
            case (Nil, Nil, None) => ctx
            case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
            case (bp,Nil,None) => ctx.setCommand(
              Find(Some(protoToBson(bp.head)),None,false))
            case (bp,Nil,Some(b)) => ctx.setCommand(
              Find(Some(protoToBson(bp.head)),None,b))
            case (bp,fo,None) => {
                 ctx.setCommand(
                Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
            }
            case (bp,fo,Some(b)) => {
              ctx.setCommand(
                Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
            }
            case _ => ctx
          }
        }
      }

    具体的应用示范例如下:

     val eqState = equal("state","California")
      val proj = exclude("rowid","_id")
      val rtxfmr = Seq(
        ResultOptions(
        optType = FOD_LIMIT,
        valueParam = 3)
        ,ResultOptions(
        optType = FOD_PROJECTION,
        bsonParam = Some(proj))
      )
    
      val protoCtx = MGOProtoMsg(
        dbName = "testdb",
        collName = "aqmrpt",
        commandType = MGO_COMMAND_FIND,
        bsonParam = Seq(eqState),
        resultOptions = rtxfmr
      ).toProto
    
      val findCtx = CtxFromProto(protoCtx)
      val futFind = mgoQuery[Seq[Document]](findCtx)
      futFind.onComplete {
        case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
        case Failure(e) => println(e.getMessage)
      } 

    下面是本次讨论的部分源代码:

    MongoDBEngine.scala

    package sdp.mongo.engine
    import java.text.SimpleDateFormat
    
    import akka.NotUsed
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import org.mongodb.scala.MongoClient
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import org.mongodb.scala._
    import org.mongodb.scala.model._
    import java.util.Calendar
    
    import scala.collection.JavaConverters._
    import sdp.file.Streaming._
    import akka.stream.Materializer
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    
    import scala.concurrent._
    import scala.concurrent.duration._
    
    import sdp.logging.LogSupport
    
    object MGOContext {
      type MGO_ACTION_TYPE = Int
      val MGO_QUERY        = 0
      val MGO_UPDATE       = 1
      val MGO_ADMIN        = 2
    
    
      trait MGOCommands
    
      object MGOCommands {
    
        case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
    
        case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
    
        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                           firstOnly: Boolean = false) extends MGOCommands
    
        case class DocumentStream(filter: Option[Bson] = None,
                                  andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                                 ) extends MGOCommands
    
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
    
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
    
      }
    
      object MGOAdmins {
    
        case class DropCollection(collName: String) extends MGOCommands
    
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    
        case class ListCollection(dbName: String) extends MGOCommands
    
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
    
      }
    
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_QUERY,
                             action: Option[MGOCommands] = None
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
      }
    
      case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
        ctxs =>
        def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
        def appendContext(ctx: MGOContext): MGOBatContext =
          ctxs.copy(contexts = contexts :+ ctx)
      }
    
      object MGOBatContext {
        def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
      }
    
      type MGODate = java.util.Date
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }
    
    
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }
    
      type MGOBlob = BsonBinary
      type MGOArray = BsonArray
    
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
    
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)
    
      def mgoGetStringOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getString(fieldName))
        else None
      }
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getInteger(fieldName))
        else None
      }
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getLong(fieldName))
        else None
      }
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDouble(fieldName))
        else None
      }
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getBoolean(fieldName))
        else None
      }
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDate(fieldName))
        else None
      }
      def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
        else None
      }
      def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOArray]]
        else None
      }
    
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        (arr.getValues.asScala.toList)
          .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    object MGOEngine extends LogSupport {
    
      import MGOContext._
      import MGOCommands._
      import MGOAdmins._
    
    
      object TxUpdateMode {
        private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
                  implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
          log.info(s"mgoTxUpdate> calling ...")
          observable.map(clientSession => {
    
            val transactionOptions =
              TransactionOptions.builder()
                .readConcern(ReadConcern.SNAPSHOT)
                .writeConcern(WriteConcern.MAJORITY).build()
    
            clientSession.startTransaction(transactionOptions)
    
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
    
            Await.ready(fut, 3 seconds)
    
            clientSession
          })
        }
    
        private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"commitAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
              log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
              commitAndRetry(observable)
            }
            case e: Exception => {
              log.error(s"commitAndRetry> Exception during commit ...: $e")
              throw e
            }
          })
        }
    
        private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"runTransactionAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
              log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
              runTransactionAndRetry(observable)
            }
          })
        }
    
        def mgoTxBatch(ctxs: MGOBatContext)(
                implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
    
          log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")
    
          val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
          val commitTransactionObservable: SingleObservable[Completed] =
            updateObservable.flatMap(clientSession => clientSession.commitTransaction())
          val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
    
          runTransactionAndRetry(commitAndRetryObservable)
        }.toFuture()
      }
    
    
      def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
        log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
        if (ctxs.tx) {
            TxUpdateMode.mgoTxBatch(ctxs)
          } else {
    
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
            Await.ready(fut, 3 seconds)
            Future.successful(new Completed)
    
          }
    
      }
    
      // T => FindIterable  e.g List[Document]
      def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
        log.info(s"mgoQuery>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoQuery> uery action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        ctx.action.get match {
          /* count */
          case Count(Some(filter), Some(opt)) => //SingleObservable
            coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) => //SingleObservable
            coll.countDocuments(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, None) => //SingleObservable
            coll.countDocuments().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) => //DistinctObservable
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) => //DistinctObservable
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, false) => //FindObservable
            if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, true) => //FindObservable
            if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, false) => //FindObservable
            if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, true) => //FindObservable
            if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
          case Find(None, Some(next), _) => //FindObservable
            if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), _) => //FindObservable
            if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate AggregateObservable*/
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce MapReduceObservable*/
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
    
        }
      }
    
      //T => Completed, result.UpdateResult, result.DeleteResult
      def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] =
        mgoUpdateObservable[T](ctx).toFuture()
    
      def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
        log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoUpdateObservable> uery action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        ctx.action.get match {
          /* insert */
          case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]
            if (docs.size > 1)
              coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
          case Insert(docs, None) =>                       //SingleObservable
            if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
            else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>            //SingleObservable
            if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
            else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
          case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
          /* replace */
          case Replace(filter, replacement, None) =>        //SingleObservable
            coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
          case Replace(filter, replacement, Some(opt)) =>    //SingleObservable
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>      //SingleObservable
            if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
            else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
          case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>                  //SingleObservable
            coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
          case BulkWrite(commands, Some(opt)) =>             //SingleObservable
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
        }
      }
    
      def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {
        log.info(s"mgoAdmin>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoAdmin> uery action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        ctx.action.get match {
          /* drop collection */
          case DropCollection(collName) =>                   //SingleObservable
            val coll = db.getCollection(collName)
            coll.drop()
          /* create collection */
          case CreateCollection(collName, None) =>           //SingleObservable
            db.createCollection(collName)
          case CreateCollection(collName, Some(opt)) =>      //SingleObservable
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])
          /* list collection
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            */
          /* create view */
          case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable
            db.createView(viewName, viewOn, pline)
          case CreateView(viewName, viewOn, pline, Some(opt)) =>  //SingleObservable
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])
          /* create index */
          case CreateIndex(key, None) =>                     //SingleObservable
            coll.createIndex(key).asInstanceOf[SingleObservable[Completed]]
          case CreateIndex(key, Some(opt)) =>                //SingleObservable
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]
          /* drop index */
          case DropIndexByName(indexName, None) =>           //SingleObservable
            coll.dropIndex(indexName)
          case DropIndexByName(indexName, Some(opt)) =>      //SingleObservable
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])
          case DropIndexByKey(key, None) =>                  //SingleObservable
            coll.dropIndex(key)
          case DropIndexByKey(key, Some(opt)) =>             //SingleObservable
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])
          case DropAllIndexes(None) =>                       //SingleObservable
            coll.dropIndexes()
          case DropAllIndexes(Some(opt)) =>                  //SingleObservable
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])
        }
      }
    
    /*
        def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
          /* count */
          case Count(Some(filter), Some(opt)) =>   //SingleObservable
            coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) =>        //SingleObservable
            coll.countDocuments(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, None) =>                //SingleObservable
            coll.countDocuments().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) =>     //DistinctObservable
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) =>             //DistinctObservable
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, optConv, false) =>  //FindObservable
            if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, false) =>   //FindObservable
            if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(None, Some(next), optConv, _) =>   //FindObservable
            if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), optConv, _) =>  //FindObservable
            if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate AggregateObservable*/
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce MapReduceObservable*/
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* insert */
          case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]
            if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs, None) =>                       //SingleObservable
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>            //SingleObservable
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
          /* replace */
          case Replace(filter, replacement, None) =>        //SingleObservable
            coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter, replacement, Some(opt)) =>    //SingleObservable
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>      //SingleObservable
            if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
          case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>                  //SingleObservable
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands, Some(opt)) =>             //SingleObservable
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
    
          /* drop collection */
          case DropCollection(collName) =>                   //SingleObservable
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
          /* create collection */
          case CreateCollection(collName, None) =>           //SingleObservable
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName, Some(opt)) =>      //SingleObservable
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
          /* create view */
          case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable
            db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName, viewOn, pline, Some(opt)) =>  //SingleObservable
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
          /* create index */
          case CreateIndex(key, None) =>                     //SingleObservable
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key, Some(opt)) =>                //SingleObservable
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
          /* drop index */
          case DropIndexByName(indexName, None) =>           //SingleObservable
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>      //SingleObservable
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, None) =>                  //SingleObservable
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, Some(opt)) =>             //SingleObservable
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>                       //SingleObservable
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>                  //SingleObservable
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        }
      }
    */
    
    
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
        log.info(s"mongoStream>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mongoStream> uery action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        ctx.action.get match {
          case DocumentStream(None, None) =>
            MongoSource(coll.find())
          case DocumentStream(Some(filter), None) =>
            MongoSource(coll.find(filter))
          case DocumentStream(None, Some(next)) =>
            MongoSource(next(coll.find()))
          case DocumentStream(Some(filter), Some(next)) =>
            MongoSource(next(coll.find(filter)))
        }
      }
    
    }
    
    
    object MongoActionStream {
    
      import MGOContext._
    
      case class StreamingInsert[A](dbName: String,
                                    collName: String,
                                    converter: A => Document,
                                    parallelism: Int = 1
                                   ) extends MGOCommands
    
      case class StreamingDelete[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
      case class StreamingUpdate[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    toUpdate: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
    
      case class InsertAction[A](ctx: StreamingInsert[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          Flow[A].map(ctx.converter)
            .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
      }
    
      case class UpdateAction[A](ctx: StreamingUpdate[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      }
    
    
      case class DeleteAction[A](ctx: StreamingDelete[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
      }
    
    }
    
    object MGOHelpers {
    
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = (doc) => doc.toJson
      }
    
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = (doc) => doc.toString
      }
    
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String
    
        def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    
        def headResult() = Await.result(observable.head(), 10 seconds)
    
        def printResults(initial: String = ""): Unit = {
          if (initial.length > 0) print(initial)
          results().foreach(res => println(converter(res)))
        }
    
        def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
      }
    
      def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
        Await.result(fut, timeOut)
      }
    
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
        Await.result(fut, timeOut)
      }
    
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
    
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A](x)
      }
    
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runAsync
      }
    
    }

    MgoProtoConversion.scala

    package sdp.mongo.engine
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import sdp.grpc.services._
    import protobuf.bytes.Converter._
    import com.google.protobuf.ByteString
    import MGOContext._
    import MGOAdmins._
    import MGOCommands._
    import org.bson.BsonDocument
    import org.bson.codecs.configuration.CodecRegistry
    import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
    import org.mongodb.scala.FindObservable
    
    object MgoProtoConvertion {
      /*  org.mongodb.scala.FindObservable
      import com.mongodb.async.client.FindIterable
      val resultDocType = FindIterable[Document]
      val resultOption = FindObservable(resultDocType)
        .maxScan(...)
      .limit(...)
      .sort(...)
      .project(...) */
    
      type FOD_TYPE       = Int
      val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
      val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
      val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
      val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
      val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
                               //Sets a document describing the fields to return for all matching documents
      val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
      val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
                               //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
      val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                               //Sets the cursor type
      val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
                               //Sets the hint for which index to use. A null value means no hint is set
      val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
                               //Sets the exclusive upper bound for a specific index. A null value means no max is set
      val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
                               //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
      val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
                               //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
      val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                               //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
    
      case class ResultOptions(
                              optType: FOD_TYPE,
                              bsonParam: Option[Bson] = None,
                              valueParam: Int = 0 ){
        def toProto = new sdp.grpc.services.ResultTransformer(
          optType = this.optType,
          bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
          valueParam = this.valueParam
        )
        def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bsonParam.get)
            case  FOD_LIMIT        => find.limit(valueParam)
            case  FOD_SKIP         => find.skip(valueParam)
            case  FOD_PROJECTION   => find.projection(bsonParam.get)
            case  FOD_SORT         => find.sort(bsonParam.get)
            case  FOD_PARTIAL      => find.partial(valueParam != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bsonParam.get)
            case  FOD_MAX          => find.max(bsonParam.get)
            case  FOD_MIN          => find.min(bsonParam.get)
            case  FOD_RETURNKEY    => find.returnKey(valueParam != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
    
          }
        }
      }
      object ResultOptions {
        def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
          optType = msg.optType,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          valueParam = msg.valueParam
        )
    
      }
    
    
      type MGO_COMMAND_TYPE = Int
      val MGO_COMMAND_FIND            = 0
      val MGO_COMMAND_COUNT           = 20
      val MGO_COMMAND_DISTICT         = 21
      val MGO_COMMAND_DOCUMENTSTREAM  = 1
      val MGO_COMMAND_AGGREGATE       = 2
      val MGO_COMMAND_INSERT          = 3
      val MGO_COMMAND_DELETE          = 4
      val MGO_COMMAND_REPLACE         = 5
      val MGO_COMMAND_UPDATE          = 6
    
    
      val MGO_ADMIN_DROPCOLLECTION    = 8
      val MGO_ADMIN_CREATECOLLECTION  = 9
      val MGO_ADMIN_LISTCOLLECTION    = 10
      val MGO_ADMIN_CREATEVIEW        = 11
      val MGO_ADMIN_CREATEINDEX       = 12
      val MGO_ADMIN_DROPINDEXBYNAME   = 13
      val MGO_ADMIN_DROPINDEXBYKEY    = 14
      val MGO_ADMIN_DROPALLINDEXES    = 15
    
    
      case class MGOAdminCtx(
                               tarName: String = "",
                               bsonParam: Seq[Bson] = Nil,
                               options: Option[Any] = None,
                               objName: String = ""
         ){
        def toProto = sdp.grpc.services.MGOAdminOptons(
          tarName = this.tarName,
          bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
          objName = this.objName,
          options = this.options.map(b => OptionAny(marshal(b)))
    
        )
      }
    
      object MGOAdminCtx {
        def fromProto(msg: sdp.grpc.services.MGOAdminOptons) = new MGOAdminCtx(
          tarName = msg.tarName,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson))
        )
      }
    
      case class MGOProtoMsg(
              dbName: String = "",
              collName: String = "",
              commandType: MGO_COMMAND_TYPE,
              bsonParam: Seq[Bson] = Nil,
              resultOptions: Seq[ResultOptions] = Nil,
              options: ByteString = com.google.protobuf.ByteString.EMPTY,
              documents: Seq[Document] = Nil,
              only: Boolean = false,
              adminOptions: Option[MGOAdminCtx] = None
          ){
    
        def toProto = new sdp.grpc.services.MGOOperations(
          dbName = this.dbName,
          collName = this.collName,
          commandType = this.commandType,
          bsonParam = this.bsonParam.map(bsonToProto),
          resultOptions = this.resultOptions.map(_.toProto),
          documents = this.documents.map(d => sdp.grpc.services.MGODocument(marshal(d))),
          only = Some(this.only),
          adminOptions = this.adminOptions.map(_.toProto)
        )
    
      }
    
      object MGOProtoMsg {
        def fromProto(msg: sdp.grpc.services.MGOOperations) = new MGOProtoMsg(
          dbName = msg.dbName,
          collName = msg.collName,
          commandType = msg.commandType,
          bsonParam = msg.bsonParam.map(protoToBson),
          resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r))
        )
      }
    
      def bsonToProto(bson: Bson) =
        MGOBson(marshal(bson.toBsonDocument(
          classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
    
      def protoToBson(proto: MGOBson): Bson = new Bson {
        val bsdoc = unmarshal[BsonDocument](proto.bson)
        override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
      }
    
      def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
        case MGO_COMMAND_FIND => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Find())
          )
          def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
            rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
    
          (proto.bsonParam, proto.resultOptions, proto.only) match {
            case (Nil, Nil, None) => ctx
            case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
            case (bp,Nil,None) => ctx.setCommand(
              Find(Some(protoToBson(bp.head)),None,false))
            case (bp,Nil,Some(b)) => ctx.setCommand(
              Find(Some(protoToBson(bp.head)),None,b))
            case (bp,fo,None) => {
                 ctx.setCommand(
                Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
            }
            case (bp,fo,Some(b)) => {
              ctx.setCommand(
                Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
            }
            case _ => ctx
          }
        }
      }
    
    }

    BytesConverter.scala

    package protobuf.bytes
    import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
    import com.google.protobuf.ByteString
    object Converter {
    
      def marshal(value: Any): ByteString = {
        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(stream)
        oos.writeObject(value)
        oos.close()
        ByteString.copyFrom(stream.toByteArray())
      }
    
      def unmarshal[A](bytes: ByteString): A = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        val value = ois.readObject()
        ois.close()
        value.asInstanceOf[A]
      }
    
    }

    FindDemo.scala

    package demo.sdp.mgo.localapp
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import org.mongodb.scala._
    
    import scala.util._
    import scala.collection.JavaConverters._
    import sdp.mongo.engine._
    import org.mongodb.scala.model._
    import akka.stream.scaladsl.{Sink, Source}
    import org.bson.codecs.configuration.CodecRegistry
    import org.mongodb.scala.bson.{BsonDocument, BsonValue}
    import scalikejdbc._
    import sdp.jdbc.engine._
    import sdp.jdbc.config._
    object ProtoTests extends App {
      import MGOContext._
      import MGOEngine._
      import MGOCommands._
      import MongoActionStream._
      import MgoProtoConvertion._
      import org.mongodb.scala.model._
      import Projections._
      import Filters._
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
      implicit val ec = system.dispatcher
    
      val clientSettings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings {b =>
          b.hosts(List(new ServerAddress("localhost")).asJava)
        }.build()
    
      implicit val client: MongoClient = MongoClient(clientSettings)
    
    
      val eqState = equal("state","California")
      val proj = exclude("rowid","_id")
      val rtxfmr = Seq(
        ResultOptions(
        optType = FOD_LIMIT,
        valueParam = 3)
        ,ResultOptions(
        optType = FOD_PROJECTION,
        bsonParam = Some(proj))
      )
    
      val protoCtx = MGOProtoMsg(
        dbName = "testdb",
        collName = "aqmrpt",
        commandType = MGO_COMMAND_FIND,
        bsonParam = Seq(eqState),
        resultOptions = rtxfmr
      ).toProto
    
      val findCtx = CtxFromProto(protoCtx)
      val futFind = mgoQuery[Seq[Document]](findCtx)
      futFind.onComplete {
        case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
        case Failure(e) => println(e.getMessage)
      }
    
      scala.io.StdIn.readLine()
      system.terminate()
    
    }

     

  • 相关阅读:
    高频交易程序竟然是饿罗斯人开发的?
    系统功能在用户测试阶段被推翻
    去新华书店有感
    金桔
    结香
    金钟花
    金丝桃
    箬竹
    香茶菜
    水果兰
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/9345558.html
Copyright © 2011-2022 走看看