zoukankan      html  css  js  c++  java
  • Akka(38): Http:Entityof ByteString-数据传输基础

      我们说过Akka-http是一个好的系统集成工具,集成是通过数据交换方式实现的。Http是个在网上传输和接收的规范协议。所以,在使用Akka-http之前,可能我们还是需要把Http模式的网上数据交换细节了解清楚。数据交换双方是通过Http消息类型Request和Response来实现的。在Akka-http中对应的是HttpRequest和HttpResponse。这两个类型都具备HttpEntity类型来装载需要交换的数据。首先,无论如何数据在线上的表现形式肯定是一串bytes。所以,数据交换两头Request,Response中的Entity也必须是以bytes来表达的。在Akka-http里我们把需要传输的数据转换成ByteString,通过网络发送給接收端、接收端再把收到消息Entity中的ByteString转换成目标类型的数据。这两个转换过程就是Akka-http的Marshalling和Unmarshalling过程了。我们先从HttpEntity的构建函数来了解它的定义:

    object HttpEntity {
      implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)
      implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)
      implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)
      def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =
        if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset)))
      def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =
        if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes))
      def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =
        if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data)
    
      def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =
        if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data)
      def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked =
        HttpEntity.Chunked.fromData(contentType, data)
    ...

    很明显,HttpEntity可以分两大类,一种是Strict类型的,它的data是ByteString。另一种是UniversalEntity类型,它的数据dataBytes是Source[ByteString,Any]。无论如何最终在线上的还是ByteString。HttpEntity的ContentType注明了传输数据格式,有:

    object ContentTypes {
      val `application/json` = ContentType(MediaTypes.`application/json`)
      val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
      val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
      val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
      val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
      val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`
    
      // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
      val NoContentType = ContentType(MediaTypes.NoMediaType)
    }

    注意:ContentType只是一种备注,不影响线上数据表达形式,线上的数据永远是ByteString。但是,其中的application/octet-stream类型代表数据必须是Source[ByteString,Any]。我们下面就通过客户端的例子来理解HttpEntity。下面是一个客户端测试函数:

      def runService(request: HttpRequest, rentity: RequestEntity) = {
       val futResp = for {
         entity <- Future.successful(rentity)
         resp <- Http(sys).singleRequest(
           request.copy(entity = rentity)
         )
       } yield resp
    
       futResp
        .andThen {
          case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
            entity.dataBytes.map(_.utf8String).runForeach(println)
          case Success(r@HttpResponse(code, _, _, _)) =>
            println(s"Download request failed, response code: $code")
            r.discardEntityBytes()
          case Success(_) => println("Unable to download rows!")
          case Failure(err) => println(s"Download failed: ${err.getMessage}")
    
        }
      }

    我们只需要对这个函数传入RequestEntity就可以了解返回Response里Entity的许多细节了。首先我们要求服务端发送一个纯字符串Hello World。服务端代码如下:

     } ~ path("text") {
          get {
            complete("Hello World!")
          } ~

    虽然complete("Hello World!")有些迷糊,不过应该complete做了些字符串到ByteString的转换。我们可以从上面这个runService函数得到证实。下面是这个例子的调用:

      val reqText = HttpRequest(uri = s"http://localhost:8011/text")
      runService(reqText,HttpEntity.Empty)
        .andThen{case _ => sys.terminate()}

    从显示的结果可以得出runService函数中的entity.dataBytes.map(_.utf8String)已经把ByteString转换成了String,也就是说服务器端发送的Entity里的数据是ByteString。

    我们再试着发送一些数据給服务端,然后让服务端把结果通过response entity返回来:

        } ~ path("text") {
          get {
            complete("Hello World!")
          } ~
            post {
              withoutSizeLimit {
                extractDataBytes { bytes =>
                  val data = bytes.runFold(ByteString())(_ ++ _)
                  onComplete(data) { t =>
                    complete(t)
                  }
                }
              }
            }

    我们看到服务端对request entity的操作是以ByteString进行的。客户端上传一串字符的request如下:

      val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
      val uploadText = HttpEntity(
        ContentTypes.`text/plain(UTF-8)`,
        // transform each number to a chunk of bytes
        ByteString("hello world again")
      )
      runService(postText,uploadText)
        .andThen{case _ => sys.terminate()}

    我们可以看到放进entity里的数据是ByteString。

    我们知道Akka-http是基于Akka-Stream的,具备Reactive-Stream功能特性。下面我们就示范一下如何进行stream的上传下载。首先定制一个Source:

      val numbers = Source.fromIterator(() =>
        Iterator.continually(Random.nextInt()))
        .map(n => ByteString(s"$n
    "))
      //make conform to withoutSizeLimit constrain
      val source = limitableByteSource(numbers)

    服务端也是用HttpEntity来装载这个Source然后通过HttpRequest传给客户端的:

      path("random") {
          get {
            complete(
              HttpEntity(
                ContentTypes.`application/octet-stream`,
                // transform each number to a chunk of bytes
                source.take(10000)
              )
            )
          } ~
      

    我们在客户端还是用runService来解析传过来的entity。由于接收一个大型的Source,所以需要修改一下接收方式代码:

       futResp
        .andThen {
          case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
            val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
                 Await.result(futEnt, Duration.Inf) // throws if binding fails
                 println("End of stream!!!")
          case Success(r@HttpResponse(code, _, _, _)) =>
            println(s"Download request failed, response code: $code")
            r.discardEntityBytes()
          case Success(_) => println("Unable to download rows!")
          case Failure(err) => println(s"Download failed: ${err.getMessage}")
    
        }
     

    用下面的方式调用

      val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
        runService(reqRandom,HttpEntity.Empty)
         .andThen{case _ => sys.terminate()}

    再示范一下在客户端用Source上传数据。服务端代码:

           post {
              withoutSizeLimit {
                extractDataBytes { bytes =>
                  val data = bytes.runFold(ByteString())(_ ++ _)
                  onComplete(data) { t =>
                    complete(t)
                  }
                }
              }
            }

    客户端上传数据范例:

     val numbers = Source.fromIterator(() =>
        Iterator.continually(Random.nextInt()))
        .map(n => ByteString(s"$n
    "))
      //make conform to withoutSizeLimit constrain
      val source = limitableByteSource(numbers)
    
      val bytes = HttpEntity(
        ContentTypes.`application/octet-stream`,
        // transform each number to a chunk of bytes
        source.take(10000)
      )
      val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
      runService(postRandom,bytes)
        .andThen{case _ => sys.terminate()}

    从上面讨论我们了解了在Marshal,Unmarshal下层只是ByteString的操作和转换。下面是本次讨论示范源代码:

    服务端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.model._
    import akka.util.ByteString
    import akka.http.scaladsl.model.HttpEntity._
    
    import scala.util.Random
    
    object ServerEntity extends App {
    
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
    
      val numbers = Source.fromIterator(() =>
        Iterator.continually(Random.nextInt()))
        .map(n => ByteString(s"$n
    "))
      //make conform to withoutSizeLimit constrain
      val source = limitableByteSource(numbers)
    
    
    
      val route =
        path("random") {
          get {
            withoutSizeLimit {
              complete(
                HttpEntity(
                  ContentTypes.`application/octet-stream`,
                  // transform each number to a chunk of bytes
                  source.take(1000))
              )
            }
          } ~
            post {
              withoutSizeLimit {
                extractDataBytes { bytes =>
                  val data = bytes.runFold(ByteString())(_ ++ _)
                  onComplete(data) { t =>
                    complete(t)
                  }
                }
              }
            }
        } ~ path("text") {
          get {
            complete("Hello World!")
          } ~
            post {
              withoutSizeLimit {
                extractDataBytes { bytes =>
                  val data = bytes.runFold(ByteString())(_ ++ _)
                  onComplete(data) { t =>
                    complete(t)
                  }
                }
              }
            }
        }
    
    
      val (port, host) = (8011,"localhost")
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
      println(s"Server running at $host $port. Press any key to exit ...")
    
      scala.io.StdIn.readLine()
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    }

    客户端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpEntity.limitableByteSource
    import akka.http.scaladsl.model._
    
    import scala.concurrent.duration._
    import akka.util.ByteString
    
    import scala.concurrent._
    import scala.util._
    
    object ClientEntity extends App {
    
      implicit val sys = ActorSystem("ClientSys")
      implicit val mat = ActorMaterializer()
      implicit val ec = sys.dispatcher
    
      def runService(request: HttpRequest, rentity: RequestEntity) = {
       val futResp = for {
         entity <- Future.successful(rentity)
         resp <- Http(sys).singleRequest(
           request.copy(entity = rentity)
         )
       } yield resp
    
       futResp
        .andThen {
          case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
            val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
                 Await.result(futEnt, Duration.Inf) // throws if binding fails
                 println("End of stream!!!")
          case Success(r@HttpResponse(code, _, _, _)) =>
            println(s"Download request failed, response code: $code")
            r.discardEntityBytes()
          case Success(_) => println("Unable to download rows!")
          case Failure(err) => println(s"Download failed: ${err.getMessage}")
    
        }
      }
    
      val reqText = HttpRequest(uri = s"http://localhost:8011/text")
    //  runService(reqText,HttpEntity.Empty)
    //    .andThen{case _ => sys.terminate()}
    
      val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
      val uploadText = HttpEntity(
        ContentTypes.`text/plain(UTF-8)`,
        // transform each number to a chunk of bytes
        ByteString("hello world again")
      )
    //  runService(postText,uploadText)
    //    .andThen{case _ => sys.terminate()}
    
      val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
     //   runService(reqRandom,HttpEntity.Empty)
     //    .andThen{case _ => sys.terminate()}
    
      val numbers = Source.fromIterator(() =>
        Iterator.continually(Random.nextInt()))
        .map(n => ByteString(s"$n
    "))
      //make conform to withoutSizeLimit constrain
      val source = limitableByteSource(numbers)
    
      val bytes = HttpEntity(
        ContentTypes.`application/octet-stream`,
        // transform each number to a chunk of bytes
        source.take(10000)
      )
      val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
      runService(postRandom,bytes)
        .andThen{case _ => sys.terminate()}
    
    
    }

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    静态成员变量
    设计模式:空对象模式(Null Object Pattern)
    超详细LAMP环境搭建
    WCF 学习笔记之双工实现
    new和instanceof的内部机制
    C#开源磁盘/内存缓存引擎
    C++设计模式-Flyweight享元模式
    Javascript内存泄漏
    流量计数器
    运用Mono.Cecil 反射读取.NET程序集元数据
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/7930597.html
Copyright © 2011-2022 走看看