  • restapi(4) - rest-mongo: an httpserver front end for MongoDB

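       After finishing a standard REST-style httpserver for database CRUD operations, I found that it had many shortcomings. Chasing "generality", I tried to make every service interface as generic as possible, which ended up suppressing the strengths of the target database and produced a feeble toy. For example, a standard REST-style getById requires every table to have an id field, which is a bit silly. And the result set returned by get offers no flexible controls such as row count, field selection or sort order. This matters especially for MongoDB, a distributed database whose query capabilities come close to those of a relational database: as mentioned in the previous post, its queries are powerful and its conditions compose flexibly, and it would be a pity not to expose that through the network service api. So this post discusses a rest-server built specifically for MongoDB. The goal: the back-end database is MongoDB, an httpserver exposes CRUD operations on it, and clients call those CRUD services over http. For each database table (collection), back-end developers add a new set of CRUD services following one uniform pattern. Hopefully this raises development efficiency and reduces the chance of coding errors.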

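    MongoDB is a document database, so its data formats are more varied. In this demo I want to cover the data types characteristic of MongoDB and how to handle them, including dates and binary blobs (images). Incidentally, ordinary large text files can also be stored in MongoDB as binary blobs: a file has to travel over http as bytes anyway, so what the back-end httpserver receives is a sequence of bytes that can go straight into a MongoDB blob field without any format conversion. When downloading from the back end, the client only needs to decode the bytes as UTF-8 text to recover the file content.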
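The byte round trip described above can be sketched with the JDK alone. This is only an illustration, assuming a UTF-8 text file; the object name `BlobText` and its two helpers are hypothetical and not part of the project.

```scala
import java.nio.charset.StandardCharsets

object BlobText {
  // Turn text into the raw bytes that would travel over http and be stored as a blob.
  def toBlob(text: String): Array[Byte] = text.getBytes(StandardCharsets.UTF_8)

  // Restore the text on the client side after downloading the bytes.
  def fromBlob(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}
```

No conversion happens between the two calls: the server stores and returns exactly the bytes it received.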

    First, the Model, which in scala is expressed as a case class. A Model corresponds to a MongoDB Document; in scala code we work with case classes as if they were Documents. All the Models we design extend a ModelBase trait:

    trait ModelBase[E] {
      def to: E
    }
    
    
      case class Person(
                       userid: String = "",
                       name: String = "",
                       age: Option[Int] = None,
                       dob: Option[MGODate] = None,   // date of birth
                       address: Option[String] = None
                       ) extends ModelBase[Document] {
        import org.mongodb.scala.bson._
    
        override def to: Document = {
          var doc = Document(
          "userid" -> this.userid,
          "name" -> this.name)
    
    
          if (this.age != None)
            doc = doc + ("age" -> this.age.get)
    
          if (this.dob != None)
            doc = doc + ("dob" -> this.dob.get)
    
          if (this.address != None)
            doc = doc + ("address" -> this.address.getOrElse(""))
    
          doc
        }
    
      }
      object Person {
        val fromDocument: Document => Person = doc => {
          val keyset = doc.keySet
          Person(
            userid = doc.getString("userid"),
            name = doc.getString("name"),
            age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
    
            dob =  {if (keyset.contains("dob"))
              Some(doc.getDate("dob"))
            else None },
    
            address =  mgoGetStringOrNone(doc,"address")
          )
        }
      }

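    In the example above, Person corresponds to a Document in MongoDB. Besides keeping the property types in line with the field types of the collection, it provides two conversion functions, to and fromDocument. The to function is inherited from ModelBase, which means every MongoDB Model must have it. This matters because, when a Model is built from json, anything that is a ModelBase is guaranteed to have a to function that can be called: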

     class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
    ...
    
    
    post {
            entity(as[String]) { json =>
              val extractedEntity: M = fromJson[M](json)
              val doc: Document = extractedEntity.to
              val futmsg = repository.insert(doc).value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(c) => c.toString()
                      case None => "insert may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }

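    Note extractedEntity: at this point we cannot know its concrete type. Is it a Person, an Animal, a Machine? But we do know that it is of type M, and M <: ModelBase[Document], so M is a MongoDB Model and we can call extractedEntity.to to get a Document.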
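The effect of the type bound can be seen in a small self-contained sketch. It assumes a stand-in `Doc` type (a plain `Map`) instead of the real `org.mongodb.scala.Document`, and the `Animal` model and `toDoc` helper are hypothetical names used only for illustration.

```scala
object ModelSketch {
  // Stand-in for org.mongodb.scala.Document, for illustration only.
  type Doc = Map[String, Any]

  trait ModelBase[E] { def to: E }

  case class Animal(name: String, legs: Option[Int] = None) extends ModelBase[Doc] {
    // Optional fields are added to the document only when they are present.
    override def to: Doc = {
      val base: Doc = Map("name" -> name)
      legs.fold(base)(n => base + ("legs" -> n))
    }
  }

  // Compiles for any M bounded by ModelBase[Doc], without knowing the concrete type.
  def toDoc[M <: ModelBase[Doc]](m: M): Doc = m.to
}
```

The generic code only relies on what the bound guarantees, which is why one route class can serve every Model.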

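    Look closely and you will see that Person has no blob field. So far I have not found a way to send several fields and an image together in a single http request; uploading one Document would take two requests, and the httpserver would then have to match up and manage the two requests, which gets very complicated. So for a Document that contains a blob, the blob has to be split off into another Document, and the two are linked through that Document's single id field: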

      case class Photo (
                         id: String,
                         photo: Option[MGOBlob]
                       ) extends ModelBase[Document] {
        override def to: Document = {
          var doc = Document("id" -> this.id)
          if (photo != None)
            doc = doc + ("photo" -> this.photo)
          doc
        }
      }
    
      object Photo {
        def fromDocument: Document => Photo = doc => {
          val keyset = doc.keySet
          Photo(
            id = doc.getString("id"),
            photo = mgoGetBlobOrNone(doc, "photo")
          )
        }
      }

    From another angle, storing blobs separately from regular fields has some advantages too; at worst it costs two queries.

    The second part is the repository, the database operation functions:

       class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
        def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
          var res = Seq[ResultOptions]()
          next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
          sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
          fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
          top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
    
          val ctxFind = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(andThen = res))
          mgoQuery[Seq[R]](ctxFind,converter)
        }
    
         def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
           var res = Seq[ResultOptions]()
           next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
           sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
           fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
           top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
           val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),andThen = res))
           mgoQuery[Seq[R]](ctxFind,converter)
        }
        def getOneDocument(filtr: Bson): DBOResult[Document] = {
              val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),firstOnly = true))
           mgoQuery[Document](ctxFind,converter)
        }
    
        def insert(doc: Document): DBOResult[Completed] = {
          val ctxInsert = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Insert(Seq(doc)))
          mgoUpdate[Completed](ctxInsert)
        }
    
        def delete(filter: Bson): DBOResult[DeleteResult] = {
          val ctxDelete = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Delete(filter))
          mgoUpdate[DeleteResult](ctxDelete)
        }
    
        def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
          val ctxUpdate = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Update(filter,update,None,!many))
          mgoUpdate[UpdateResult](ctxUpdate)
        }
    
        def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
           val ctxUpdate = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
             .setCommand(Replace(filter,row))
           mgoUpdate[UpdateResult](ctxUpdate)
        }
    
      }
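Both getAll and query build their option list by appending to a var inside four foreach calls. The same accumulation can be sketched without mutation. The `Opt` types below are hypothetical stand-ins for the `ResultOptions` and `FOD_TYPE` values of the MongoDB engine used above, so this shows the pattern only, not that library's API.

```scala
object ResultOptionsSketch {
  // Hypothetical stand-ins for ResultOptions(FOD_TYPE.*, ...).
  sealed trait Opt
  final case class Filter(json: String) extends Opt
  final case class Sort(json: String) extends Opt
  final case class Projection(json: String) extends Opt
  final case class Limit(n: Int) extends Opt

  // Each present parameter contributes one option, in a fixed order; absent ones are skipped.
  def build(next: Option[String], sort: Option[String],
            fields: Option[String], top: Option[Int]): Seq[Opt] =
    Seq[Option[Opt]](
      next.map(Filter(_)),
      sort.map(Sort(_)),
      fields.map(Projection(_)),
      top.map(Limit(_))
    ).flatten
}
```

Because the builder is a pure function, it could be shared by getAll and query instead of being repeated in each.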

    This part was discussed in the previous post. Last comes the core akka-http part, the Route, which is the external api of the MongoDB CRUD services:

          (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
            (filter,fields,sort,top,next) => {
            dbor = {
              filter match {
                case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
                case None => repository.getAll(next,sort,fields,top)
              }
            }
            val futRows = dbor.value.value.runToFuture.map {
              eolr =>
                eolr match {
                  case Right(olr) => olr match {
                    case Some(lr) => lr
                    case None => Seq[M]()
                  }
                  case Left(_) => Seq[M]()
                }
            }
            complete(futureToJson(futRows))
           }
          } ~ post {
            entity(as[String]) { json =>
              val extractedEntity: M = fromJson[M](json)
              val doc: Document = extractedEntity.to
              val futmsg = repository.insert(doc).value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(c) => c.toString()
                      case None => "insert may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
    
              complete(futmsg)
            }
          } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
            val bson = Document(filter)
            if (set == None) {
              entity(as[String]) { json =>
                val extractedEntity: M = fromJson[M](json)
                val doc: Document = extractedEntity.to
                val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
                  eoc =>
                    eoc match {
                      case Right(oc) => oc match {
                        case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                        case None => "update may not complete!"
                      }
                      case Left(err) => err.getMessage
                    }
                }
                complete(futureToJson(futmsg))
              }
            } else {
              set match {
                case Some(u) =>
                  val ubson = Document(u)
                  dbou = repository.update(bson, ubson, many.getOrElse(true))
                case None =>
                  dbou = Left(new IllegalArgumentException("missing set statement for update!"))
              }
              val futmsg = dbou.value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                      case None => "update may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
              complete(futureToJson(futmsg))
            }
          } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
            val bson = Document(filter)
            val futmsg = repository.delete(bson).value.value.runToFuture.map {
              eoc =>
                eoc match {
                  case Right(oc) => oc match {
                    case Some(d) => s"${d.getDeletedCount} rows deleted."
                    case None => "delete may not complete!"
                  }
                  case Left(err) => err.getMessage
                }
            }
            complete(futureToJson(futmsg))
          }
        }
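Every handler above unwraps the same Either[Throwable, Option[A]] shape with two nested matches. A small helper can collapse that into one match. This is a sketch under the assumption that the result has already been reduced to a plain Either; `ResultRender.render` is a hypothetical name, not something defined in the project.

```scala
object ResultRender {
  // Collapse Either[Throwable, Option[A]] into a single message string:
  // a value is formatted with `ok`, an empty result yields `whenEmpty`,
  // and a failure yields the exception message.
  def render[A](result: Either[Throwable, Option[A]], whenEmpty: String)(ok: A => String): String =
    result match {
      case Right(Some(a)) => ok(a)
      case Right(None)    => whenEmpty
      case Left(err)      => err.getMessage
    }
}
```

With such a helper, the delete handler's mapping step could shrink to something like `render(eoc, "delete may not complete!")(d => s"${d.getDeletedCount} rows deleted.")`.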

     The biggest difference from the previous post is that this Route accepts MongoDB-style query strings, i.e. bson-typed parameters, such as:

    http://192.168.0.189:50081/private/crud/person
    http://192.168.0.189:50081/private/crud/person?filter={"userid":"c001"}
    http://192.168.0.189:50081/private/crud/person?sort={"userid":-1}
    http://192.168.0.189:50081/private/crud/person?filter={"userid":{$gt:"c000"}}&sort={"userid":-1}&top=3

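    Unfortunately, some characters in bson expressions are not allowed in a url, so they have to be encoded beforehand. A public online URL encoder can do the conversion: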

    https://www.url-encoder.com   {"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D

    In code, a library can do the job: "com.github.tasubo" % "jurl-tools" % "0.6", calling URLEncode.encode(xyz)

     val sort =
          """
            |{userid:-1}
          """.stripMargin
    
        val getAllRequest = HttpRequest(
          HttpMethods.GET,
          uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
        ).addHeader(authentication)
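If adding a dependency is not desirable, the JDK's own `java.net.URLEncoder` is one possible alternative. Note that this swaps in a different encoder from the jurl-tools one used in this post: it applies form encoding, so a space becomes `+` rather than `%20`. The `UrlParam` object is a hypothetical wrapper for illustration.

```scala
import java.net.{URLDecoder, URLEncoder}

object UrlParam {
  // Percent-encode a bson/json expression so it can be used as a query-string value.
  def encode(expr: String): String = URLEncoder.encode(expr, "UTF-8")

  // Reverse of encode, handy for checking what the server will see.
  def decode(s: String): String = URLDecoder.decode(s, "UTF-8")
}
```

For the filter shown earlier, `UrlParam.encode("""{"userid":"c001"}""")` yields `%7B%22userid%22%3A%22c001%22%7D`, the same string the online encoder produced.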

    The api Route for the blob service:

          pathPrefix("blob") {
            (get & path(Remaining)) { id =>
              val filtr = equal("id", id)
              val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
                eodoc =>
                  eodoc match {
                    case Right(odoc) => odoc match {
                      case Some(doc) =>
                        if (doc == null) None
                        else mgoGetBlobOrNone(doc, "photo")
                      case None => None
                    }
                    case Left(_) => None
                  }
              }
              onComplete(futOptPic) {
                case Success(optBlob) => optBlob match {
                  case Some(blob) =>
                    withoutSizeLimit {
                      encodeResponseWith(Gzip) {
                        complete(
                          HttpEntity(
                            ContentTypes.`application/octet-stream`,
                            ByteArrayToSource(blob.getData))
                        )
                      }
                    }
                  case None => complete(StatusCodes.NotFound)
                }
                case Failure(err) => complete(err)
              }
            } ~
            (post &  parameter('id)) { id =>
              withoutSizeLimit {
                decodeRequest {
                  extractDataBytes { bytes =>
                    val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                      hd ++ bs
                    }
                    onComplete(fut) {
                      case Success(b) =>
                        val doc = Document("id" -> id, "photo" -> b.toArray)
                        val futmsg = repository.insert(doc).value.value.runToFuture.map {
                          eoc =>
                            eoc match {
                              case Right(oc) => oc match {
                                case Some(c) => c.toString()
                                case None => "insert may not complete!"
                              }
                              case Left(err) => err.getMessage
                            }
                        }
                        complete(futmsg)
                      case Failure(err) => complete(err)
                    }
                  }
                }
              }
            }
          } 

    Note: MongoRoute[M] is a generic type. The intent is that a Route for any Model only needs M to be specified, for example:

      implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
      implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
    
    ...  
        pathPrefix("public") {
            (pathPrefix("crud")) {
              new MongoRoute[Person]("person")(personDao)
                .route ~
                new MongoRoute[Photo]("photo")(picDao)
                  .route
            }
          }

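    Much less work, isn't it? But back to the earlier point: a blob needs no format conversion at any stage of its journey, so the blob route does not go through a Model and the id field name is hard-coded. Keep this in mind when designing the collection schema.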

    Testing an httpserver is a bit of a headache. A browser can only test GET, so how should POST, PUT and DELETE be tested? curl works:

    curl -i -X GET http://rest-api.io/items
    curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584
    curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584
    curl -i -X POST -H 'Content-Type: application/json' -d '{"name": "New item", "year": "2009"}' http://rest-api.io/items
    curl -i -X PUT -H 'Content-Type: application/json' -d '{"name": "Updated item", "year": "2010"}' http://rest-api.io/items/5069b47aa892630aae059584
     

    Below are two clients that test crud and blob respectively:

    TestCrudClient.scala

    import akka.actor._
    import akka.http.scaladsl.model.headers._
    
    import scala.concurrent._
    import scala.concurrent.duration._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.marshalling._
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer
    import com.github.tasubo.jurl.URLEncode
    import com.datatech.rest.mongo.MongoModels.Person
    import de.heikoseeberger.akkahttpjson4s.Json4sSupport
    import org.json4s.jackson
    import com.datatech.sdp.mongo.engine.MGOClasses._
    
    trait JsonCodec extends Json4sSupport {
      import org.json4s.DefaultFormats
      import org.json4s.ext.JodaTimeSerializers
      implicit val serilizer = jackson.Serialization
      implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
    }
    object JsConverters extends JsonCodec
    
    object TestCrudClient {
    
      type UserInfo = Map[String,Any]
      def main(args: Array[String]): Unit = {
        import JsConverters._
    
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
        // needed for the future flatMap/onComplete in the end
        implicit val executionContext = system.dispatcher
    
        val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
        val authRequest = HttpRequest(
          HttpMethods.POST,
          uri = "http://192.168.0.189:50081/auth",
          headers = List(authorization)
        )
    
        val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
    
        val respToken = for {
          resp <- futToken
          jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
        } yield jstr
    
        val jstr =  Await.result[String](respToken,2 seconds)
        println(jstr)
        scala.io.StdIn.readLine()
    
        val authentication = headers.Authorization(OAuth2BearerToken(jstr))
    
        val sort =
          """
            |{userid:-1}
          """.stripMargin
    
        val getAllRequest = HttpRequest(
          HttpMethods.GET,
          uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
        ).addHeader(authentication)
        val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest)
        println(Await.result(futGetAll,2 seconds))
        scala.io.StdIn.readLine()
    
        var bf =
          """
            |{"userid":"c888"}
          """.stripMargin
    
        println(URLEncode.encode(bf))
    
        val delRequest = HttpRequest(
          HttpMethods.DELETE,
          uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
        ).addHeader(authentication)
        val futDel: Future[HttpResponse] = Http().singleRequest(delRequest)
        println(Await.result(futDel,2 seconds))
        scala.io.StdIn.readLine()
    
         bf =
          """
            |{"userid":"c001"}
          """.stripMargin
    
        val getRequest = HttpRequest(
          HttpMethods.GET,
          uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf),
        ).addHeader(authentication)
        val futGet: Future[HttpResponse] = Http().singleRequest(getRequest)
        println(Await.result(futGet,2 seconds))
        scala.io.StdIn.readLine()
    
        val tiger = Person("c001","tiger chan",Some(56))
        val john = Person("c002", "johnny dep", Some(60))
        val peter = Person("c003", "pete brad", Some(58))
        val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
        val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
    
        val saveRequest = HttpRequest(
          HttpMethods.POST,
          uri = "http://192.168.0.189:50081/public/crud/person"
        ).addHeader(authentication)
        val futPost: Future[HttpResponse] =
          for {
            reqEntity <- Marshal(peter).to[RequestEntity]
            response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))
          } yield response
    
        println(Await.result(futPost,2 seconds))
        scala.io.StdIn.readLine()
    
        var set =
          """
            | {$set:
            |   {
            |    name:"tiger the king",
            |    age:18
            |   }
            | }
          """.stripMargin
    
        val updateRequest = HttpRequest(
          HttpMethods.PUT,
          uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(
               bf)+"&set="+URLEncode.encode(set)+"&many=true"
        ).addHeader(authentication)
    
        val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest)
        println(Await.result(futUpdate,2 seconds))
        scala.io.StdIn.readLine()
    
        val repRequest = HttpRequest(
          HttpMethods.PUT,
          uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
        ).addHeader(authentication)
        val futReplace: Future[HttpResponse] =
          for {
            reqEntity <- Marshal(susan).to[RequestEntity]
            // use repRequest (no "set" parameter) so the server takes the replace branch
            response <- Http().singleRequest(repRequest.copy(entity=reqEntity))
          } yield response
    
        println(Await.result(futReplace,2 seconds))
        scala.io.StdIn.readLine()
    
        system.terminate()
    
      }
    
    }

    TestFileClient.scala

    import akka.stream._
    import java.nio.file._
    import java.io._
    import akka.http.scaladsl.model.headers._
    import scala.concurrent._
    import com.datatech.rest.mongo.FileStreaming._
    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.Http
    import akka.stream.scaladsl.{FileIO, Source}
    import scala.util._
    
    case class FileUtil(implicit sys: ActorSystem) {
      import sys.dispatcher
      implicit val mat = ActorMaterializer()
      def createEntity(file: File): RequestEntity = {
        require(file.exists())
        val formData =
          Multipart.FormData(
            Source.single(
              Multipart.FormData.BodyPart(
                "test",
                HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
                Map("filename" -> file.getName))))
        Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
      }
    
      def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
        implicit val mat = ActorMaterializer()
        import sys.dispatcher
        val futResp = Http(sys).singleRequest(
          //   Gzip.encodeMessage(
          request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
          //   )
        )
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              entity.dataBytes.map(_.utf8String).runForeach(println)
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"Upload request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to Upload file!")
            case Failure(err) => println(s"Upload failed: ${err.getMessage}")
          }
      }
    
      def downloadFileTo(request: HttpRequest, destPath: String) = {
        //  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
        val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
                .onComplete { case _ => println(s"Download file saved to: $destPath") }
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"Download request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to download file!")
            case Failure(err) => println(s"Download failed: ${err.getMessage}")
          }
    
      }
    
    }
    
    object TestFileClient  {
      type UserInfo = Map[String,Any]
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
        // needed for the future flatMap/onComplete in the end
        implicit val executionContext = system.dispatcher
    
        val helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/")
    
        val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
        val authRequest = HttpRequest(
          HttpMethods.POST,
          uri = "http://192.168.0.189:50081/auth",
          headers = List(authorization)
        )
        
        val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
    
        val respToken = for {
          resp <- futToken
          jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
        } yield jstr
    
        val jstr =  Await.result[String](respToken,2 seconds)
        println(jstr)
    
        scala.io.StdIn.readLine()
    
        val authentication = headers.Authorization(OAuth2BearerToken(jstr))
    
        val entity = HttpEntity(
          ContentTypes.`application/octet-stream`,
          fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
        )
        //
        val chunked = HttpEntity.Chunked.fromData(
          ContentTypes.`application/octet-stream`,
          fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
        )
    
        val uploadRequest = HttpRequest(
          HttpMethods.POST,
    //      uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg",
          uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg",
        ).addHeader(authentication)
    
        //upload file
        Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
        //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    
    
        val dlRequest = HttpRequest(
          HttpMethods.GET,
    //      uri = "http://192.168.0.189:50081/api/file/mypic.jpg",
          uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg",
        ).addHeader(authentication)
    
        FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")
    
        scala.io.StdIn.readLine()
        system.terminate()
      }
    
    }

    Below is the source code for this demo:

    build.sbt

    name := "rest-mongo"
    
    version := "0.1"
    
    scalaVersion := "2.12.8"
    
    scalacOptions += "-Ypartial-unification"
    val akkaVersion = "2.5.23"
    val akkaHttpVersion = "10.1.8"
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-http"   % "10.1.8",
      "com.typesafe.akka" %% "akka-stream" % "2.5.23",
      "com.pauldijou" %% "jwt-core" % "3.0.1",
      "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
      "org.json4s" %% "json4s-native" % "3.6.1",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
      "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
      "org.slf4j" % "slf4j-simple" % "1.7.25",
      "org.json4s" %% "json4s-jackson" % "3.6.7",
      "org.json4s" %% "json4s-ext" % "3.6.7",
    
      // for scalikejdbc
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2" % "1.4.199",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.3.2",
      //for cassandra 3.6.0
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
      //for mongodb 4.0
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
      "io.monix" %% "monix" % "3.0.0-RC3",
      "org.typelevel" %% "cats-core" % "2.0.0-M4",
      "com.github.tasubo" % "jurl-tools" % "0.6"
    )

    MongoHttpServer.scala

    package com.datatech.rest.mongo
    
    import akka.actor._
    import akka.stream._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import pdi.jwt._
    import AuthBase._
    import MockUserAuthService._
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import MongoModels._
    import MongoRepo._
    import MongoRoute._
    
    
    object MongoHttpServer extends App {
    
    
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
      implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
      implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
    
      implicit val authenticator = new AuthBase()
        .withAlgorithm(JwtAlgorithm.HS256)
        .withSecretKey("OpenSesame")
        .withUserFunc(getValidUser)
    
      val route =
        path("auth") {
          authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
            post { complete(authenticator.issueJwt(userinfo))}
          }
        } ~
          pathPrefix("private") {
            authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>
              FileRoute(validToken)
                .route
              // ~ ...
            }
          } ~
          pathPrefix("public") {
            (pathPrefix("crud")) {
              new MongoRoute[Person]("person")(personDao)
                .route ~
                new MongoRoute[Photo]("photo")(picDao)
                  .route
            }
          }
    
      val (port, host) = (50081,"192.168.0.189")
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
      println(s"Server running at $host $port. Press any key to exit ...")
    
      scala.io.StdIn.readLine()
    
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    
    }

    ModelBase.scala

    package com.datatech.rest.mongo
    
    trait ModelBase[E] {
      def to: E
    }

    MongoModel.scala

    package com.datatech.rest.mongo
    import org.mongodb.scala._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    object MongoModels {
    
      case class Person(
                       userid: String = "",
                       name: String = "",
                       age: Option[Int] = None,
                       dob: Option[MGODate] = None,
                       address: Option[String] = None
                       ) extends ModelBase[Document] {
        import org.mongodb.scala.bson._
    
        override def to: Document = {
          var doc = Document(
          "userid" -> this.userid,
          "name" -> this.name)
    
    
          if (this.age != None)
            doc = doc + ("age" -> this.age.get)
    
          if (this.dob != None)
            doc = doc + ("dob" -> this.dob.get)
    
          if (this.address != None)
            doc = doc + ("address" -> this.address.getOrElse(""))
    
          doc
        }
    
      }
      object Person {
        val fromDocument: Document => Person = doc => {
          val keyset = doc.keySet
          Person(
            userid = doc.getString("userid"),
            name = doc.getString("name"),
            age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
    
            dob =  {if (keyset.contains("dob"))
              Some(doc.getDate("dob"))
            else None },
    
            address =  mgoGetStringOrNone(doc,"address")
          )
        }
      }
    
      case class Photo (
                         id: String,
                         photo: Option[MGOBlob]
                       ) extends ModelBase[Document] {
        override def to: Document = {
          var doc = Document("id" -> this.id)
          if (photo != None)
            doc = doc + ("photo" -> this.photo.get)  // store the blob itself, not the Option wrapper
          doc
        }
      }
    
      object Photo {
        def fromDocument: Document => Photo = doc => {
          val keyset = doc.keySet
          Photo(
            id = doc.getString("id"),
            photo = mgoGetBlobOrNone(doc, "photo")
          )
        }
      }
    
    
    }
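The `to` methods above add an optional field to the document only when it has a value. The same idea can be sketched without `!= None` checks and `.get` calls. This is only an illustration: a plain `Map[String, Any]` stands in for the BSON `Document`, and `PersonLike` and `toFields` are hypothetical names that are not part of the original code.

```scala
object OptionalFieldsSketch {
  // Stand-in for Person; a Map replaces the BSON Document to keep the sketch self-contained.
  final case class PersonLike(
      userid: String,
      name: String,
      age: Option[Int] = None,
      address: Option[String] = None
  ) {
    def toFields: Map[String, Any] = {
      val required: Map[String, Any] = Map("userid" -> userid, "name" -> name)
      // Each Option contributes zero or one entry; flatten drops the absent ones.
      val optional: Seq[Option[(String, Any)]] =
        Seq(age.map("age" -> _), address.map("address" -> _))
      required ++ optional.flatten
    }
  }
}
```

With this shape, a field that is `None` simply never appears in the result, which mirrors how the listing leaves absent fields out of the stored document.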

    MongoRepo.scala

    package com.datatech.rest.mongo
    import org.mongodb.scala._
    import org.bson.conversions.Bson
    import org.mongodb.scala.result._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    import MGOEngine._
    import MGOCommands._
    import com.datatech.sdp.result.DBOResult.DBOResult
    import MongoModels._
    
    object MongoRepo {
    
       class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
        def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
          var res = Seq[ResultOptions]()
          next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
          sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
          fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
          top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
    
          val ctxFind = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(andThen = res))
          mgoQuery[Seq[R]](ctxFind,converter)
        }
    
         def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
           var res = Seq[ResultOptions]()
           next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
           sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
           fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
           top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
           val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),andThen = res))
           mgoQuery[Seq[R]](ctxFind,converter)
        }
        def getOneDocument(filtr: Bson): DBOResult[Document] = {
              val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),firstOnly = true))
           mgoQuery[Document](ctxFind,converter)
        }
    
        def insert(doc: Document): DBOResult[Completed] = {
          val ctxInsert = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Insert(Seq(doc)))
          mgoUpdate[Completed](ctxInsert)
        }
    
        def delete(filter: Bson): DBOResult[DeleteResult] = {
          val ctxDelete = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Delete(filter))
          mgoUpdate[DeleteResult](ctxDelete)
        }
    
        def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
          val ctxUpdate = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Update(filter,update,None,!many))
          mgoUpdate[UpdateResult](ctxUpdate)
        }
    
        def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
           val ctxUpdate = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
             .setCommand(Replace(filter,row))
           mgoUpdate[UpdateResult](ctxUpdate)
        }
    
      }
    
    }

    MongoRoute.scala

    package com.datatech.rest.mongo
    import akka.http.scaladsl.server.Directives
    
    import scala.util._
    import org.mongodb.scala._
    import com.datatech.sdp.file.Streaming._
    import org.mongodb.scala.result._
    import MongoRepo._
    import akka.stream.ActorMaterializer
    import com.datatech.sdp.result.DBOResult._
    import org.mongodb.scala.model.Filters._
    import com.datatech.sdp.mongo.engine.MGOClasses._
    import monix.execution.CancelableFuture
    import akka.util._
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.coding.Gzip
    object MongoRoute {
      class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
        implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {
        import monix.execution.Scheduler.Implicits.global
        var dbor: DBOResult[Seq[M]] = _
        var dbou: DBOResult[UpdateResult] = _
        val route = pathPrefix(pathName) {
          pathPrefix("blob") {
            (get & path(Remaining)) { id =>
              val filtr = equal("id", id)
              val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
                eodoc =>
                  eodoc match {
                    case Right(odoc) => odoc match {
                      case Some(doc) =>
                        if (doc == null) None
                        else mgoGetBlobOrNone(doc, "photo")
                      case None => None
                    }
                    case Left(_) => None
                  }
              }
              onComplete(futOptPic) {
                case Success(optBlob) => optBlob match {
                  case Some(blob) =>
                    withoutSizeLimit {
                      encodeResponseWith(Gzip) {
                        complete(
                          HttpEntity(
                            ContentTypes.`application/octet-stream`,
                            ByteArrayToSource(blob.getData))
                        )
                      }
                    }
                  case None => complete(StatusCodes.NotFound)
                }
                case Failure(err) => complete(err)
              }
            } ~
            (post &  parameter('id)) { id =>
              withoutSizeLimit {
                decodeRequest {
                  extractDataBytes { bytes =>
                    val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                      hd ++ bs
                    }
                    onComplete(fut) {
                      case Success(b) =>
                        val doc = Document("id" -> id, "photo" -> b.toArray)
                        val futmsg = repository.insert(doc).value.value.runToFuture.map {
                          eoc =>
                            eoc match {
                              case Right(oc) => oc match {
                                case Some(c) => c.toString()
                                case None => "insert may not complete!"
                              }
                              case Left(err) => err.getMessage
                            }
                        }
                        complete(futmsg)
                      case Failure(err) => complete(err)
                    }
                  }
                }
              }
            }
          } ~
          (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
            (filter,fields,sort,top,next) => {
            dbor = {
              filter match {
                case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
                case None => repository.getAll(next,sort,fields,top)
              }
            }
            val futRows = dbor.value.value.runToFuture.map {
              eolr =>
                eolr match {
                  case Right(olr) => olr match {
                    case Some(lr) => lr
                    case None => Seq[M]()
                  }
                  case Left(_) => Seq[M]()
                }
            }
            complete(futureToJson(futRows))
           }
          } ~ post {
            entity(as[String]) { json =>
              val extractedEntity: M = fromJson[M](json)
              val doc: Document = extractedEntity.to
              val futmsg = repository.insert(doc).value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(c) => c.toString()
                      case None => "insert may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
    
              complete(futmsg)
            }
          } ~ (put & parameters('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
            val bson = Document(filter)
            if (set == None) {
              entity(as[String]) { json =>
                val extractedEntity: M = fromJson[M](json)
                val doc: Document = extractedEntity.to
                val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
                  eoc =>
                    eoc match {
                      case Right(oc) => oc match {
                        case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                        case None => "update may not complete!"
                      }
                      case Left(err) => err.getMessage
                    }
                }
                complete(futureToJson(futmsg))
              }
            } else {
              set match {
                case Some(u) =>
                  val ubson = Document(u)
                  dbou = repository.update(bson, ubson, many.getOrElse(true))
                case None =>
                  dbou = Left(new IllegalArgumentException("missing set statement for update!"))
              }
              val futmsg = dbou.value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                      case None => "update may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
              complete(futureToJson(futmsg))
            }
          } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
            val bson = Document(filter)
            val futmsg = repository.delete(bson).value.value.runToFuture.map {
              eoc =>
                eoc match {
                  case Right(oc) => oc match {
                    case Some(d) => s"${d.getDeletedCount} rows deleted."
                    case None => "delete may not complete!"
                  }
                  case Left(err) => err.getMessage
                }
            }
            complete(futureToJson(futmsg))
          }
        }
      }
    
    }
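The insert, update, replace and delete handlers above all repeat the same nested match on a result that, judging from the patterns used, has the shape `Either[Throwable, Option[A]]`. One possible way to factor this out is sketched below. `ResultMessageSketch` and `message` are hypothetical names, and the sketch uses only the standard library, not the `DBOResult` type itself.

```scala
object ResultMessageSketch {
  // Hypothetical helper: collapse the three possible outcomes into one message string.
  def message[A](result: Either[Throwable, Option[A]], whenEmpty: String)(
      show: A => String
  ): String =
    result match {
      case Right(Some(a)) => show(a)         // operation returned a result
      case Right(None)    => whenEmpty       // operation returned nothing
      case Left(err)      => err.getMessage  // operation failed
    }
}
```

A delete handler could then map its future with something like `message(_, "delete may not complete!")(d => s"${d.getDeletedCount} rows deleted.")` instead of spelling out the nested match each time.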
  • Original article: https://www.cnblogs.com/tiger-xc/p/11325996.html