zoukankan      html  css  js  c++  java
  • Akka(41): Http:DBTable-rows streaming

      在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到:如果需要进行数据库数据交换的话,可以用Source[ROW,_]来表示库表行,但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式,需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json,着重case class,而且要提供JsonFormat?(case-class),其中?代表case class的参数个数,用起来略显复杂。不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。

    下面就让我们开始写些代码吧。首先,我们用一个case class代表数据库表行结构,然后用它作为流元素来构建一个Source,如下:

      case class County(id: Int, name: String)
      val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

    我们先设计服务端的数据下载部分:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    
    
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
    object Converters extends MyFormats {
      case class County(id: Int, name: String)
      val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
      implicit val countyFormat = jsonFormat2(County)
    }
    
    object HttpDBServer extends App {
      import Converters._
    
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
    
      implicit val jsonStreamingSupport = EntityStreamingSupport.json()
        .withParallelMarshalling(parallelism = 8, unordered = false)
    
      val route =
        path("rows") {
          get {
            complete {
              source
            }
          }
        }
    
      val (port, host) = (8011,"localhost")
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
      println(s"Server running at $host $port. Press any key to exit ...")
    
      scala.io.StdIn.readLine()
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    }

    在上面的代码里我们直接把source放进了complete(),然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到证实。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream来操作:

         case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
              futSource.onSuccess {
                case source => source.runForeach(println)
              }
     

    上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[County]隐式实例:

      // support for as[Source[T, NotUsed]]
      implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
        Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒
          if (support.supported.matches(e.contentType)) {
            val frames = e.dataBytes.via(support.framingDecoder)
            val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
            val unmarshallingFlow =
              if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
              else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
            val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
            FastFuture.successful(elements)
          } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
        }

    这个隐式实例是由Spray-Jason提供的,在SprayJsonSupport.scala里。
    下面是这部分客户端的完整代码: 

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import scala.util._
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import akka.http.scaladsl.unmarshalling.Unmarshal
    
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
    object Converters extends MyFormats {
      case class County(id: Int, name: String)
      implicit val countyFormat = jsonFormat2(County)
    }
    
    object HttpDBClient extends App {
      import Converters._
    
      implicit val sys = ActorSystem("ClientSys")
      implicit val mat = ActorMaterializer()
      implicit val ec = sys.dispatcher
    
      implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    
      def downloadRows(request: HttpRequest) = {
        val futResp = Http(sys).singleRequest(request)
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
              futSource.onSuccess {
                case source => source.runForeach(println)
              }
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"download request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to download rows!")
            case Failure(err) => println(s"download failed: ${err.getMessage}")
    
          }
      }
      downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))
    
      scala.io.StdIn.readLine()
    
      sys.terminate()
    
    }

    以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream的操作方式来处理下载数据。那么反向交换即从客户端上传一段表行的话就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样的强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的是简单提供一个Source[ByteString,_],我们是否可以直接调用Spray-Json的函数来进行ROW->Son->ByteString转换呢?如下:

      import akka.util.ByteString
      import akka.http.scaladsl.model.HttpEntity.limitableByteSource
    
      val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
      def countyToByteString(c: County) = {
        ByteString(c.toJson.toString)
      }
      val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)
    
      val rowBytes = limitableByteSource(source via flowCountyToByteString)
    
      val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
      val data = HttpEntity(
        ContentTypes.`application/json`,
        rowBytes
      )
    
    我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJason是Spray-Json提供的一个函数:
    package json {
    
      case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)
      class SerializationException(msg: String) extends RuntimeException(msg)
    
      private[json] class PimpedAny[T](any: T) {
        def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)
      }
    
      private[json] class PimpedString(string: String) {
        @deprecated("deprecated in favor of parseJson", "1.2.6")
        def asJson: JsValue = parseJson
        def parseJson: JsValue = JsonParser(string)
      }
    }

    假设服务端收到数据后以Akka-stream方式再转换成一个List返回,我们用下面的方法来测试功能:

      def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
        val futResp = Http(sys).singleRequest(
          request.copy(entity = dataEntity)
        )
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              entity.dataBytes.map(_.utf8String).runForeach(println)
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"Upload request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to Upload file!")
            case Failure(err) => println(s"Upload failed: ${err.getMessage}")
    
          }
      }

    服务端接收数据处理方法如下:

         post {
            withoutSizeLimit {
              entity(asSourceOf[County]) { source =>
                val futofNames: Future[List[String]] =
                  source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                complete {
                  futofNames
                }
              }
            }
          }

    考虑到在数据转换的过程中可能会出现异常。需要异常处理方法来释放backpressure:

      def postExceptionHandler: ExceptionHandler =
        ExceptionHandler {
          case _: RuntimeException =>
            extractRequest { req =>
              req.discardEntityBytes()
              complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
            }
        }
    
          post {
            withoutSizeLimit {
              handleExceptions(postExceptionHandler) {
                entity(asSourceOf[County]) { source =>
                  val futofNames: Future[List[String]] =
                    source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                  complete {
                    futofNames
                  }
                }
              }
            }
          }

    在客户端试运行返回结果显示:

      uploadRows(request,data)
    
    ["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

    正是我们期待的结果。

    下面是本次讨论的示范代码:

    服务端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import scala.concurrent._
    import akka.http.scaladsl.server._
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.model._
    
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
    object Converters extends MyFormats {
      case class County(id: Int, name: String)
      val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
      implicit val countyFormat = jsonFormat2(County)
    }
    
    object HttpDBServer extends App {
      import Converters._
    
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
    
      implicit val jsonStreamingSupport = EntityStreamingSupport.json()
        .withParallelMarshalling(parallelism = 8, unordered = false)
    
      def postExceptionHandler: ExceptionHandler =
        ExceptionHandler {
          case _: RuntimeException =>
            extractRequest { req =>
              req.discardEntityBytes()
              complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
            }
        }
    
      val route =
        path("rows") {
          get {
            complete {
              source
            }
          } ~
          post {
            withoutSizeLimit {
              handleExceptions(postExceptionHandler) {
                entity(asSourceOf[County]) { source =>
                  val futofNames: Future[List[String]] =
                    source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                  complete {
                    futofNames
                  }
                }
              }
            }
          }
        }
    
      val (port, host) = (8011,"localhost")
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
      println(s"Server running at $host $port. Press any key to exit ...")
    
      scala.io.StdIn.readLine()
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    }

    客户端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import scala.util._
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import akka.http.scaladsl.unmarshalling._
    
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
    object Converters extends MyFormats {
      case class County(id: Int, name: String)
      implicit val countyFormat = jsonFormat2(County)
    }
    
    object HttpDBClient extends App {
      import Converters._
    
      implicit val sys = ActorSystem("ClientSys")
      implicit val mat = ActorMaterializer()
      implicit val ec = sys.dispatcher
    
      implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
    
      def downloadRows(request: HttpRequest) = {
        val futResp = Http(sys).singleRequest(request)
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
              futSource.onSuccess {
                case source => source.runForeach(println)
              }
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"download request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to download rows!")
            case Failure(err) => println(s"download failed: ${err.getMessage}")
    
          }
      }
      downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))
    
      
      import akka.util.ByteString
      import akka.http.scaladsl.model.HttpEntity.limitableByteSource
    
      val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
      def countyToByteString(c: County) = {
        ByteString(c.toJson.toString)
      }
      val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)
    
      val rowBytes = limitableByteSource(source via flowCountyToByteString)
    
      val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
      val data = HttpEntity(
        ContentTypes.`application/json`,
        rowBytes
      )
    
      def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
        val futResp = Http(sys).singleRequest(
          request.copy(entity = dataEntity)
        )
        futResp
          .andThen {
            case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
              entity.dataBytes.map(_.utf8String).runForeach(println)
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"Upload request failed, response code: $code")
              r.discardEntityBytes()
            case Success(_) => println("Unable to Upload file!")
            case Failure(err) => println(s"Upload failed: ${err.getMessage}")
    
          }
      }
    
      uploadRows(request,data)
    
      scala.io.StdIn.readLine()
    
      sys.terminate()
    
    }

     

     

     

     

     

     

     

     

  • 相关阅读:
    PHP腾讯地图地图接口调用提示{ "status": 110, "message": "请求来源未被授权,此次请求无来源信息" }
    PHP带参数匿名函数
    微信小程序实现图片预加载(图片延迟加载)
    快速排序
    《Linux命令行与shell脚本编程大全》第十八章 图形化桌面环境中的脚本编程
    《Linux命令行与shell脚本编程大全》第十七章 创建函数
    makefile中伪目标的理解
    《Linux命令行与shell脚本编程大全》第十六章 控制脚本
    《Linux命令行与shell脚本编程大全》第十五章 呈现数据
    《Linux命令行与shell脚本编程大全》第十四章 处理用户输入
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8026944.html
Copyright © 2011-2022 走看看