  • Akka(29): Http:Server-Side-Api,Low-Level-Api

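    Akka-http provides a convenient programming API for each end of a connection: the Server-Side-Api and the Client-Side-Api. Both can greatly improve programming efficiency. As mentioned in the previous installment, the Http-Server is the core of Akka-http, and all system-integration functionality is implemented on the server side. The Akka-http Server-Side-Api can fairly be called a state-of-the-art Http-Server programming tool. It supports: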

    • Full support for HTTP persistent connections
    • Full support for HTTP pipelining
    • Full support for asynchronous HTTP streaming including “chunked” transfer encoding accessible through an idiomatic API
    • Optional SSL/TLS encryption
    • WebSocket support

    The Server-Side-Api has two levels: the Low-level-Server-Side-Api and the High-level-Server-Side-Api. The low-level API covers everything an HTTP/1.1 server needs, including:

    • Connection management
    • Parsing and rendering of messages and headers
    • Timeout management (for requests and connections)
    • Response ordering (for transparent pipelining support)

    Other server features such as request routing, file serving, and compression live in the High-level-server-api. Akka-http is built on Akka-stream, so its types are best understood in terms of the Akka-stream processing model.

    An Http-Server binds to a socket, receives the requests that clients send, and serves them. In Akka-http this socket binding is expressed as a stream:

    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "localhost", port = 8080)

    The server-side socket binding is in fact an Akka-stream source, a Source[IncomingConnection], as the definition of bind shows:

      /**
       * Creates a [[akka.stream.scaladsl.Source]] of [[akka.http.scaladsl.Http.IncomingConnection]] instances which represents a prospective HTTP server binding
       * on the given `endpoint`.
       *
       * If the given port is 0 the resulting source can be materialized several times. Each materialization will
       * then be assigned a new local port by the operating system, which can then be retrieved by the materialized
       * [[akka.http.scaladsl.Http.ServerBinding]].
       *
       * If the given port is non-zero subsequent materialization attempts of the produced source will immediately
       * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
       * [[akka.http.scaladsl.Http.ServerBinding]].
       *
       * If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding.
       * Otherwise the binding will be unencrypted.
       *
       * If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used,
       * which is 80 for HTTP and 443 for HTTPS.
       *
       * To configure additional settings for a server started using this method,
       * use the `akka.http.server` config section or pass in a [[akka.http.scaladsl.settings.ServerSettings]] explicitly.
       */
      def bind(interface: String, port: Int = DefaultPortForProtocol,
               connectionContext: ConnectionContext = defaultServerHttpContext,
               settings:          ServerSettings    = ServerSettings(system),
               log:               LoggingAdapter    = system.log)(implicit fm: Materializer): Source[Http.IncomingConnection, Future[ServerBinding]] = {
        val fullLayer = fuseServerBidiFlow(settings, connectionContext, log)
    
        tcpBind(interface, choosePort(port, connectionContext), settings)
          .map(incoming ⇒ {
            val serverFlow = fullLayer.addAttributes(prepareAttributes(settings, incoming)) join incoming.flow
            IncomingConnection(incoming.localAddress, incoming.remoteAddress, serverFlow)
          })
          .mapMaterializedValue(materializeTcpBind)
      }
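    The scaladoc above says that with port 0 each materialization is assigned a fresh local port, which can be read back from the materialized ServerBinding. A minimal sketch of that, assuming the same Akka HTTP 10.0-era API the article uses (ActorMaterializer, Http().bind); the object name, the method name, and the fixed "ok" response are placeholders for illustration only:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpResponse
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of the article's code.
object EphemeralPortSketch {
  // Binds on port 0, returns the port the OS picked, then releases it.
  def bindOnFreePort(): Int = {
    implicit val system: ActorSystem = ActorSystem("ephemeral-port-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val binding: Future[Http.ServerBinding] =
        Http().bind(interface = "localhost", port = 0)
          .to(Sink.foreach[Http.IncomingConnection] { conn =>
            // placeholder handler: every request gets the same response
            conn.handleWithSyncHandler(_ => HttpResponse(entity = "ok"))
          })
          .run()

      // Blocking with Await keeps the sketch short; real code would stay asynchronous.
      val bound = Await.result(binding, 5.seconds)
      val port  = bound.localAddress.getPort // the port actually assigned
      Await.result(bound.unbind(), 5.seconds)
      port
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

    Calling EphemeralPortSketch.bindOnFreePort() twice will typically return two different ports, since the operating system chooses one per materialization.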

    Running this Source[IncomingConnection] produces a stream of connections:

      /**
       * Represents one accepted incoming HTTP connection.
       */
      final case class IncomingConnection(
        localAddress:  InetSocketAddress,
        remoteAddress: InetSocketAddress,
        flow:          Flow[HttpResponse, HttpRequest, NotUsed]) {
    
        /**
         * Handles the connection with the given flow, which is materialized exactly once
         * and the respective materialization result returned.
         */
        def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
          flow.joinMat(handler)(Keep.right).run()
    
        /**
         * Handles the connection with the given handler function.
         */
        def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: Materializer): Unit =
          handleWith(Flow[HttpRequest].map(handler))
    
        /**
         * Handles the connection with the given handler function.
         */
        def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse], parallelism: Int = 1)(implicit fm: Materializer): Unit =
          handleWith(Flow[HttpRequest].mapAsync(parallelism)(handler))
      }

    IncomingConnection offers handleWith, a streaming function that turns requests into responses. There are three ways to supply your own conversion logic.

    Call handleWith with a Flow[HttpRequest,HttpResponse,_], for example:

      def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
      val flow = Flow.fromFunction(req2Resp)
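    Because the handler is an ordinary Akka-stream Flow, it can be exercised without binding a socket by feeding it a synthetic request. The following is a sketch under that assumption; the object name and the respondTo helper are hypothetical, and the blocking Await is there only to keep the example short:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of the article's code.
object FlowHandlerSketch {
  // Same shape as req2Resp in the article: every request gets the same page.
  val req2Resp: HttpRequest => HttpResponse = _ =>
    HttpResponse(entity =
      HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h> Hello World! </h>"))

  val flow: Flow[HttpRequest, HttpResponse, NotUsed] = Flow.fromFunction(req2Resp)

  // Pushes one request through the flow and returns the response it emits.
  def respondTo(request: HttpRequest): HttpResponse = {
    implicit val system: ActorSystem = ActorSystem("flow-handler-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source.single(request).via(flow).runWith(Sink.head), 5.seconds)
    finally Await.ready(system.terminate(), 10.seconds)
  }
}
```

    The same flow value is what connection.handleWith(flow) would join to the connection's own Flow[HttpResponse, HttpRequest, NotUsed].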

    Pass an HttpRequest => HttpResponse function to handleWithSyncHandler:

      def syncHandler: HttpRequest => HttpResponse = {
        // GET / is answered with a small HTML page
        case HttpRequest(HttpMethods.GET, Uri.Path("/"), _, _, _) =>
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
    
        // every other request gets a 404
        case req: HttpRequest =>
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
      }

    Pass an HttpRequest => Future[HttpResponse] function to handleWithAsyncHandler:

      // Future { ... } needs an implicit ExecutionContext in scope
      def asyncHandler: HttpRequest => Future[HttpResponse] = {
        // GET / is answered with a small HTML page
        case HttpRequest(HttpMethods.GET, Uri.Path("/"), _, _, _) => Future {
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }
    
        // every other request gets a 404
        case req: HttpRequest => Future {
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
        }
      }
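    As the IncomingConnection listing shows, handleWithAsyncHandler is just Flow[HttpRequest].mapAsync(parallelism)(handler). mapAsync emits results in the order of its inputs even when later futures finish first, which is what keeps pipelined responses in request order. A small sketch of that ordering behaviour, using plain integers instead of HTTP messages; the object name and the artificial delays are assumptions made for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo, not part of the article's code.
object MapAsyncOrderSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-order-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for Future { ... }
    try {
      val results = Source(1 to 5)
        .mapAsync(parallelism = 4) { i =>
          // Earlier elements sleep longer, so later futures tend to complete first.
          // Thread.sleep blocks a dispatcher thread; acceptable only in a demo.
          Future { Thread.sleep((6 - i) * 20L); i }
        }
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

    MapAsyncOrderSketch.run() yields the elements 1 to 5 in their original order, regardless of which future completed first.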

    Running the Source[IncomingConnection,Future[ServerBinding]] materializes a Future[ServerBinding]:

      val futBinding: Future[Http.ServerBinding] =
        connSource.to { Sink.foreach { connection =>
          println(s"client address ${connection.remoteAddress}")
          // pick one of the three handler styles:
          // connection handleWith flow
          // connection handleWithSyncHandler syncHandler
          connection handleWithAsyncHandler asyncHandler
        }}.run()

    The ServerBinding lets us release the bound socket:

     /**
       * Represents a prospective HTTP server binding.
       *
       * @param localAddress  The local address of the endpoint bound by the materialization of the `connections` [[akka.stream.scaladsl.Source]]
       *
       */
      final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {
    
        /**
         * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
         * [[akka.stream.scaladsl.Source]]
         *
         * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
         */
        def unbind(): Future[Unit] = unbindAction()
      }

    We can call unbind() like this:

      futBinding.flatMap(_.unbind())

    The complete example source follows:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import scala.concurrent._
    
    object LowLevelServerApi extends App {
      implicit val httpSys = ActorSystem("actorSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEc = httpSys.dispatcher
    
      val (interface,port) = ("localhost",8088)
      val connSource: Source[Http.IncomingConnection,Future[Http.ServerBinding]] =
        Http().bind(interface,port)
    
      def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
      val flow = Flow.fromFunction(req2Resp)
    
      def syncHandler: HttpRequest => HttpResponse = {
        // GET / is answered with a small HTML page
        case HttpRequest(HttpMethods.GET, Uri.Path("/"), _, _, _) =>
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
    
        // every other request gets a 404
        case req: HttpRequest =>
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
      }
    
      // Future { ... } picks up the implicit httpEc declared above
      def asyncHandler: HttpRequest => Future[HttpResponse] = {
        case HttpRequest(HttpMethods.GET, Uri.Path("/"), _, _, _) => Future {
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }
    
        case req: HttpRequest => Future {
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
        }
      }
    
      val futBinding: Future[Http.ServerBinding] =
        connSource.to { Sink.foreach { connection =>
          println(s"client address ${connection.remoteAddress}")
          // pick one of the three handler styles:
          // connection handleWith flow
          // connection handleWithSyncHandler syncHandler
          connection handleWithAsyncHandler asyncHandler
        }}.run()
    
      // readLine() below returns only once ENTER is pressed
      println(s"Server running at $interface $port. Press ENTER to exit ...")
    
      scala.io.StdIn.readLine()
    
      futBinding.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
      
    }

  • Original article: https://www.cnblogs.com/tiger-xc/p/7709377.html