  • Akka(29): Http: Server-Side-Api, Low-Level-Api

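    Akka-http provides a convenient programming API for each end of a connection: the Server-Side-Api and the Client-Side-Api. Both can greatly improve programming productivity. As mentioned in the previous installment, the Http-Server is the core of Akka-http, and all system integration functionality is implemented on the server side. The Akka-http Server-Side-Api is arguably one of the most advanced Http-Server programming tools. It supports: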

    • Full support for HTTP persistent connections
    • Full support for HTTP pipelining
    • Full support for asynchronous HTTP streaming including “chunked” transfer encoding accessible through an idiomatic API
    • Optional SSL/TLS encryption
    • WebSocket support

    The Server-Side-Api has two layers: the Low-level-Server-Side-Api and the High-level-Server-Side-Api. The Low-level-server-api supports all the functionality of an HTTP/1.1 server, including:

    • Connection management
    • Parsing and rendering of messages and headers
    • Timeout management (for requests and connections)
    • Response ordering (for transparent pipelining support)

    Other server functions such as request routing, file serving and compression live in the High-level-server-api. Akka-http is built on Akka-stream, so we need to read Akka-http's types in terms of Akka-stream's execution model.

    An Http-Server binds to a socket, receives the requests that clients send and provides the corresponding service. In Akka-http the server's socket binding can be expressed as a stream:

    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "localhost", port = 8080)

    The server-side socket binding is really just an Akka-stream source, Source[IncomingConnection]:

      /**
       * Creates a [[akka.stream.scaladsl.Source]] of [[akka.http.scaladsl.Http.IncomingConnection]] instances which represents a prospective HTTP server binding
       * on the given `endpoint`.
       *
       * If the given port is 0 the resulting source can be materialized several times. Each materialization will
       * then be assigned a new local port by the operating system, which can then be retrieved by the materialized
       * [[akka.http.scaladsl.Http.ServerBinding]].
       *
       * If the given port is non-zero subsequent materialization attempts of the produced source will immediately
       * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
       * [[akka.http.scaladsl.Http.ServerBinding]].
       *
       * If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding.
       * Otherwise the binding will be unencrypted.
       *
       * If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used,
       * which is 80 for HTTP and 443 for HTTPS.
       *
       * To configure additional settings for a server started using this method,
       * use the `akka.http.server` config section or pass in a [[akka.http.scaladsl.settings.ServerSettings]] explicitly.
       */
      def bind(interface: String, port: Int = DefaultPortForProtocol,
               connectionContext: ConnectionContext = defaultServerHttpContext,
               settings:          ServerSettings    = ServerSettings(system),
               log:               LoggingAdapter    = system.log)(implicit fm: Materializer): Source[Http.IncomingConnection, Future[ServerBinding]] = {
        val fullLayer = fuseServerBidiFlow(settings, connectionContext, log)
    
        tcpBind(interface, choosePort(port, connectionContext), settings)
          .map(incoming ⇒ {
            val serverFlow = fullLayer.addAttributes(prepareAttributes(settings, incoming)) join incoming.flow
            IncomingConnection(incoming.localAddress, incoming.remoteAddress, serverFlow)
          })
          .mapMaterializedValue(materializeTcpBind)
      }
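
    The doc comment above says that binding to port 0 lets the operating system pick a free port, which can then be read from the materialized ServerBinding. A minimal sketch of that behaviour follows, assuming the same Akka-http version and ActorMaterializer setup as the article (newer Akka releases may deprecate some of these calls). The object name EphemeralPortSketch and the method bindOnFreePort are hypothetical, chosen only for illustration.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka-http.
object EphemeralPortSketch {

  /** Binds to port 0, returns the port chosen by the OS, then unbinds. */
  def bindOnFreePort(): Int = {
    implicit val system: ActorSystem = ActorSystem("ephemeralPortSketch")
    implicit val mat: Materializer = ActorMaterializer() // same setup as the article
    try {
      // Materializing the source performs the actual socket binding.
      // Sink.ignore leaves accepted connections unhandled, which is
      // acceptable here only because we just want the binding itself.
      val bindingFuture = Http().bind(interface = "localhost", port = 0)
        .to(Sink.ignore)
        .run()

      val binding = Await.result(bindingFuture, 5.seconds)
      val port = binding.localAddress.getPort

      // Release the socket before returning.
      Await.result(binding.unbind(), 5.seconds)
      port
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"the OS assigned port ${bindOnFreePort()}")
}
```

    Blocking with Await keeps the sketch short; in a real server the Future[ServerBinding] would normally be composed asynchronously.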

    Running this Source[IncomingConnection] produces a stream of connections:

      /**
       * Represents one accepted incoming HTTP connection.
       */
      final case class IncomingConnection(
        localAddress:  InetSocketAddress,
        remoteAddress: InetSocketAddress,
        flow:          Flow[HttpResponse, HttpRequest, NotUsed]) {
    
        /**
         * Handles the connection with the given flow, which is materialized exactly once
         * and the respective materialization result returned.
         */
        def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
          flow.joinMat(handler)(Keep.right).run()
    
        /**
         * Handles the connection with the given handler function.
         */
        def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: Materializer): Unit =
          handleWith(Flow[HttpRequest].map(handler))
    
        /**
         * Handles the connection with the given handler function.
         */
        def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse], parallelism: Int = 1)(implicit fm: Materializer): Unit =
          handleWith(Flow[HttpRequest].mapAsync(parallelism)(handler))
      }

    The IncomingConnection type provides streaming functions such as handleWith to convert requests into responses. Users can supply their own conversion in any of the following ways:

    Call handleWith with a Flow[HttpRequest,HttpResponse,_], for example:

      def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
      val flow = Flow.fromFunction(req2Resp)
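
    Because the handler passed to handleWith is an ordinary Akka-stream Flow, it can be exercised without opening a socket. The sketch below pushes a single request through the same kind of flow and returns the response. The object name HandlerFlowSketch and the helper respondTo are hypothetical, and the ActorMaterializer setup is assumed to match the article's Akka version.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka-http.
object HandlerFlowSketch {

  // Same shape as the article's req2Resp: every request gets the same page.
  val req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity =
    HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h> Hello World! </h>"))

  val flow: Flow[HttpRequest, HttpResponse, NotUsed] = Flow.fromFunction(req2Resp)

  /** Runs one request through the handler flow and waits for the response. */
  def respondTo(request: HttpRequest): HttpResponse = {
    implicit val system: ActorSystem = ActorSystem("handlerFlowSketch")
    implicit val mat: Materializer = ActorMaterializer()
    try Await.result(Source.single(request).via(flow).runWith(Sink.head), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(respondTo(HttpRequest(uri = Uri("/"))).status)
}
```

    The same flow value can be handed to connection.handleWith unchanged, so a handler can be checked in isolation before it is wired to a binding.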

    Pass an HttpRequest => HttpResponse function to handleWithSyncHandler:

      def syncHandler: HttpRequest => HttpResponse = {
        case HttpRequest(HttpMethods.GET,Uri.Path("/"),_,_,_) =>
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
    
        case req: HttpRequest =>
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
      }

    Pass an HttpRequest => Future[HttpResponse] function to handleWithAsyncHandler:

      def asyncHandler: HttpRequest => Future[HttpResponse] = {
        case HttpRequest(HttpMethods.GET,Uri.Path("/"),_,_,_) => Future {
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }
    
        case req: HttpRequest => Future {
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
        }
      }
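
    handleWithAsyncHandler is implemented with mapAsync(parallelism), as the IncomingConnection source above shows. mapAsync emits results in the order of its inputs even when a later Future completes first, which fits the response ordering that pipelining needs. The sketch below illustrates this with a deliberately slow first request. The object name AsyncOrderingSketch, the paths "/slow" and "/fast" and the delays are all made up for the example.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.pattern.after
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka-http.
object AsyncOrderingSketch {

  /** Sends the given paths through an async handler and returns the response bodies in arrival order. */
  def run(paths: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("asyncOrderingSketch")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher

    // Illustrative handler: "/slow" completes well after every other path.
    def handler(req: HttpRequest): Future[HttpResponse] = {
      val path = req.uri.path.toString
      val delay = if (path == "/slow") 300.millis else 10.millis
      after(delay, system.scheduler)(Future.successful(HttpResponse(entity = path)))
    }

    // Same construction that handleWithAsyncHandler uses internally.
    val responses = Source(paths.map(p => HttpRequest(uri = Uri(p))))
      .via(Flow[HttpRequest].mapAsync(parallelism = 4)(handler))
      .runWith(Sink.seq)

    try Await.result(responses, 5.seconds)
      .map(_.entity)
      .collect { case HttpEntity.Strict(_, data) => data.utf8String }
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List("/slow", "/fast")))
}
```

    With parallelism greater than 1 both handlers run concurrently, yet the "/slow" response is still emitted first because it was requested first.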

    Running the Source[IncomingConnection,Future[ServerBinding]] returns a Future[ServerBinding]:

      val futBinding: Future[Http.ServerBinding] =
        connSource.to { Sink.foreach{ connection =>
          println(s"client address ${connection.remoteAddress}")
          // alternatives, use exactly one handler per connection:
          // connection handleWith flow
          // connection handleWithSyncHandler syncHandler
          connection handleWithAsyncHandler asyncHandler
        }}.run()
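
    The Future[ServerBinding] can also fail, for example when the port is already in use, so it is worth inspecting the outcome before assuming the server is up. The following is a minimal sketch of that check, assuming the article's Akka-http version. BindResultSketch and describe are hypothetical names, and port 8088 is simply the one the article uses.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical demo object, not part of Akka-http.
object BindResultSketch {

  /** Turns the outcome of a bind attempt into a log line. */
  def describe(result: Try[Http.ServerBinding]): String = result match {
    case Success(binding) => s"bound to ${binding.localAddress}"
    case Failure(ex)      => s"bind failed: ${ex.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("bindResultSketch")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher

    // Sink.ignore leaves connections unhandled; a real server would call
    // one of the handleWith variants on each connection instead.
    val futBinding: Future[Http.ServerBinding] =
      Http().bind("localhost", 8088).to(Sink.ignore).run()

    futBinding
      .transformWith { result =>
        println(describe(result))
        result match {
          case Success(binding) => binding.unbind().map(_ => ()) // release the socket
          case Failure(_)       => Future.successful(())         // nothing to release
        }
      }
      .onComplete(_ => system.terminate())
  }
}
```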

    We can release the bound socket through the ServerBinding:

      /**
       * Represents a prospective HTTP server binding.
       *
       * @param localAddress  The local address of the endpoint bound by the materialization of the `connections` [[akka.stream.scaladsl.Source]]
       *
       */
      final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {
    
        /**
         * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
         * [[akka.stream.scaladsl.Source]]
         *
         * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
         */
        def unbind(): Future[Unit] = unbindAction()
      }

    We can call this unbind():

      futBinding.flatMap(_.unbind())

    The complete source code of the demonstration follows:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import scala.concurrent._
    
    object LowLevelServerApi extends App {
      implicit val httpSys = ActorSystem("actorSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEc = httpSys.dispatcher
    
      val (interface,port) = ("localhost",8088)
      val connSource: Source[Http.IncomingConnection,Future[Http.ServerBinding]] =
        Http().bind(interface,port)
    
      def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
      val flow = Flow.fromFunction(req2Resp)
    
      def syncHandler: HttpRequest => HttpResponse = {
        case HttpRequest(HttpMethods.GET,Uri.Path("/"),_,_,_) =>
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
    
        case req: HttpRequest =>
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
      }
    
      def asyncHandler: HttpRequest => Future[HttpResponse] = {
        case HttpRequest(HttpMethods.GET,Uri.Path("/"),_,_,_) => Future {
          HttpResponse(entity=
            HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }
    
        case req: HttpRequest => Future {
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
        }
      }
    
      val futBinding: Future[Http.ServerBinding] =
        connSource.to { Sink.foreach{ connection =>
          println(s"client address ${connection.remoteAddress}")
          // alternatives, use exactly one handler per connection:
          // connection handleWith flow
          // connection handleWithSyncHandler syncHandler
          connection handleWithAsyncHandler asyncHandler
        }}.run()
    
      println(s"Server running at $interface:$port. Press ENTER to exit ...")
    
      scala.io.StdIn.readLine()
    
      futBinding.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
      
    }

  • Original article: https://www.cnblogs.com/tiger-xc/p/7709377.html