zoukankan      html  css  js  c++  java
  • Akka-Cluster(3)- ClusterClient, 集群客户端

      上篇我们介绍了distributed pub/sub消息传递机制。这是在同一个集群内的消息共享机制:发布者(publisher)和订阅者(subscriber)都在同一个集群的节点上,所有节点上的DistributedPubSubMediator通过集群内部的沟通机制在底层构建了消息流通渠道。在actor pub/sub层面可以实现对象位置透明化。在现实里很多前端都会作为某个集群的客户端但又与集群分离,又或者两个独立的集群之间可能会发生交互关系,这是也会出现客户端与服务端不在同一集群内的情况,ClusterClient就是为集群外部actor与集群内部actor进行沟通的解决方案。

    实际上ClusterClient模式就代表一种依赖于消息发布订阅机制的服务方式:客户端通过消息来请求服务,服务端接收请求服务消息并提供相应运算服务。

    我们可以把集群客户端模式分成集群客户端ClusterClient和集群服务端ClusterClientReceptionist,从字面理解这就是个接待员这么个角色,负责接待集群外客户端发起的服务请求。在集群所有节点上(或者选定角色role)都部署ClusterClientReceptionist,它们都与本节点的DistributedPubSubMediator对接组成更上一层的消息订阅方,ClusterClient与ClusterClientReceptionist的对接又组成了一种统一集群环境可以实现上集所讨论的distributed pub/sub机制。

    ClusterClient就是消息发布方,它是在目标集群之外机器上的某个actor。这个机器上的actor如果需要向集群内部actor发送消息可以通过这个机器上的ClusterClient actor与集群内的ClusterClientReceptionist搭建的通道向集群内某个ClusterClientReceptionist连接的DistributedPubSubMediator所登记的actor进行消息发送。所以使用集群客户端的机器必须在本机启动ClusterClient服务(运行这个actor),这是通讯桥梁的一端。

    ClusterClient在启动时用预先配置的地址(contact points)与ClusterClientReceptionist连接,然后通过ClusterClientReceptionist发布的联络点清单来维护内部的对接点清单,可以进行持久化,在发生系统重启时用这个名单来与集群连接。一旦连接,ClusterClient会监控对方运行情况,自动进行具体ClusterClientReceiptionist的替换。ClusterClient发布消息是包嵌在三种结构里的:

    1、ClusterClient.Send

    2、ClusterClient.SendAll

    3、ClusterClient.Publish

    这几种方法我们在上篇已经讨论过,这里就略去。

    ClusterClientReceiptionist是集群内的消息接收接口。集群内需要接收消息的actor必须在本地的DistributedPubSubMediator上注册自己的地址,ClusterClientReceptionist由此获得集群内所有服务项目actor的地址清单。通过ClusterClient发布的消息内指定接收方类型信息来确定最终接收消息并提供服务的actor。服务注册示范如下:

    //注册服务A
      val serviceA = system.actorOf(Props[Service], "serviceA")
      ClusterClientReceptionist(system).registerService(serviceA)
    //注册服务B
      val serviceB = system.actorOf(Props[Service], "serviceB")
      ClusterClientReceptionist(system).registerService(serviceB)

    ClusterClient调用服务示范:

      val client = system.actorOf(ClusterClient.props(
      ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
      client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
      client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

    注意:ClusterClientReceptionist需要接收DoThis,DoThat消息并实现相关的运算。

    在具体应用中要注意sender()的具体意义:从提供服务的actor方面看,sender()代表ClusterClientReceptionist。从发布消息的actor角度看,sender()代表的是DeadLetter。如果服务actor需要知道请求者具体地址,发布方可以把自己的地址嵌在发布的消息结构里。

    下面我们就通过一个简单的例子来进行示范。先设计两个服务actor:Cat,Dog 。假设它们会提供不同的叫声作为服务吧:

    class Cat extends Actor with ActorLogging {
      //使用pub/sub方式设置
      val mediator = DistributedPubSub(context.system).mediator
      override def preStart() = {
        mediator ! Subscribe("Shout", self)
        super.preStart()
      }
    
      override def receive: Receive = {
        case "Shout" =>
          log.info("*******I am a cat, MIAOM ...******")
      }
    }
    
    class Dog extends Actor with ActorLogging {
      //使用pub/sub方式设置
      val mediator = DistributedPubSub(context.system).mediator
      override def preStart() = {
        mediator ! Subscribe("Shout", self)
        super.preStart()
      }
      override def receive: Receive = {
        case "Shout" =>
          log.info("*****I am a dog, WANG WANG...*****")
      }
    }

    我们看到,这就是两个很普通的actor。但我们还是可以和上一篇分布式pub/sub结合起来验证cluster-client是基于distributed-pub/sub的。然后我们分别把这两个actor(服务)放到不同的集群节点上:

    object Cat {
      def props = Props[Cat]
      def create(port: Int): ActorSystem  = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem",config)
        val catSound = system.actorOf(props,"CatSound")
    
        ClusterClientReceptionist(system).registerService(catSound)
        system
      }
    }
    
    object Dog {
      def props = Props(new Dog)
      def create(port: Int): ActorSystem = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("ClusterSystem",config)
        val dogSound = system.actorOf(props,"DogSound")
        ClusterClientReceptionist(system).registerService(dogSound)
        system
      }
    }

    注意:集群名称是ClusterSystem。我们分别在actor所在节点用ClusterClientReceptionist.registerService登记了服务。这个集群所使用的conf如下:

    akka.actor.warn-about-java-serializer-usage = off
    akka.log-dead-letters-during-shutdown = off
    akka.log-dead-letters = off
    
    akka {
      loglevel = INFO
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
      actor {
        provider = "cluster"
        serializers {
          java = "akka.serialization.JavaSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
        serialization-bindings {
          "java.lang.String" = java
          "scalapb.GeneratedMessage" = proto
        }
      }
    
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551"]
        log-info = off
      }
    }

    这是一个比较完整的集群配置文档,只有port需要再配置。然后运行这两个节点:

    object PetHouse extends App {
    
      val sysCat = Cat.create(2551)
      val sysDog = Dog.create(2552)
    
      scala.io.StdIn.readLine()
    
      sysCat.terminate()
      sysDog.terminate()
    
    }

    完成了在2551,2552节点上的Cat,Dog actor构建及ClusterClientReceptionist.registerService服务登记。现在看看客户端:

    object PetClient extends App {
    
      val conf = ConfigFactory.load("client")
      val clientSystem = ActorSystem("ClientSystem",conf)
    /* 从 conf 文件里读取 contact-points 地址
      val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
        case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
      }.toSet
    */
    
      //先放一个contact-point, 系统会自动增加其它的点
      val initialContacts = Set(
        ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
      )
    
      val clusterClient = clientSystem.actorOf(
        ClusterClient.props(
          ClusterClientSettings(clientSystem)
            .withInitialContacts(initialContacts)),
        "petClient")
    
      clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
      clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)
    
      println(s"sent shout messages ...")
      scala.io.StdIn.readLine()
    
      clusterClient ! Publish("Shout","Shout")
      println(s"publish shout messages ...")
    
      scala.io.StdIn.readLine()
      clientSystem.terminate();
    }

    客户端的ActorSystem名称为ClientSystem,是在ClusterSystem集群之外的。conf文件如下:

    akka {
    
      actor.provider = remote
    
      remote.netty.tcp.port= 2553
      remote.netty.tcp.hostname=127.0.0.1
    
    }
    
    contact-points = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    把它设成actor.provider=remote可以免去提供seednodes。运算结果:

     [12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
    [INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
    [INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
    [INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****

    无论ClusterClient或Receptionist都会针对自己的状态发送消息。我们可以截取这些消息来做些相应的工作。参考下面的截听器示范代码: 

    package petsound
    import akka.actor._
    import akka.cluster.client._
    class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
      override def preStart(): Unit = {
        clusterClient ! SubscribeContactPoints
        super.preStart()
      }
    
      override def receive: Receive = {
        case ContactPoints(cps) =>
          cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
        case ContactPointAdded(cp) =>
          log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
        case ContactPointRemoved(cp) =>
          log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
    
      }
    }
    
    class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
      override def preStart(): Unit = {
        receptionist ! SubscribeClusterClients
        super.preStart()
      }
    
      override def receive: Receive = {
        case ClusterClients(cs) =>
          cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
        case ClusterClientUp(cc) =>
          log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
        case ClusterClientUnreachable(cc) =>
          log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
    
      }
    }

    这两个event-listener的安装方法如下:

        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")
    
        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")
    
      val clusterClient = clientSystem.actorOf(
        ClusterClient.props(
          ClusterClientSettings(clientSystem)
            .withInitialContacts(initialContacts)),
        "petClient")
    
      clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")

    看看运算结果:

    [INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******
    [INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******
    [INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist]
    
    
    [INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******
    
    [INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******

    下面我们再做个示范,还是与上篇讨论一样:由集群客户端发送MongoDB指令至某个在集群里用ClusterClientReceptionist注册的MongoDB操作服务actor。服务方接收指令后在MongoDB上进行运算。下面是MongoDB的服务actor: 

    package petsound
    import akka.actor._
    import com.typesafe.config._
    import akka.actor.ActorSystem
    import org.mongodb.scala._
    import sdp.grpc.services.ProtoMGOContext
    import sdp.mongo.engine.MGOClasses._
    import sdp.mongo.engine.MGOEngine._
    import sdp.result.DBOResult._
    import akka.cluster.client._
    
    import scala.collection.JavaConverters._
    import scala.util._
    
    class MongoAdder extends Actor with ActorLogging {
      import monix.execution.Scheduler.Implicits.global
      implicit val mgosys = context.system
      implicit val ec = mgosys.dispatcher
    
      val clientSettings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings {b =>
          b.hosts(List(new ServerAddress("localhost:27017")).asJava)
        }.build()
    
      implicit val client: MongoClient = MongoClient(clientSettings)
    
      val ctx = MGOContext("testdb","friends")
    
      override def receive: Receive = {
    
        case someProto @ Some(proto:ProtoMGOContext) =>
          val ctx = MGOContext.fromProto(proto)
          log.info(s"****** received MGOContext: $someProto *********")
    
          val task = mgoUpdate[Completed](ctx).toTask
          task.runOnComplete {
            case Success(s) => println("operations completed successfully.")
            case Failure(exception) => println(s"error: ${exception.getMessage}")
          }
    
      }
    }
    
    object MongoAdder {
    
      def create(port: Int): ActorSystem = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("ClusterSystem", config)
    
        val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
        ClusterClientReceptionist(system).registerService(mongoAdder)
    
        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")
    
        system
    
      }
    
    }

    MongoAdder处于同一个集群ClusterSystem中。代码里已经包括了服务注册部分。客户端发送MongoDB指令的示范如下:

     //MongoDB 操作示范
      import org.mongodb.scala._
      import sdp.mongo.engine.MGOClasses._
    
      val ctx = MGOContext("testdb","friends")
    
      val chen = Document("" -> "", "" -> "大文","age" -> 28)
      val zhang = Document("" -> "", "" -> "小海","age" -> 7)
      val lee = Document("" -> "", "" -> "","age" -> 45)
      val ouyang = Document("" -> "欧阳", "" -> "","age" -> 120)
    
      val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
      clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)

    由于MongoDB指令是通过protobuffer方式进行序列化的,所以需要修改client.conf通知akka使用protobuf格式的消息:

    akka {
    
      actor {
        provider = remote
        serializers {
          java = "akka.serialization.JavaSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
        serialization-bindings {
          "java.lang.String" = java
          "scalapb.GeneratedMessage" = proto
        }
      }
      remote.netty.tcp.port= 2553
      remote.netty.tcp.hostname=127.0.0.1
    
    }
    
    contact-points = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    下面是本次讨论完整示范源代码:

    build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion
    
    name := "akka-cluster-client"
    
    version := "0.1"
    
    scalaVersion := "2.12.7"
    
    scalacOptions += "-Ypartial-unification"
    
    libraryDependencies := Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.5.17",
      "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      // "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
      "io.monix" %% "monix" % "2.3.0",
      //for mongodb 4.0
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
      //other dependencies
      "co.fs2" %% "fs2-core" % "0.9.7",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
      "org.typelevel" %% "cats-core" % "0.9.0",
      "io.monix" %% "monix-execution" % "3.0.0-RC1",
      "io.monix" %% "monix-eval" % "3.0.0-RC1"
    )
    
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

    project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
    )

    resouces/application.conf

    akka.actor.warn-about-java-serializer-usage = off
    akka.log-dead-letters-during-shutdown = off
    akka.log-dead-letters = off
    
    akka {
      loglevel = INFO
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
      actor {
        provider = "cluster"
        serializers {
          java = "akka.serialization.JavaSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
        serialization-bindings {
          "java.lang.String" = java
          "scalapb.GeneratedMessage" = proto
        }
      }
    
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551"]
        log-info = off
      }
    }

    resources/client.conf

    akka {
    
      actor {
        provider = remote
        serializers {
          java = "akka.serialization.JavaSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
        serialization-bindings {
          "java.lang.String" = java
          "scalapb.GeneratedMessage" = proto
        }
      }
      remote.netty.tcp.port= 2553
      remote.netty.tcp.hostname=127.0.0.1
    
    }
    
    contact-points = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    protobuf/spd.proto

    syntax = "proto3";
    
    import "google/protobuf/wrappers.proto";
    import "google/protobuf/any.proto";
    import "scalapb/scalapb.proto";
    
    option (scalapb.options) = {
      // use a custom Scala package name
      // package_name: "io.ontherocks.introgrpc.demo"
    
      // don't append file name to package
      flat_package: true
    
      // generate one Scala file for all messages (services still get their own file)
      single_file: true
    
      // add imports to generated file
      // useful when extending traits or using custom types
      // import: "io.ontherocks.hellogrpc.RockingMessage"
    
      // code to put at the top of generated file
      // works only with `single_file: true`
      //preamble: "sealed trait SomeSealedTrait"
    };
    
    package sdp.grpc.services;
    
    
    message ProtoDate {
      int32 yyyy = 1;
      int32 mm   = 2;
      int32 dd   = 3;
    }
    
    message ProtoTime {
      int32 hh   = 1;
      int32 mm   = 2;
      int32 ss   = 3;
      int32 nnn  = 4;
    }
    
    message ProtoDateTime {
       ProtoDate date = 1;
       ProtoTime time = 2;
    }
    
    message ProtoAny {
      bytes value = 1;
    }

    protobuf/mgo.proto

    syntax = "proto3";
    
    import "google/protobuf/wrappers.proto";
    import "google/protobuf/any.proto";
    import "scalapb/scalapb.proto";
    
    
    option (scalapb.options) = {
      // use a custom Scala package name
      // package_name: "io.ontherocks.introgrpc.demo"
    
      // don't append file name to package
      flat_package: true
    
      // generate one Scala file for all messages (services still get their own file)
      single_file: true
    
      // add imports to generated file
      // useful when extending traits or using custom types
      // import: "io.ontherocks.hellogrpc.RockingMessage"
    
      // code to put at the top of generated file
      // works only with `single_file: true`
      //preamble: "sealed trait SomeSealedTrait"
    };
    
    /*
     * Demoes various customization options provided by ScalaPBs.
     */
    
    package sdp.grpc.services;
    
    import "sdp.proto";
    
    message ProtoMGOBson {
      bytes bson = 1;
    }
    
    message ProtoMGODocument {
      bytes document = 1;
    }
    
    message ProtoMGOResultOption { //FindObservable
       int32 optType = 1;
       ProtoMGOBson bsonParam = 2;
       int32 valueParam = 3;
    }
    
    message ProtoMGOAdmin{
      string tarName = 1;
      repeated ProtoMGOBson bsonParam  = 2;
      ProtoAny options = 3;
      string objName = 4;
    }
    
    message ProtoMGOContext {  //MGOContext
      string dbName = 1;
      string collName = 2;
      int32 commandType = 3;
      repeated ProtoMGOBson bsonParam = 4;
      repeated ProtoMGOResultOption resultOptions = 5;
      repeated string targets = 6;
      ProtoAny options = 7;
      repeated ProtoMGODocument documents = 8;
      google.protobuf.BoolValue only = 9;
      ProtoMGOAdmin adminOptions = 10;
    }

    converters/ByteConverter.scala

    package protobuf.bytes
    import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
    import com.google.protobuf.ByteString
    object Converter {
    
      def marshal(value: Any): ByteString = {
        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(stream)
        oos.writeObject(value)
        oos.close()
        ByteString.copyFrom(stream.toByteArray())
      }
    
      def unmarshal[A](bytes: ByteString): A = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        val value = ois.readObject()
        ois.close()
        value.asInstanceOf[A]
      }
    
    
    }

    converters/DBOResultType.scala

    package sdp.result
    
    import cats._
    import cats.data.EitherT
    import cats.data.OptionT
    import monix.eval.Task
    import cats.implicits._
    
    import scala.concurrent._
    
    import scala.collection.TraversableOnce
    
    object DBOResult {
    
    
      type DBOError[A] = EitherT[Task,Throwable,A]
      type DBOResult[A] = OptionT[DBOError,A]
    
      implicit def valueToDBOResult[A](a: A): DBOResult[A] =
             Applicative[DBOResult].pure(a)
      implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
             OptionT((o: Option[A]).pure[DBOError])
      implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
     //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
             OptionT.liftF(EitherT.fromEither[Task](e))
      }
      implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
           val task = Task.fromFuture[A](fut)
           val et = EitherT.liftF[Task,Throwable,A](task)
           OptionT.liftF(et)
      }
    
      implicit class DBOResultToTask[A](r: DBOResult[A]) {
        def toTask = r.value.value
      }
    
      implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
        def someValue: Option[A] = r match {
          case Left(err) => (None: Option[A])
          case Right(oa) => oa
        }
      }
    
      def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
        if (coll.isEmpty)
          optionToDBOResult(None: Option[C[A]])
        else
          optionToDBOResult(Some(coll): Option[C[A]])
    }

    filestream/FileStreaming.scala

    package sdp.file
    
    import java.io.{ByteArrayInputStream, InputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.Materializer
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    import akka.util._
    
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    object Streaming {
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer):ByteBuffer = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toByteBuffer
      }
    
    
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): Array[Byte] = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toArray
      }
    
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer): InputStream = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        val buf = (Await.result(fut, timeOut)).toArray
        new ByteArrayInputStream(buf)
      }
    
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        val baInput = new ByteArrayInputStream(ba)
        val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val bb = ByteBuffer.wrap(bytes)
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
    }

    logging/Log.scala

    package sdp.logging
    
    import org.slf4j.Logger
    
    /**
      * Logger which just wraps org.slf4j.Logger internally.
      *
      * @param logger logger
      */
    class Log(logger: Logger) {
    
      // use var consciously to enable squeezing later
      var isDebugEnabled: Boolean = logger.isDebugEnabled
      var isInfoEnabled: Boolean = logger.isInfoEnabled
      var isWarnEnabled: Boolean = logger.isWarnEnabled
      var isErrorEnabled: Boolean = logger.isErrorEnabled
    
      def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
        level match {
          case 'debug | 'DEBUG => debug(msg)
          case 'info | 'INFO => info(msg)
          case 'warn | 'WARN => warn(msg)
          case 'error | 'ERROR => error(msg)
          case _ => // nothing to do
        }
      }
    
      def debug(msg: => String): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg)
        }
      }
    
      def debug(msg: => String, e: Throwable): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg, e)
        }
      }
    
      def info(msg: => String): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg)
        }
      }
    
      def info(msg: => String, e: Throwable): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg, e)
        }
      }
    
      def warn(msg: => String): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg)
        }
      }
    
      def warn(msg: => String, e: Throwable): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg, e)
        }
      }
    
      def error(msg: => String): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg)
        }
      }
    
      def error(msg: => String, e: Throwable): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg, e)
        }
      }
    
    }

    logging/LogSupport.scala

    package sdp.logging
    
    import org.slf4j.LoggerFactory
    
    trait LogSupport {
    
      /**
        * Logger
        */
      protected val log = new Log(LoggerFactory.getLogger(this.getClass))
    
    }

    mgo/engine/MGOProtoConversion.scala

    package sdp.mongo.engine
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.bson.conversions.Bson
    import sdp.grpc.services._
    import protobuf.bytes.Converter._
    import MGOClasses._
    import MGOAdmins._
    import MGOCommands._
    import org.bson.BsonDocument
    import org.bson.codecs.configuration.CodecRegistry
    import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
    import org.mongodb.scala.FindObservable
    
    object MGOProtoConversion {
    
      type MGO_COMMAND_TYPE = Int
      val MGO_COMMAND_FIND            = 0
      val MGO_COMMAND_COUNT           = 20
      val MGO_COMMAND_DISTICT         = 21
      val MGO_COMMAND_DOCUMENTSTREAM  = 1
      val MGO_COMMAND_AGGREGATE       = 2
      val MGO_COMMAND_INSERT          = 3
      val MGO_COMMAND_DELETE          = 4
      val MGO_COMMAND_REPLACE         = 5
      val MGO_COMMAND_UPDATE          = 6
    
    
      val MGO_ADMIN_DROPCOLLECTION    = 8
      val MGO_ADMIN_CREATECOLLECTION  = 9
      val MGO_ADMIN_LISTCOLLECTION    = 10
      val MGO_ADMIN_CREATEVIEW        = 11
      val MGO_ADMIN_CREATEINDEX       = 12
      val MGO_ADMIN_DROPINDEXBYNAME   = 13
      val MGO_ADMIN_DROPINDEXBYKEY    = 14
      val MGO_ADMIN_DROPALLINDEXES    = 15
    
    
      case class AdminContext(
                               tarName: String = "",
                               bsonParam: Seq[Bson] = Nil,
                               options: Option[Any] = None,
                               objName: String = ""
                             ){
        def toProto = sdp.grpc.services.ProtoMGOAdmin(
          tarName = this.tarName,
          bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
          objName = this.objName,
          options = this.options.map(b => ProtoAny(marshal(b)))
    
        )
      }
    
      object AdminContext {
        def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
          tarName = msg.tarName,
          bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          objName = msg.objName,
          options = msg.options.map(b => unmarshal[Any](b.value))
        )
      }
    
      case class Context(
                          dbName: String = "",
                          collName: String = "",
                          commandType: MGO_COMMAND_TYPE,
                          bsonParam: Seq[Bson] = Nil,
                          resultOptions: Seq[ResultOptions] = Nil,
                          options: Option[Any] = None,
                          documents: Seq[Document] = Nil,
                          targets: Seq[String] = Nil,
                          only: Boolean = false,
                          adminOptions: Option[AdminContext] = None
                        ){
    
        def toProto = new sdp.grpc.services.ProtoMGOContext(
          dbName = this.dbName,
          collName = this.collName,
          commandType = this.commandType,
          bsonParam = this.bsonParam.map(bsonToProto),
          resultOptions = this.resultOptions.map(_.toProto),
          options = { if(this.options == None)
            None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else
            Some(ProtoAny(marshal(this.options.get))) },
          documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
          targets = this.targets,
          only = Some(this.only),
          adminOptions = this.adminOptions.map(_.toProto)
        )
    
      }
    
      object MGODocument {
        def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
          unmarshal[Document](msg.document)
        def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
          new ProtoMGODocument(marshal(doc))
      }
    
      object MGOProtoMsg {
        def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
          dbName = msg.dbName,
          collName = msg.collName,
          commandType = msg.commandType,
          bsonParam = msg.bsonParam.map(protoToBson),
          resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
          options = msg.options.map(a => unmarshal[Any](a.value)),
          documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
          targets = msg.targets,
          adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
        )
      }
    
      def bsonToProto(bson: Bson) =
        ProtoMGOBson(marshal(bson.toBsonDocument(
          classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
    
      def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
        val bsdoc = unmarshal[BsonDocument](proto.bson)
        override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
      }
    
      def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
        case MGO_COMMAND_FIND => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Find())
          )
          def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
            rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
    
          (proto.bsonParam, proto.resultOptions, proto.only) match {
            case (Nil, Nil, None) => ctx
            case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
            case (bp,Nil,None) => ctx.setCommand(
              Find(filter = Some(protoToBson(bp.head))))
            case (bp,Nil,Some(b)) => ctx.setCommand(
              Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
            case (bp,fo,None) => {
              ctx.setCommand(
                Find(filter = Some(protoToBson(bp.head)),
                  andThen = fo.map(ResultOptions.fromProto)
                ))
            }
            case (bp,fo,Some(b)) => {
              ctx.setCommand(
                Find(filter = Some(protoToBson(bp.head)),
                  andThen = fo.map(ResultOptions.fromProto),
                  firstOnly = b))
            }
            case _ => ctx
          }
        }
        case MGO_COMMAND_COUNT => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Count())
          )
          (proto.bsonParam, proto.options) match {
            case (Nil, None) => ctx
            case (bp, None) => ctx.setCommand(
              Count(filter = Some(protoToBson(bp.head)))
            )
            case (Nil,Some(o)) => ctx.setCommand(
              Count(options = Some(unmarshal[Any](o.value)))
            )
            case _ => ctx
          }
        }
        case MGO_COMMAND_DISTICT => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Distict(fieldName = proto.targets.head))
          )
          (proto.bsonParam) match {
            case Nil => ctx
            case bp: Seq[ProtoMGOBson] => ctx.setCommand(
              Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
            )
            case _ => ctx
          }
        }
        case MGO_COMMAND_AGGREGATE => {
          new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
          )
        }
        case MGO_ADMIN_LISTCOLLECTION => {
          new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_QUERY,
            action = Some(ListCollection(proto.dbName)))
        }
        case MGO_COMMAND_INSERT => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_UPDATE,
            action = Some(Insert(
              newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(Insert(
              newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_COMMAND_DELETE => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_UPDATE,
            action = Some(Delete(
              filter = protoToBson(proto.bsonParam.head)))
          )
          (proto.options, proto.only) match {
            case (None,None) => ctx
            case (None,Some(b)) => ctx.setCommand(Delete(
              filter = protoToBson(proto.bsonParam.head),
              onlyOne = b))
            case (Some(o),None) => ctx.setCommand(Delete(
              filter = protoToBson(proto.bsonParam.head),
              options = Some(unmarshal[Any](o.value)))
            )
            case (Some(o),Some(b)) => ctx.setCommand(Delete(
              filter = protoToBson(proto.bsonParam.head),
              options = Some(unmarshal[Any](o.value)),
              onlyOne = b)
            )
          }
        }
        case MGO_COMMAND_REPLACE => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_UPDATE,
            action = Some(Replace(
              filter = protoToBson(proto.bsonParam.head),
              replacement = unmarshal[Document](proto.documents.head.document)))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(Replace(
              filter = protoToBson(proto.bsonParam.head),
              replacement = unmarshal[Document](proto.documents.head.document),
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_COMMAND_UPDATE => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_UPDATE,
            action = Some(Update(
              filter = protoToBson(proto.bsonParam.head),
              update = protoToBson(proto.bsonParam.tail.head)))
          )
          (proto.options, proto.only) match {
            case (None,None) => ctx
            case (None,Some(b)) => ctx.setCommand(Update(
              filter = protoToBson(proto.bsonParam.head),
              update = protoToBson(proto.bsonParam.tail.head),
              onlyOne = b))
            case (Some(o),None) => ctx.setCommand(Update(
              filter = protoToBson(proto.bsonParam.head),
              update = protoToBson(proto.bsonParam.tail.head),
              options = Some(unmarshal[Any](o.value)))
            )
            case (Some(o),Some(b)) => ctx.setCommand(Update(
              filter = protoToBson(proto.bsonParam.head),
              update = protoToBson(proto.bsonParam.tail.head),
              options = Some(unmarshal[Any](o.value)),
              onlyOne = b)
            )
          }
        }
        case MGO_ADMIN_DROPCOLLECTION =>
          new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(DropCollection(proto.collName))
          )
        case MGO_ADMIN_CREATECOLLECTION => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(CreateCollection(proto.collName))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_ADMIN_CREATEVIEW => {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(CreateView(viewName = proto.targets.head,
              viewOn = proto.targets.tail.head,
              pipeline = proto.bsonParam.map(p => protoToBson(p))))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
              viewOn = proto.targets.tail.head,
              pipeline = proto.bsonParam.map(p => protoToBson(p)),
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_ADMIN_CREATEINDEX=> {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_ADMIN_DROPINDEXBYNAME=> {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(DropIndexByName(indexName = proto.targets.head))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_ADMIN_DROPINDEXBYKEY=> {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
        case MGO_ADMIN_DROPALLINDEXES=> {
          var ctx = new MGOContext(
            dbName = proto.dbName,
            collName = proto.collName,
            actionType = MGO_ADMIN,
            action = Some(DropAllIndexes())
          )
          proto.options match {
            case None => ctx
            case Some(o) => ctx.setCommand(DropAllIndexes(
              options = Some(unmarshal[Any](o.value)))
            )
          }
        }
    
      }
    
      def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
        case None => None
        case Some(act) => act match {
          case Count(filter, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_COUNT,
              bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                            else Seq(bsonToProto(filter.get))},
              options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
                          else Some(ProtoAny(marshal(options.get))) }
          ))
          case Distict(fieldName, filter) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_DISTICT,
              bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                            else Seq(bsonToProto(filter.get))},
              targets = Seq(fieldName)
    
            ))
    
          case Find(filter, andThen, firstOnly) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_FIND,
              bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
              else Seq(bsonToProto(filter.get))},
              resultOptions = andThen.map(_.toProto)
            ))
    
          case Aggregate(pipeLine) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_AGGREGATE,
              bsonParam = pipeLine.map(bsonToProto)
            ))
    
          case Insert(newdocs, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_INSERT,
              documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
              options = { if(options == None) None      //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
          case Delete(filter, options, onlyOne) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_DELETE,
              bsonParam = Seq(bsonToProto(filter)),
              options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) },
              only = Some(onlyOne)
            ))
    
          case Replace(filter, replacement, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_REPLACE,
              bsonParam = Seq(bsonToProto(filter)),
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) },
              documents = Seq(ProtoMGODocument(marshal(replacement)))
            ))
    
          case Update(filter, update, options, onlyOne) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_COMMAND_UPDATE,
              bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) },
              only = Some(onlyOne)
            ))
    
    
          case DropCollection(coll) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = coll,
              commandType = MGO_ADMIN_DROPCOLLECTION
            ))
    
          case CreateCollection(coll, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = coll,
              commandType = MGO_ADMIN_CREATECOLLECTION,
              options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
          case ListCollection(dbName) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              commandType = MGO_ADMIN_LISTCOLLECTION
            ))
    
          case CreateView(viewName, viewOn, pipeline, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_ADMIN_CREATEVIEW,
              bsonParam = pipeline.map(bsonToProto),
              options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) },
              targets = Seq(viewName,viewOn)
            ))
    
          case CreateIndex(key, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_ADMIN_CREATEINDEX,
              bsonParam = Seq(bsonToProto(key)),
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
    
          case DropIndexByName(indexName, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_ADMIN_DROPINDEXBYNAME,
              targets = Seq(indexName),
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
          case DropIndexByKey(key, options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_ADMIN_DROPINDEXBYKEY,
              bsonParam = Seq(bsonToProto(key)),
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
    
          case DropAllIndexes(options) =>
            Some(new sdp.grpc.services.ProtoMGOContext(
              dbName = ctx.dbName,
              collName = ctx.collName,
              commandType = MGO_ADMIN_DROPALLINDEXES,
              options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
              else Some(ProtoAny(marshal(options.get))) }
            ))
    
        }
      }
    
    }

    mgo/engine/MongoDBEngine.scala

    package sdp.mongo.engine
    
    import java.text.SimpleDateFormat
    import java.util.Calendar
    
    import akka.NotUsed
    import akka.stream.Materializer
    import akka.stream.alpakka.mongodb.scaladsl._
    import akka.stream.scaladsl.{Flow, Source}
    import org.bson.conversions.Bson
    import org.mongodb.scala.bson.collection.immutable.Document
    import org.mongodb.scala.bson.{BsonArray, BsonBinary}
    import org.mongodb.scala.model._
    import org.mongodb.scala.{MongoClient, _}
    import protobuf.bytes.Converter._
    import sdp.file.Streaming._
    import sdp.logging.LogSupport
    
    import scala.collection.JavaConverters._
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object MGOClasses {
      type MGO_ACTION_TYPE = Int
      val MGO_QUERY        = 0
      val MGO_UPDATE       = 1
      val MGO_ADMIN        = 2
    
      /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
    
      type FOD_TYPE       = Int
      val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item
      val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]
      val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]
      val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]
      val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]
      //Sets a document describing the fields to return for all matching documents
      val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]
      val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]
      //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
      val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
      //Sets the cursor type
      val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]
      //Sets the hint for which index to use. A null value means no hint is set
      val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]
      //Sets the exclusive upper bound for a specific index. A null value means no max is set
      val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]
      //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
      val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
      //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
      val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
      //Sets the showRecordId. Set to true to add a field `$recordId` to the returned documents
    
      case class ResultOptions(
                                optType: FOD_TYPE,
                                bson: Option[Bson] = None,
                                value: Int = 0 ){
        def toProto = new sdp.grpc.services.ProtoMGOResultOption(
          optType = this.optType,
          bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
          valueParam = this.value
        )
        def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
          optType match {
            case  FOD_FIRST        => find
            case  FOD_FILTER       => find.filter(bson.get)
            case  FOD_LIMIT        => find.limit(value)
            case  FOD_SKIP         => find.skip(value)
            case  FOD_PROJECTION   => find.projection(bson.get)
            case  FOD_SORT         => find.sort(bson.get)
            case  FOD_PARTIAL      => find.partial(value != 0)
            case  FOD_CURSORTYPE   => find
            case  FOD_HINT         => find.hint(bson.get)
            case  FOD_MAX          => find.max(bson.get)
            case  FOD_MIN          => find.min(bson.get)
            case  FOD_RETURNKEY    => find.returnKey(value != 0)
            case  FOD_SHOWRECORDID => find.showRecordId(value != 0)
    
          }
        }
      }
      object ResultOptions {
        def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
          optType = msg.optType,
          bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
          value = msg.valueParam
        )
    
      }
    
      trait MGOCommands
    
      object MGOCommands {
    
        case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
    
        case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
    
        /*  org.mongodb.scala.FindObservable
        import com.mongodb.async.client.FindIterable
        val resultDocType = FindIterable[Document]
        val resultOption = FindObservable(resultDocType)
          .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
        case class Find(filter: Option[Bson] = None,
                           andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                           firstOnly: Boolean = false) extends MGOCommands
    
        case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    
        case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    
        case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    
        case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
        case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    
        case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    
    
        case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
    
      }
    
      object MGOAdmins {
    
        case class DropCollection(collName: String) extends MGOCommands
    
        case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    
        case class ListCollection(dbName: String) extends MGOCommands
    
        case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    
        case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    
        case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    
        case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
    
      }
    
      case class MGOContext(
                             dbName: String,
                             collName: String,
                             actionType: MGO_ACTION_TYPE = MGO_QUERY,
                             action: Option[MGOCommands] = None,
                             actionOptions: Option[Any] = None,
                             actionTargets: Seq[String] = Nil
                           ) {
        ctx =>
        def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    
        def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    
        def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    
        def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))
    
        def toSomeProto = MGOProtoConversion.ctxToProto(this)
    
      }
    
      object MGOContext {
        def apply(db: String, coll: String) = new MGOContext(db, coll)
        def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
          MGOProtoConversion.ctxFromProto(proto)
      }
    
      case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
        ctxs =>
        def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
        def appendContext(ctx: MGOContext): MGOBatContext =
          ctxs.copy(contexts = contexts :+ ctx)
      }
    
      object MGOBatContext {
        def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
      }
    
      type MGODate = java.util.Date
      def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd)
        ca.getTime()
      }
      def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
        val ca = Calendar.getInstance()
        ca.set(yyyy,mm,dd,hr,min,sec)
        ca.getTime()
      }
      def mgoDateTimeNow: MGODate = {
        val ca = Calendar.getInstance()
        ca.getTime
      }
    
    
      def mgoDateToString(dt: MGODate, formatString: String): String = {
        val fmt= new SimpleDateFormat(formatString)
        fmt.format(dt)
      }
    
      type MGOBlob = BsonBinary
      type MGOArray = BsonArray
    
      def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
        implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
    
      def mgoBlobToFile(blob: MGOBlob, fileName: String)(
        implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)
    
      def mgoGetStringOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getString(fieldName))
        else None
      }
      def mgoGetIntOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getInteger(fieldName))
        else None
      }
      def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getLong(fieldName))
        else None
      }
      def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDouble(fieldName))
        else None
      }
      def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getBoolean(fieldName))
        else None
      }
      def mgoGetDateOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          Some(doc.getDate(fieldName))
        else None
      }
      def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
        else None
      }
      def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
        if (doc.keySet.contains(fieldName))
          doc.get(fieldName).asInstanceOf[Option[MGOArray]]
        else None
      }
    
      def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
        (arr.getValues.asScala.toList)
          .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
      }
    
      type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
    }
    
    object MGOEngine extends LogSupport {
    
      import MGOClasses._
      import MGOAdmins._
      import MGOCommands._
      import sdp.result.DBOResult._
    
    
      object TxUpdateMode {
        private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
                  implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
          log.info(s"mgoTxUpdate> calling ...")
          observable.map(clientSession => {
    
            val transactionOptions =
              TransactionOptions.builder()
                .readConcern(ReadConcern.SNAPSHOT)
                .writeConcern(WriteConcern.MAJORITY).build()
    
            clientSession.startTransaction(transactionOptions)
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
            Await.ready(fut, 3 seconds) */
    
            ctxs.contexts.foreach { ctx =>
              mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
            }
            clientSession
          })
        }
    
        private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"commitAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
              log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
              commitAndRetry(observable)
            }
            case e: Exception => {
              log.error(s"commitAndRetry> Exception during commit ...: $e")
              throw e
            }
          })
        }
    
        private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
          log.info(s"runTransactionAndRetry> calling ...")
          observable.recoverWith({
            case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
              log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
              runTransactionAndRetry(observable)
            }
          })
        }
    
        def mgoTxBatch(ctxs: MGOBatContext)(
                implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
    
          log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")
    
          val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
          val commitTransactionObservable: SingleObservable[Completed] =
            updateObservable.flatMap(clientSession => clientSession.commitTransaction())
          val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
    
          runTransactionAndRetry(commitAndRetryObservable)
    
          valueToDBOResult(Completed())
    
        }
      }
    
    
      def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
        log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
        if (ctxs.tx) {
            TxUpdateMode.mgoTxBatch(ctxs)
          } else {
    /*
            val fut = Future.traverse(ctxs.contexts) { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
            Await.ready(fut, 3 seconds)
            Future.successful(new Completed) */
            ctxs.contexts.foreach { ctx =>
              mgoUpdate[Completed](ctx).map(identity) }
    
             valueToDBOResult(Completed())
          }
    
      }
    
      def mongoStream(ctx: MGOContext)(
        implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
        log.info(s"mongoStream>  MGOContext: ${ctx}")
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mongoStream> uery action cannot be null!")
          throw new IllegalArgumentException("query action cannot be null!")
        }
        try {
          ctx.action.get match {
            case Find(None, Nil, false) => //FindObservable
              MongoSource(coll.find())
            case Find(None, Nil, true) => //FindObservable
              MongoSource(coll.find().first())
            case Find(Some(filter), Nil, false) => //FindObservable
              MongoSource(coll.find(filter))
            case Find(Some(filter), Nil, true) => //FindObservable
              MongoSource(coll.find(filter).first())
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(next(coll.find[Document]()))
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              MongoSource(next(coll.find[Document](filter)))
            case _ =>
              log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
              throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mongoStream> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
        }
    
      }
    
    
      // T => FindIterable  e.g List[Document]
      def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
        log.info(s"mgoQuery>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
    
        def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
          rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
    
    
        if ( ctx.action == None) {
          log.error(s"mgoQuery> uery action cannot be null!")
          Left(new IllegalArgumentException("query action cannot be null!"))
        }
        try {
          ctx.action.get match {
            /* count */
            case Count(Some(filter), Some(opt)) => //SingleObservable
              coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
                .toFuture().asInstanceOf[Future[T]]
            case Count(Some(filter), None) => //SingleObservable
              coll.countDocuments(filter).toFuture()
                .asInstanceOf[Future[T]]
            case Count(None, None) => //SingleObservable
              coll.countDocuments().toFuture()
                .asInstanceOf[Future[T]]
            /* distinct */
            case Distict(field, Some(filter)) => //DistinctObservable
              coll.distinct(field, filter).toFuture()
                .asInstanceOf[Future[T]]
            case Distict(field, None) => //DistinctObservable
              coll.distinct((field)).toFuture()
                .asInstanceOf[Future[T]]
            /* find */
            case Find(None, Nil, false) => //FindObservable
              if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
              else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(None, Nil, true) => //FindObservable
              if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
              else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, false) => //FindObservable
              if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
              else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), Nil, true) => //FindObservable
              if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
              else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
            case Find(None, sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            case Find(Some(filter), sro, _) => //FindObservable
              val next = toResultOption(sro)
              if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
              else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
            /* aggregate AggregateObservable*/
            case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
            /* mapReduce MapReduceObservable*/
            case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
            /* list collection */
            case ListCollection(dbName) => //ListConllectionObservable
              client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
    
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoQuery> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
        }
      }
      //T => Completed, result.UpdateResult, result.DeleteResult
      def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
        try {
          mgoUpdateObservable[T](ctx).toFuture()
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
          Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
        }
    
      def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
        log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoUpdateObservable> uery action cannot be null!")
          throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
        }
        try {
          ctx.action.get match {
            /* insert */
            case Insert(docs, Some(opt)) => //SingleObservable[Completed]
              if (docs.size > 1)
                coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
            case Insert(docs, None) => //SingleObservable
              if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
              else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
            /* delete */
            case Delete(filter, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
            case Delete(filter, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
              else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
            /* replace */
            case Replace(filter, replacement, None) => //SingleObservable
              coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
            case Replace(filter, replacement, Some(opt)) => //SingleObservable
              coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
            /* update */
            case Update(filter, update, None, onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
            case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
              if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
              else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
            /* bulkWrite */
            case BulkWrite(commands, None) => //SingleObservable
              coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
            case BulkWrite(commands, Some(opt)) => //SingleObservable
              coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
        }
      }
    
      def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
        log.info(s"mgoAdmin>  MGOContext: ${ctx}")
    
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        if ( ctx.action == None) {
          log.error(s"mgoAdmin> uery action cannot be null!")
          Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
        }
        try {
          ctx.action.get match {
            /* drop collection */
            case DropCollection(collName) => //SingleObservable
              val coll = db.getCollection(collName)
              coll.drop().toFuture()
            /* create collection */
            case CreateCollection(collName, None) => //SingleObservable
              db.createCollection(collName).toFuture()
            case CreateCollection(collName, Some(opt)) => //SingleObservable
              db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
            /* list collection
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
            */
            /* create view */
            case CreateView(viewName, viewOn, pline, None) => //SingleObservable
              db.createView(viewName, viewOn, pline).toFuture()
            case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
              db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
            /* create index */
            case CreateIndex(key, None) => //SingleObservable
              coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]
            case CreateIndex(key, Some(opt)) => //SingleObservable
              coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
            /* drop index */
            case DropIndexByName(indexName, None) => //SingleObservable
              coll.dropIndex(indexName).toFuture()
            case DropIndexByName(indexName, Some(opt)) => //SingleObservable
              coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropIndexByKey(key, None) => //SingleObservable
              coll.dropIndex(key).toFuture()
            case DropIndexByKey(key, Some(opt)) => //SingleObservable
              coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
            case DropAllIndexes(None) => //SingleObservable
              coll.dropIndexes().toFuture()
            case DropAllIndexes(Some(opt)) => //SingleObservable
              coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
          }
        }
        catch { case e: Exception =>
          log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
          throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
        }
    
      }
    
    /*
        def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
        val db = client.getDatabase(ctx.dbName)
        val coll = db.getCollection(ctx.collName)
        ctx.action match {
          /* count */
          case Count(Some(filter), Some(opt)) =>   //SingleObservable
            coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
              .toFuture().asInstanceOf[Future[T]]
          case Count(Some(filter), None) =>        //SingleObservable
            coll.countDocuments(filter).toFuture()
              .asInstanceOf[Future[T]]
          case Count(None, None) =>                //SingleObservable
            coll.countDocuments().toFuture()
              .asInstanceOf[Future[T]]
          /* distinct */
          case Distict(field, Some(filter)) =>     //DistinctObservable
            coll.distinct(field, filter).toFuture()
              .asInstanceOf[Future[T]]
          case Distict(field, None) =>             //DistinctObservable
            coll.distinct((field)).toFuture()
              .asInstanceOf[Future[T]]
          /* find */
          case Find(None, None, optConv, false) =>  //FindObservable
            if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
            else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(None, None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
            else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, false) =>   //FindObservable
            if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
            else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), None, optConv, true) =>   //FindObservable
            if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
            else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
          case Find(None, Some(next), optConv, _) =>   //FindObservable
            if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          case Find(Some(filter), Some(next), optConv, _) =>  //FindObservable
            if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
            else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
          /* aggregate AggregateObservable*/
          case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
          /* mapReduce MapReduceObservable*/
          case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
          /* insert */
          case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]
            if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
              .asInstanceOf[Future[T]]
            else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
              .asInstanceOf[Future[T]]
          case Insert(docs, None) =>                       //SingleObservable
            if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
            else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
          /* delete */
          case Delete(filter, None, onlyOne) =>            //SingleObservable
            if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
          case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable
            if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
          /* replace */
          case Replace(filter, replacement, None) =>        //SingleObservable
            coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
          case Replace(filter, replacement, Some(opt)) =>    //SingleObservable
            coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* update */
          case Update(filter, update, None, onlyOne) =>      //SingleObservable
            if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
          case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
            if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
            else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
          /* bulkWrite */
          case BulkWrite(commands, None) =>                  //SingleObservable
            coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
          case BulkWrite(commands, Some(opt)) =>             //SingleObservable
            coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
    
          /* drop collection */
          case DropCollection(collName) =>                   //SingleObservable
            val coll = db.getCollection(collName)
            coll.drop().toFuture().asInstanceOf[Future[T]]
          /* create collection */
          case CreateCollection(collName, None) =>           //SingleObservable
            db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
          case CreateCollection(collName, Some(opt)) =>      //SingleObservable
            db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
          /* list collection */
          case ListCollection(dbName) =>   //ListConllectionObservable
            client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
          /* create view */
          case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable
            db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
          case CreateView(viewName, viewOn, pline, Some(opt)) =>  //SingleObservable
            db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
          /* create index */
          case CreateIndex(key, None) =>                     //SingleObservable
            coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
          case CreateIndex(key, Some(opt)) =>                //SingleObservable
            coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
          /* drop index */
          case DropIndexByName(indexName, None) =>           //SingleObservable
            coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
          case DropIndexByName(indexName, Some(opt)) =>      //SingleObservable
            coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, None) =>                  //SingleObservable
            coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
          case DropIndexByKey(key, Some(opt)) =>             //SingleObservable
            coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(None) =>                       //SingleObservable
            coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
          case DropAllIndexes(Some(opt)) =>                  //SingleObservable
            coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        }
      }
    */
    
    
    }
    
    
    object MongoActionStream {
    
      import MGOClasses._
    
      case class StreamingInsert[A](dbName: String,
                                    collName: String,
                                    converter: A => Document,
                                    parallelism: Int = 1
                                   ) extends MGOCommands
    
      case class StreamingDelete[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
      case class StreamingUpdate[A](dbName: String,
                                    collName: String,
                                    toFilter: A => Bson,
                                    toUpdate: A => Bson,
                                    parallelism: Int = 1,
                                    justOne: Boolean = false
                                   ) extends MGOCommands
    
    
      case class InsertAction[A](ctx: StreamingInsert[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          Flow[A].map(ctx.converter)
            .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
      }
    
      case class UpdateAction[A](ctx: StreamingUpdate[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      }
    
    
      case class DeleteAction[A](ctx: StreamingDelete[A])(
        implicit mongoClient: MongoClient) {
    
        val database = mongoClient.getDatabase(ctx.dbName)
        val collection = database.getCollection(ctx.collName)
    
        def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
          if (ctx.justOne) {
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
          } else
            Flow[A]
              .mapAsync(ctx.parallelism)(a =>
                collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
      }
    
    }
    
    object MGOHelpers {
    
      implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
        override val converter: (Document) => String = (doc) => doc.toJson
      }
    
      implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
        override val converter: (C) => String = (doc) => doc.toString
      }
    
      trait ImplicitObservable[C] {
        val observable: Observable[C]
        val converter: (C) => String
    
        def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    
        def headResult() = Await.result(observable.head(), 10 seconds)
    
        def printResults(initial: String = ""): Unit = {
          if (initial.length > 0) print(initial)
          results().foreach(res => println(converter(res)))
        }
    
        def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
      }
    
      def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
        Await.result(fut, timeOut)
      }
    
      def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
        Await.result(fut, timeOut)
      }
    
      import monix.eval.Task
      import monix.execution.Scheduler.Implicits.global
    
      final class FutureToTask[A](x: => Future[A]) {
        def asTask: Task[A] = Task.deferFuture[A](x)
      }
    
      final class TaskToFuture[A](x: => Task[A]) {
        def asFuture: Future[A] = x.runAsync
      }
    
    }

    PetSound.scala

    package petsound
    import akka.actor._
    import akka.cluster.client._
    import com.typesafe.config.ConfigFactory
    import akka.cluster.pubsub.DistributedPubSubMediator._
    import akka.cluster.pubsub._
    
    object Cat {
      def props = Props[Cat]
      def create(port: Int): ActorSystem  = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
    
        val system = ActorSystem("ClusterSystem",config)
        val catSound = system.actorOf(props,"CatSound")
    
        ClusterClientReceptionist(system).registerService(catSound)
    
        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")
    
        system
      }
    }
    
    object Dog {
      def props = Props(new Dog)
      def create(port: Int): ActorSystem = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("ClusterSystem",config)
        val dogSound = system.actorOf(props,"DogSound")
        ClusterClientReceptionist(system).registerService(dogSound)
    
        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")
    
        system
      }
    }
    
    class Cat extends Actor with ActorLogging {
      //使用pub/sub方式设置
      val mediator = DistributedPubSub(context.system).mediator
      override def preStart() = {
        mediator ! Subscribe("Shout", self)
        super.preStart()
      }
    
      override def receive: Receive = {
        case "Shout" =>
          log.info("*******I am a cat, MIAOM ...******")
      }
    }
    
    class Dog extends Actor with ActorLogging {
      //使用pub/sub方式设置
      val mediator = DistributedPubSub(context.system).mediator
      override def preStart() = {
        mediator ! Subscribe("Shout", self)
        super.preStart()
      }
      override def receive: Receive = {
        case "Shout" =>
          log.info("*****I am a dog, WANG WANG...*****")
      }
    }

    EventListener.scala

    package petsound
    import akka.actor._
    import akka.cluster.client._
    class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
      override def preStart(): Unit = {
        clusterClient ! SubscribeContactPoints
        super.preStart()
      }
    
      override def receive: Receive = {
        case ContactPoints(cps) =>
          cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
        case ContactPointAdded(cp) =>
          log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
        case ContactPointRemoved(cp) =>
          log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
    
      }
    }
    
    class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
      override def preStart(): Unit = {
        receptionist ! SubscribeClusterClients
        super.preStart()
      }
    
      override def receive: Receive = {
        case ClusterClients(cs) =>
          cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
        case ClusterClientUp(cc) =>
          log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
        case ClusterClientUnreachable(cc) =>
          log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
    
      }
    }

    MongoAdder.scala

    package petsound
    import akka.actor._
    import com.typesafe.config._
    import akka.actor.ActorSystem
    import org.mongodb.scala._
    import sdp.grpc.services.ProtoMGOContext
    import sdp.mongo.engine.MGOClasses._
    import sdp.mongo.engine.MGOEngine._
    import sdp.result.DBOResult._
    import akka.cluster.client._
    
    import scala.collection.JavaConverters._
    import scala.util._
    
    class MongoAdder extends Actor with ActorLogging {
      import monix.execution.Scheduler.Implicits.global
      implicit val mgosys = context.system
      implicit val ec = mgosys.dispatcher
    
      val clientSettings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings {b =>
          b.hosts(List(new ServerAddress("localhost:27017")).asJava)
        }.build()
    
      implicit val client: MongoClient = MongoClient(clientSettings)
    
      val ctx = MGOContext("testdb","friends")
    
      override def receive: Receive = {
    
        case someProto @ Some(proto:ProtoMGOContext) =>
          val ctx = MGOContext.fromProto(proto)
          log.info(s"****** received MGOContext: $someProto *********")
    
          val task = mgoUpdate[Completed](ctx).toTask
          task.runOnComplete {
            case Success(s) => println("operations completed successfully.")
            case Failure(exception) => println(s"error: ${exception.getMessage}")
          }
    
      }
    }
    
    object MongoAdder {
    
      def create(port: Int): ActorSystem = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("ClusterSystem", config)
    
        val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
        ClusterClientReceptionist(system).registerService(mongoAdder)
    
        val receptionist = ClusterClientReceptionist(system).underlying
        system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")
    
        system
    
      }
    
    }

    PetHouse.scala

    package petsound
    import akka.actor._
    import akka.japi.Util.immutableSeq
    import akka.actor.AddressFromURIString
    import com.typesafe.config.ConfigFactory
    import akka.cluster.client._
    import akka.cluster.client.ClusterClient._
    
    object PetHouse extends App {
    
      val sysCat = Cat.create(2551)
      val sysDog = Dog.create(2552)
      val mongo = MongoAdder.create(2555)
    
      scala.io.StdIn.readLine()
    
      sysCat.terminate()
      sysDog.terminate()
      mongo.terminate()
    
    }
    
    
    object PetClient extends App {
    
      val conf = ConfigFactory.load("client")
      val clientSystem = ActorSystem("ClientSystem",conf)
    /* 从 conf 文件里读取 contact-points 地址
      val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
        case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
      }.toSet
    */
    
      //先放一个contact-point, 系统会自动增加其它的点
      val initialContacts = Set(
        ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
      )
    
      val clusterClient = clientSystem.actorOf(
        ClusterClient.props(
          ClusterClientSettings(clientSystem)
            .withInitialContacts(initialContacts)), "petClient")
    
      clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")
    
      clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
      clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)
    
      println(s"sent shout messages ...")
      scala.io.StdIn.readLine()
    
      clusterClient ! Publish("Shout","Shout")
      println(s"publish shout messages ...")
    
    
      //MongoDB 操作示范
      import org.mongodb.scala._
      import sdp.mongo.engine.MGOClasses._
    
      val ctx = MGOContext("testdb","friends")
    
      val chen = Document("" -> "", "" -> "大文","age" -> 28)
      val zhang = Document("" -> "", "" -> "小海","age" -> 7)
      val lee = Document("" -> "", "" -> "","age" -> 45)
      val ouyang = Document("" -> "欧阳", "" -> "","age" -> 120)
    
      val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
      clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)
    
    
      scala.io.StdIn.readLine()
      clientSystem.terminate()
    }
  • 相关阅读:
    Intellij IDEA 一些不为人知的技巧
    IDEA配置GIT
    返回数据
    IDEA字体设置
    @RequestParam
    @RequestMapping
    基于jquery fly插件实现加入购物车抛物线动画效果,jquery.fly.js
    js倒计时代码 适合于促销-倒计时代码
    phpstorm 10 注册码
    dispaly:table-cell,inline-block,阐述以及案例
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/10094659.html
Copyright © 2011-2022 走看看