  • restapi(5) - rest-mongo in practice: a distributed picture management system, part 1: the REST service

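      A colleague recently suggested storing the pictures for our web pages in MongoDB, and I fully agree. Compared with keeping pictures as files in subdirectories on disk, MongoDB has several advantages. First, MongoDB is a distributed database, so pictures can be stored across servers. In a cluster, replica sets and sharding can speed up picture reads and provide high availability and data safety. Second, a large number of pictures can be handled with regular record-management practices, and in a high-traffic environment load balancing across cluster nodes can further help with storing and fetching pictures.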

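    I then thought about how this picture management system could make full use of distributed cluster software. MongoDB is a distributed database that can be read and written from any node in the cluster. So if the same rest-mongo service is deployed on every node, clients can submit picture requests to different nodes through different IP addresses. If one node ends up serving more clients than the others, the actual storing and fetching can be handed to idler nodes, which is load balancing. The system therefore looks like it needs three components: MongoDB, rest-mongo and akka-cluster.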

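    Let's start from the front-end requirements. Each product on a page has n pictures. To store a picture, the client sends the product id, a description, a default size and the picture itself. A product with n pictures takes n store requests with the same product id; the system generates a sequence number for each picture and returns it in the HTTP response. To fetch pictures, the client sends the product id, the system returns the sequence numbers of all pictures for that product, and the client then requests the pictures one by one by sequence number, optionally specifying the size to scale the output to.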
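    The store/list/fetch protocol described above can be sketched with a small in-memory stand-in. This is only an illustration of the requirements, not the MongoDB-backed code of this post: InMemoryPicStore, PicMeta and their methods are hypothetical names introduced for the example.

```scala
// Hypothetical in-memory stand-in for the picture store; not the MongoDB code of this post.
final case class PicMeta(pid: String, seqno: Int, desc: Option[String],
                         width: Option[Int], height: Option[Int])

final class InMemoryPicStore {
  // pid -> pictures in insertion order; each entry pairs metadata with the image bytes
  private var rows = Map.empty[String, Vector[(PicMeta, Array[Byte])]]

  // Store one picture and return the sequence number assigned to it.
  def put(pid: String, desc: Option[String], width: Option[Int],
          height: Option[Int], bytes: Array[Byte]): Int = synchronized {
    val existing = rows.getOrElse(pid, Vector.empty)
    val seqno = existing.size // seqno = number of pictures already stored for this pid
    rows = rows.updated(pid, existing :+ (PicMeta(pid, seqno, desc, width, height) -> bytes))
    seqno
  }

  // Step 1 of a read: list metadata only, without image bytes.
  def list(pid: String): Vector[PicMeta] = synchronized {
    rows.getOrElse(pid, Vector.empty).map(_._1)
  }

  // Step 2 of a read: fetch one picture by product id and sequence number.
  def get(pid: String, seqno: Int): Option[(PicMeta, Array[Byte])] = synchronized {
    rows.getOrElse(pid, Vector.empty).lift(seqno)
  }
}
```

    Using the current count as the next sequence number is only safe here because put is synchronized and pictures are never deleted; a database-backed version would need its own guard against concurrent stores for the same product.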

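    In this post we build on the previous posts and implement the REST service for storing and fetching pictures. Starting from the rest-mongo code of the last post, a few targeted changes for the new requirements should be enough.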

    First the Model part:

    case class WebPic(
                         pid: String,
                         seqno: Int,
                         desc: Option[String],
                         width: Option[Int],
                         heigth: Option[Int],
                         pic: Option[MGOBlob]
                       ) extends ModelBase[Document] { self =>
        override def to: Document = {
          var doc = Document(
            "pid" -> self.pid,
            "seqno" -> self.seqno
          )
          if (self.desc != None)
            doc = doc + ("desc" -> self.desc.get)
          if (self.width != None)
            doc = doc + ("width" -> self.width.get)
          if (self.heigth != None)
            doc = doc + ("heigth" -> self.heigth.get)
          if (self.pic != None)
            doc = doc + ("photo" -> self.pic.get)
          doc
        }
      }
      object WebPic {
        def fromDocument: Document => WebPic = doc => {
          WebPic(
            pid = doc.getString("pid"),
            seqno = doc.getInteger("seqno"),
            desc = mgoGetStringOrNone(doc,"desc"),
            width = mgoGetIntOrNone(doc,"width").asInstanceOf[Option[Int]],
            heigth = mgoGetIntOrNone(doc,"heigth").asInstanceOf[Option[Int]],
            pic = None
          )
        }
      }

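    The width and height fields (the code spells the latter heigth) hold the default size supplied by the client when the picture was stored. If a fetch request does not specify a size, the defaults stored in the database are used.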
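    The fallback rule for one dimension can be written as a single Option chain. A minimal sketch, assuming the same last-resort value of 128 that the route code of this post uses; PicSize and resolve are hypothetical names.

```scala
object PicSize {
  // Last-resort value when neither the request nor the stored record has a size;
  // 128 mirrors the constant used in the route code of this post.
  val Fallback = 128

  // Request parameter first, then the stored default, then the fallback.
  def resolve(requested: Option[Int], stored: Option[Int]): Int =
    requested.orElse(stored).getOrElse(Fallback)
}
```

    For example, PicSize.resolve(None, Some(256)) yields the stored default 256, while PicSize.resolve(Some(64), Some(256)) yields the requested 64.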

    The repo also needs a count function: given a pid, it returns the number of pictures stored under that pid:

       import org.mongodb.scala.model.Filters._
        def count(pid: String):DBOResult[Int] = {
          val ctxCount = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Count(filter=Some(equal("pid",pid))))
          mgoQuery[Int](ctxCount,None)
        }

    The return type is DBOResult[Int]. We also add a function that reads a single WebPic record:

        def getOnePicture(pid: String, seqno: Int): DBOResult[R] = {
          val ctxFind = MGOContext(dbName = db, collName = coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(filter = Some(and(equal("pid",pid),equal("seqno",seqno))), firstOnly = true))
          mgoQuery[R](ctxFind, converter)
        }

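    Note that this function returns DBOResult[R]. We need the whole record, in particular the width and height fields, so that defaults are available when the user does not specify a size. Because specific field names are involved, the Document has to be converted to a WebPic as it is read: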

        def getOneDocument(filtr: Bson): DBOResult[Document] = {
          val ctxFind = MGOContext(dbName = db, collName = coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(filter = Some(filtr), firstOnly = true))
          mgoQuery[Document](ctxFind, None)
        }
        def getOnePicture(pid: String, seqno: Int): DBOResult[R] = {
          val ctxFind = MGOContext(dbName = db, collName = coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(filter = Some(and(equal("pid",pid),equal("seqno",seqno))), firstOnly = true))
          mgoQuery[R](ctxFind, converter)
        }

    Use getOnePicture here. getOneDocument is generic and returns a plain Document, so the compiler cannot see fields such as width and height.
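    The difference between the generic read and the typed read can be illustrated without MongoDB. In this sketch a Map[String, Any] stands in for a BSON Document, and TypedRead, PicInfo, toPicInfo and readOne are hypothetical names; only the idea of passing a converter comes from the repo code.

```scala
object TypedRead {
  // Stand-in for a BSON Document: field names are only known at run time.
  type Doc = Map[String, Any]

  final case class PicInfo(pid: String, seqno: Int, width: Option[Int], height: Option[Int])

  // Converter from the untyped document to the typed record.
  def toPicInfo(doc: Doc): PicInfo = PicInfo(
    pid    = doc("pid").asInstanceOf[String],
    seqno  = doc("seqno").asInstanceOf[Int],
    width  = doc.get("width").map(_.asInstanceOf[Int]),
    height = doc.get("height").map(_.asInstanceOf[Int])
  )

  // Generic read: the converter turns the first matching document into an R,
  // so callers work with fields the compiler can check.
  def readOne[R](docs: Seq[Doc], matches: Doc => Boolean, converter: Doc => R): Option[R] =
    docs.find(matches).map(converter)
}
```

    With the converter in place, a caller can write row.width and get an Option[Int]; with the raw Doc it would have to look up "width" by string and cast the result.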

    Now for the changes to the Route. We start with the request to store a picture, which the user submits with a URL matching this pattern:

    (post &  parameters('pid,'desc.?,'width.as[Int].?,'heigth.as[Int].?)), for example:

    http://example.com:50081/public/pms/pictures?pid=apple&width=128  with the picture in the Entity of the HttpRequest.

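    We first get the number of pictures already stored for apple and use it as the seqno, then save the record to the database and return that seqno: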

         pathPrefix("pictures") {
            (post &  parameters('pid,'desc.?,'width.as[Int].?,'heigth.as[Int].?)) { (pid, optDesc, optWid, optHgh) =>
              val futCount: Future[Int] = repository.count(pid).value.value.runToFuture.map {
                eoi =>
                  eoi match {
                    case Right(oi) => oi match {
                      case Some(i) => i
                      case None => -1
                    }
                    case Left(err) => -1
                  }
              }
              val count: Int = Await.result(futCount, 2 seconds)
              var doc = Document(
                "pid" -> pid,
                "seqno" -> count
              )
              if (optDesc != None)
                doc = doc + ("desc" -> optDesc.get)
              if (optWid != None)
                doc = doc + ("width" -> optWid.get)   // was stored under "desc" by mistake
              if (optHgh != None)
                doc = doc + ("heigth" -> optHgh.get)  // key matches WebPic.fromDocument
    
              withoutSizeLimit {
                decodeRequest {
                  extractDataBytes { bytes =>
                    val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                      hd ++ bs
                    }
                    onComplete(fut) {
                      case Success(b) =>
                        doc = doc + ("pic" -> b.toArray)
                        val futmsg: Future[String] = repository.insert(doc).value.value.runToFuture.map {
                          eoc =>
                            eoc match {
                              case Right(oc) => oc match {
                                case Some(c) => count.toString //   c.toString()
                                case None => "insert may not complete!"
                              }
                              case Left(err) => err.getMessage
                            }
                        }
                        complete(futmsg)
                      case Failure(err) => complete(err)
                    }
                  }
                }
              }

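    Note that the result travels in the response entity, which Akka HTTP carries as a ByteString; here it is the sequence number rendered as a string.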
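    The nested Either/Option matches in the route above all unwrap the same shape: a Left on failure, Right(None) when there is no result, and Right(Some(a)) on success. A minimal sketch of two helpers that collapse this shape; ResultOps, valueOr and message are hypothetical names and are not part of the sdp library used in this post.

```scala
object ResultOps {
  // Shape seen inside the Future in the route code:
  // Left = failure, Right(None) = no result, Right(Some(a)) = result.
  type Result[A] = Either[Throwable, Option[A]]

  // Collapse a Result into a plain value, using `default` for both failure and absence.
  def valueOr[A](r: Result[A], default: => A): A =
    r.toOption.flatten.getOrElse(default)

  // Collapse a Result into a message, keeping the error text on failure.
  def message[A](r: Result[A], onSome: A => String, onNone: String): String =
    r.fold(_.getMessage, _.fold(onNone)(onSome))
}
```

    With such helpers the count lookup would reduce to something like mapping the future with ResultOps.valueOr(_, -1), and the insert reply to ResultOps.message with the "insert may not complete!" text as the onNone argument.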

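    Fetching pictures takes two steps. First the client supplies a pid and gets back a list of records without picture data (note pic = None in WebPic's fromDocument in the Model), for example: http://example.com:50081/public/pms/pictures?pid=apple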

    With a seqno from the list, the client assembles the full URL: http://example.com:50081/public/pms/pictures?pid=apple&seqno=2&height=64
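    On the client side, assembling these URLs is plain string building. A minimal sketch, assuming the parameter names pid, seqno, width and height from the GET route; PicUrls and pictureUrl are hypothetical names.

```scala
object PicUrls {
  // Build the fetch URL for one picture; width and height are optional overrides.
  def pictureUrl(base: String, pid: String, seqno: Int,
                 width: Option[Int] = None, height: Option[Int] = None): String = {
    val params = Seq("pid" -> pid, "seqno" -> seqno.toString) ++
      width.map(w => "width" -> w.toString) ++
      height.map(h => "height" -> h.toString)
    // URL-encode values so that product ids with special characters stay valid.
    val query = params.map { case (k, v) => s"$k=${java.net.URLEncoder.encode(v, "UTF-8")}" }
    s"$base?${query.mkString("&")}"
  }
}
```

    A client that has received the list for apple would call pictureUrl once per seqno, passing width or height only when it wants a size other than the stored default.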

    The system reads the picture and outputs it at the size requested by the user, or at the default size stored in the database:

            (get & parameters('pid, 'seqno.as[Int].?,'width.as[Int].?,'height.as[Int].?)) {
              (pid, optSeq, optWid,optHght) =>
                if (optSeq == None) {
                  dbor = repository.query(equal("pid", pid))
                  val futRows = dbor.value.value.runToFuture.map {
                    eolr =>
                      eolr match {
                        case Right(olr) => olr match {
                          case Some(lr) => lr
                          case None => Seq[M]()
                        }
                        case Left(_) => Seq[M]()
                      }
                  }
                  complete(futureToJson(futRows))
                } else {
                  val futOptPicRow: CancelableFuture[Option[WebPic]] = repository.getOnePicture(pid,optSeq.get)
                    .value.value.runToFuture.map {
                    eorow =>
                      eorow match {
                        case Right(orow) => orow match {
                          case Some(row) =>
                            if (row == null) None
                            else Some(row.asInstanceOf[WebPic])
                          case None => None
                        }
                        case Left(_) => None
                      }
                  }
                  onComplete(futOptPicRow) {
                    case Success(optRow) => optRow match {
                      case Some(row) =>
                        val width  = if(optWid == None) row.width.getOrElse(128) else optWid.getOrElse(128)
                        val height = if(optHght == None) row.heigth.getOrElse(128) else optHght.getOrElse(128)
                        if (row.pic != None) {
    
                          withoutSizeLimit {
                            encodeResponseWith(Gzip) {
                              complete(
                                HttpEntity(
                                  ContentTypes.`application/octet-stream`,
                                  ByteArrayToSource(Imaging.setImageSize(row.pic.get.getData, width, height)
                                  ))
                              )
                            }
                          }
                        } else complete(StatusCodes.NotFound)
                      case None => complete(StatusCodes.NotFound)
                    }
                    case Failure(err) => complete(err)
                  }
                }
              }

    Finally, we mount this Route in the main route:

      implicit val webPicDao = new MongoRepo[WebPic]("testdb","pms", Some(WebPic.fromDocument))
    ...
    
          pathPrefix("public") {
            (pathPrefix("crud")) {
              new MongoRoute[Person]("person")(personDao)
                .route ~
                new MongoRoute[Photo]("photo")(picDao)
                  .route
            } ~
            new MongoRoute[WebPic]("pms")(webPicDao).route
          }

    Here is the source code for this demo:

    MongoModel.scala

    package com.datatech.rest.mongo
    import org.mongodb.scala._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    
    object MongoModels {
    
      case class Person(
                       userid: String = "",
                       name: String = "",
                       age: Option[Int] = None,
                       dob: Option[MGODate] = None,
                       address: Option[String] = None
                       ) extends ModelBase[Document] {
        import org.mongodb.scala.bson._
    
        override def to: Document = {
          var doc = Document(
          "userid" -> this.userid,
          "name" -> this.name)
    
    
          if (this.age != None)
            doc = doc + ("age" -> this.age.get)
    
          if (this.dob != None)
            doc = doc + ("dob" -> this.dob.get)
    
          if (this.address != None)
            doc = doc + ("address" -> this.address.getOrElse(""))
    
          doc
        }
    
      }
      object Person {
        val fromDocument: Document => Person = doc => {
          val keyset = doc.keySet
          Person(
            userid = doc.getString("userid"),
            name = doc.getString("name"),
            age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
    
            dob =  {if (keyset.contains("dob"))
              Some(doc.getDate("dob"))
            else None },
    
            address =  mgoGetStringOrNone(doc,"address")
          )
        }
      }
    
      case class Photo (
                         id: String,
                         loc: Option[String],
                         size: Option[Int],
                         credt: Option[MGODate],
                         photo: Option[MGOBlob]
                       ) extends ModelBase[Document] {
        override def to: Document = {
          var doc = Document("id" -> this.id)
          if (loc != None)
            doc = doc + ("loc" -> this.loc.get)
          if (size != None)
            doc = doc + ("size" -> this.size.get)
          if (credt != None)
            doc = doc + ("credt" -> this.credt.get)
          if (photo != None)
            doc = doc + ("photo" -> this.photo.get)
          doc
        }
      }
    
      object Photo {
        def fromDocument: Document => Photo = doc => {
          Photo(
            id = doc.getString("id"),
            loc = mgoGetStringOrNone(doc,"loc"),
            size = mgoGetIntOrNone(doc,"size").asInstanceOf[Option[Int]],
            credt = mgoGetDateOrNone(doc,"credt"),
            photo = mgoGetBlobOrNone(doc, "photo")
          )
        }
      }
    
      case class WebPic(
                         pid: String,
                         seqno: Int,
                         desc: Option[String],
                         width: Option[Int],
                         heigth: Option[Int],
                         pic: Option[MGOBlob]
                       ) extends ModelBase[Document] { self =>
        override def to: Document = {
          var doc = Document(
            "pid" -> self.pid,
            "seqno" -> self.seqno
          )
          if (self.desc != None)
            doc = doc + ("desc" -> self.desc.get)
          if (self.width != None)
            doc = doc + ("width" -> self.width.get)
          if (self.heigth != None)
            doc = doc + ("heigth" -> self.heigth.get)
          if (self.pic != None)
            doc = doc + ("photo" -> self.pic.get)
          doc
        }
      }
      object WebPic {
        def fromDocument: Document => WebPic = doc => {
          WebPic(
            pid = doc.getString("pid"),
            seqno = doc.getInteger("seqno"),
            desc = mgoGetStringOrNone(doc,"desc"),
            width = mgoGetIntOrNone(doc,"width").asInstanceOf[Option[Int]],
            heigth = mgoGetIntOrNone(doc,"heigth").asInstanceOf[Option[Int]],
            pic = None
          )
        }
      }
    }

    MongoRepo.scala

    package com.datatech.rest.mongo
    import org.mongodb.scala._
    import org.bson.conversions.Bson
    import org.mongodb.scala.result._
    import com.datatech.sdp.mongo.engine._
    import MGOClasses._
    import MGOEngine._
    import MGOCommands._
    import com.datatech.sdp.result.DBOResult.DBOResult
    
    object MongoRepo {
    
       class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
        def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
          var res = Seq[ResultOptions]()
          next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
          sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
          fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
          top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
    
          val ctxFind = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(andThen = res))
          mgoQuery[Seq[R]](ctxFind,converter)
        }
    
         def query(filtr: Bson, next:Option[String]=None,sort:Option[String]=None,fields:Option[String]=None,top:Option[Int]=None): DBOResult[Seq[R]] = {
           var res = Seq[ResultOptions]()
           next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
           sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
           fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
           top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
           val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),andThen = res))
           mgoQuery[Seq[R]](ctxFind,converter)
        }
    
       import org.mongodb.scala.model.Filters._
        def count(pid: String):DBOResult[Int] = {
          val ctxCount = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Count(filter=Some(equal("pid",pid))))
          mgoQuery[Int](ctxCount,None)
        }
    
        def getOneDocument(filtr: Bson): DBOResult[Document] = {
              val ctxFind = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
             .setCommand(Find(filter = Some(filtr),firstOnly = true))
           mgoQuery[Document](ctxFind,None)
        }
        def getOnePicture(pid: String, seqno: Int): DBOResult[R] = {
          val ctxFind = MGOContext(dbName = db, collName = coll)
            .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
            .setCommand(Find(filter = Some(and(equal("pid",pid),equal("seqno",seqno))), firstOnly = true))
          mgoQuery[R](ctxFind, converter)
        }
        def insert(doc: Document): DBOResult[Completed] = {
          val ctxInsert = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Insert(Seq(doc)))
          mgoUpdate[Completed](ctxInsert)
        }
    
        def delete(filter: Bson): DBOResult[DeleteResult] = {
          val ctxDelete = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Delete(filter))
          mgoUpdate[DeleteResult](ctxDelete)
        }
    
        def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
          val ctxUpdate = MGOContext(dbName = db,collName=coll)
            .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
            .setCommand(Update(filter,update,None,!many))
          mgoUpdate[UpdateResult](ctxUpdate)
        }
    
        def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
           val ctxUpdate = MGOContext(dbName = db,collName=coll)
             .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
             .setCommand(Replace(filter,row))
           mgoUpdate[UpdateResult](ctxUpdate)
        }
    
      }
    
    }

    MongoRoute.scala

    package com.datatech.rest.mongo
    import akka.http.scaladsl.server.Directives
    import com.datatech.sdp.file._
    
    import scala.util._
    import org.mongodb.scala._
    import com.datatech.sdp.file.Streaming._
    import org.mongodb.scala.result._
    import MongoRepo._
    import akka.stream.ActorMaterializer
    import com.datatech.sdp.result.DBOResult._
    import org.mongodb.scala.model.Filters._
    import com.datatech.sdp.mongo.engine.MGOClasses._
    import monix.execution.CancelableFuture
    import akka.util._
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.coding.Gzip
    import com.datatech.rest.mongo.MongoModels.WebPic
    
    import scala.concurrent._
    import scala.concurrent.duration._
    object MongoRoute {
      class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
        implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {
        import monix.execution.Scheduler.Implicits.global
        var dbor: DBOResult[Seq[M]] = _
        var dbou: DBOResult[UpdateResult] = _
        val route = pathPrefix(pathName) {
          pathPrefix("pictures") {
            (post &  parameters('pid,'desc.?,'width.as[Int].?,'heigth.as[Int].?)) { (pid, optDesc, optWid, optHgh) =>
              val futCount: Future[Int] = repository.count(pid).value.value.runToFuture.map {
                eoi =>
                  eoi match {
                    case Right(oi) => oi match {
                      case Some(i) => i
                      case None => -1
                    }
                    case Left(err) => -1
                  }
              }
              val count: Int = Await.result(futCount, 2 seconds)
              var doc = Document(
                "pid" -> pid,
                "seqno" -> count
              )
              if (optDesc != None)
                doc = doc + ("desc" -> optDesc.get)
              if (optWid != None)
                doc = doc + ("width" -> optWid.get)   // was stored under "desc" by mistake
              if (optHgh != None)
                doc = doc + ("heigth" -> optHgh.get)  // key matches WebPic.fromDocument
    
              withoutSizeLimit {
                decodeRequest {
                  extractDataBytes { bytes =>
                    val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                      hd ++ bs
                    }
                    onComplete(fut) {
                      case Success(b) =>
                        doc = doc + ("pic" -> b.toArray)
                        val futmsg: Future[String] = repository.insert(doc).value.value.runToFuture.map {
                          eoc =>
                            eoc match {
                              case Right(oc) => oc match {
                                case Some(c) => count.toString //   c.toString()
                                case None => "insert may not complete!"
                              }
                              case Left(err) => err.getMessage
                            }
                        }
                        complete(futmsg)
                      case Failure(err) => complete(err)
                    }
                  }
                }
              }
            } ~
            (get & parameters('pid, 'seqno.as[Int].?,'width.as[Int].?,'height.as[Int].?)) {
              (pid, optSeq, optWid,optHght) =>
                if (optSeq == None) {
                  dbor = repository.query(equal("pid", pid))
                  val futRows = dbor.value.value.runToFuture.map {
                    eolr =>
                      eolr match {
                        case Right(olr) => olr match {
                          case Some(lr) => lr
                          case None => Seq[M]()
                        }
                        case Left(_) => Seq[M]()
                      }
                  }
                  complete(futureToJson(futRows))
                } else {
                  val futOptPicRow: CancelableFuture[Option[WebPic]] = repository.getOnePicture(pid,optSeq.get)
                    .value.value.runToFuture.map {
                    eorow =>
                      eorow match {
                        case Right(orow) => orow match {
                          case Some(row) =>
                            if (row == null) None
                            else Some(row.asInstanceOf[WebPic])
                          case None => None
                        }
                        case Left(_) => None
                      }
                  }
                  onComplete(futOptPicRow) {
                    case Success(optRow) => optRow match {
                      case Some(row) =>
                        val width  = if(optWid == None) row.width.getOrElse(128) else optWid.getOrElse(128)
                        val height = if(optHght == None) row.heigth.getOrElse(128) else optHght.getOrElse(128)
                        if (row.pic != None) {
    
                          withoutSizeLimit {
                            encodeResponseWith(Gzip) {
                              complete(
                                HttpEntity(
                                  ContentTypes.`application/octet-stream`,
                                  ByteArrayToSource(Imaging.setImageSize(row.pic.get.getData, width, height)
                                  ))
                              )
                            }
                          }
                        } else complete(StatusCodes.NotFound)
                      case None => complete(StatusCodes.NotFound)
                    }
                    case Failure(err) => complete(err)
                  }
                }
              }
          } ~
          pathPrefix("blob") {
            (get & parameter('filter)) { filter =>
              val filtr = Document(filter)
              val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
                eodoc =>
                  eodoc match {
                    case Right(odoc) => odoc match {
                      case Some(doc) =>
                        if (doc == null) None
                        else mgoGetBlobOrNone(doc, "photo")
                      case None => None
                    }
                    case Left(_) => None
                  }
              }
              onComplete(futOptPic) {
                case Success(optBlob) => optBlob match {
                  case Some(blob) =>
                    withoutSizeLimit {
                      encodeResponseWith(Gzip) {
                        complete(
                          HttpEntity(
                            ContentTypes.`application/octet-stream`,
                            ByteArrayToSource(blob.getData)
                          )
                        )
                      }
                    }
                  case None => complete(StatusCodes.NotFound)
                }
                case Failure(err) => complete(err)
              }
            } ~
            (post &  parameter('bson)) { bson =>
              val bdoc = Document(bson)
              withoutSizeLimit {
                decodeRequest {
                  extractDataBytes { bytes =>
                    val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                      hd ++ bs
                    }
                    onComplete(fut) {
                      case Success(b) =>
                        val doc = bdoc + ("photo" -> b.toArray)
                        val futmsg = repository.insert(doc).value.value.runToFuture.map {
                          eoc =>
                            eoc match {
                              case Right(oc) => oc match {
                                case Some(c) => c.toString()
                                case None => "insert may not complete!"
                              }
                              case Left(err) => err.getMessage
                            }
                        }
                        complete(futmsg)
                      case Failure(err) => complete(err)
                    }
                  }
                }
              }
            }
          } ~
          (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
            (filter,fields,sort,top,next) => {
            dbor = {
              filter match {
                case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
                case None => repository.getAll(next,sort,fields,top)
              }
            }
            val futRows = dbor.value.value.runToFuture.map {
              eolr =>
                eolr match {
                  case Right(olr) => olr match {
                    case Some(lr) => lr
                    case None => Seq[M]()
                  }
                  case Left(_) => Seq[M]()
                }
            }
            complete(futureToJson(futRows))
           }
          } ~ post {
            entity(as[String]) { json =>
              val extractedEntity: M = fromJson[M](json)
              val doc: Document = extractedEntity.to
              val futmsg = repository.insert(doc).value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(c) => c.toString()
                      case None => "insert may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
    
              complete(futmsg)
            }
          } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
            val bson = Document(filter)
            if (set == None) {
              entity(as[String]) { json =>
                val extractedEntity: M = fromJson[M](json)
                val doc: Document = extractedEntity.to
                val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
                  eoc =>
                    eoc match {
                      case Right(oc) => oc match {
                        case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                        case None => "update may not complete!"
                      }
                      case Left(err) => err.getMessage
                    }
                }
                complete(futureToJson(futmsg))
              }
            } else {
              set match {
                case Some(u) =>
                  val ubson = Document(u)
                  dbou = repository.update(bson, ubson, many.getOrElse(true))
                case None =>
                  dbou = Left(new IllegalArgumentException("missing set statement for update!"))
              }
              val futmsg = dbou.value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                      case None => "update may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
              complete(futureToJson(futmsg))
            }
          } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
            val bson = Document(filter)
            val futmsg = repository.delete(bson).value.value.runToFuture.map {
              eoc =>
                eoc match {
                  case Right(oc) => oc match {
                    case Some(d) => s"${d.getDeletedCount} rows deleted."
                    case None => "delete may not complete!"
                  }
                  case Left(err) => err.getMessage
                }
            }
            complete(futureToJson(futmsg))
          }
        }
      }
    
    }

    PMSServer.scala

    package com.datatech.rest.mongo
    
    import akka.actor._
    import akka.stream._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import pdi.jwt._
    import AuthBase._
    import MockUserAuthService._
    import org.mongodb.scala._
    
    import scala.collection.JavaConverters._
    import MongoModels._
    import MongoRepo._
    import MongoRoute._
    
    
    object PMSServer extends App {
    
    
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
      val settings: MongoClientSettings = MongoClientSettings.builder()
        .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
        .build()
      implicit val client: MongoClient = MongoClient(settings)
      implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
      implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
      implicit val webPicDao = new MongoRepo[WebPic]("testdb","pms", Some(WebPic.fromDocument))
      implicit val authenticator = new AuthBase()
        .withAlgorithm(JwtAlgorithm.HS256)
        .withSecretKey("OpenSesame")
        .withUserFunc(getValidUser)
    
      val route =
        path("auth") {
          authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
            post { complete(authenticator.issueJwt(userinfo))}
          }
        } ~
          pathPrefix("private") {
            authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>
              FileRoute(validToken)
                .route
              // ~ ...
            }
          } ~
          pathPrefix("public") {
            (pathPrefix("crud")) {
              new MongoRoute[Person]("person")(personDao)
                .route ~
                new MongoRoute[Photo]("photo")(picDao)
                  .route
            } ~
            new MongoRoute[WebPic]("pms")(webPicDao).route
          }
    
      val (port, host) = (50081,"192.168.11.189")
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
      println(s"Server running at $host $port. Press any key to exit ...")
    
      scala.io.StdIn.readLine()
    
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    
    }

    imaging.scala

    package com.datatech.sdp.file
    import javax.imageio.ImageIO
    import java.awt.Graphics2D
    import java.awt.image.BufferedImage
    import java.io.ByteArrayInputStream
    import java.io.ByteArrayOutputStream
    
    object Imaging {
      def setImageSize(barr: Array[Byte], wth: Int, hth: Int): Array[Byte] = {
        val input = ImageIO.read(new ByteArrayInputStream(barr))
        val image = new BufferedImage(wth, hth, BufferedImage.TYPE_INT_BGR)
        val g = image.getGraphics.asInstanceOf[Graphics2D]
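        g.drawImage(input, 0, 0, wth, hth, null) // draw the source image scaled to wth x hth
        g.dispose()
        image.flush()
        val barros = new ByteArrayOutputStream()
        ImageIO.write(image, "jpg", barros)
        barros.toByteArray // return the resized bytes, not the original input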
      }
    }
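    A quick way to check a resize function like the one above is a round trip: encode a test image, resize it, decode the result and compare dimensions. The sketch below is self-contained and uses only javax.imageio and java.awt from the JDK; ResizeSketch, blankJpeg, resize and dimensions are hypothetical names, and resize is a stand-in written for this example rather than the Imaging object itself.

```scala
import java.awt.image.BufferedImage
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import javax.imageio.ImageIO

object ResizeSketch {
  // Encode a blank image of the given size as JPEG bytes (test input only).
  def blankJpeg(width: Int, height: Int): Array[Byte] = {
    val img = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB)
    val out = new ByteArrayOutputStream()
    ImageIO.write(img, "jpg", out)
    out.toByteArray
  }

  // Scale encoded image bytes to width x height and re-encode them as JPEG.
  def resize(bytes: Array[Byte], width: Int, height: Int): Array[Byte] = {
    val input = ImageIO.read(new ByteArrayInputStream(bytes))
    require(input != null, "unsupported or corrupt image data")
    val scaled = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB)
    val g = scaled.createGraphics()
    try g.drawImage(input, 0, 0, width, height, null)
    finally g.dispose()
    val out = new ByteArrayOutputStream()
    ImageIO.write(scaled, "jpg", out)
    out.toByteArray
  }

  // Read back the pixel dimensions of encoded image bytes.
  def dimensions(bytes: Array[Byte]): (Int, Int) = {
    val img = ImageIO.read(new ByteArrayInputStream(bytes))
    (img.getWidth, img.getHeight)
  }
}
```

    Resizing a 200 x 100 test image to 64 x 32 and reading the result back with dimensions should report the new size; a function that returns its input unchanged would still report 200 x 100.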

     

  • Original article: https://www.cnblogs.com/tiger-xc/p/11392283.html