zoukankan      html  css  js  c++  java
  • Akka(30): Http:High-Level-Api,Routing DSL

      在上篇我们介绍了Akka-http Low-Level-Api。实际上这个Api提供了Server对进来的Http-requests进行处理及反应的自定义Flow或者转换函数的接入界面。我们看看下面官方文档给出的例子:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpMethods._
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer
    import scala.io.StdIn
    
    object WebServer {
    
      def main(args: Array[String]) {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
        // needed for the future map/flatmap in the end
        implicit val executionContext = system.dispatcher
    
        val requestHandler: HttpRequest => HttpResponse = {
          case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
            HttpResponse(entity = HttpEntity(
              ContentTypes.`text/html(UTF-8)`,
              "<html><body>Hello world!</body></html>"))
    
          case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
            HttpResponse(entity = "PONG!")
    
          case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
            sys.error("BOOM!")
    
          case r: HttpRequest =>
            r.discardEntityBytes() // important to drain incoming HTTP Entity stream
            HttpResponse(404, entity = "Unknown resource!")
        }
    
        val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080)
        println(s"Server online at http://localhost:8080/
    Press RETURN to stop...")
        StdIn.readLine() // let it run until user presses return
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }
    }

      我们看到上面例子里的requestHandler函数用模式匹配方式对可能收到的HttpRequest进行了相关HttpResponse的对应。在对应的过程中可能还会按request要求进行一些Server端的运算作为例如Rest-Api这样的服务。不过对于大型的服务,模式匹配方式就会显得篇幅臃肿及模式僵化。Akka-http提供了一套routing DSL作为High-Level-Api的主要组成部分。用routing DSL代替Low-Level-Api的模式匹配方式可以更简练的编制HttpRequest到HttpResponse的转换服务,能更灵活高效的实现现代大型Rest-Api服务。routing DSL实现Rest-Api服务的方式是通过构建一个由组件Directives组合而成的多个多层三明治结构Route。Route是一个类型:

      type Route = RequestContext ⇒ Future[RouteResult]

    下面是个Route例子:

        val route: Flow[HttpRequest, HttpResponse, NotUsed]=
          get {
            pathSingleSlash {
              complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>"))
            } ~
              path("ping") {
                complete("PONG!")
              } ~
              path("crash") {
                sys.error("BOOM!")
              }
          }

    在上期讨论的例子里我们可以这样使用route:

      val futBinding: Future[Http.ServerBinding] =
        connSource.to { Sink.foreach{ connection =>
          println(s"client address ${connection.remoteAddress}")
          //    connection handleWith flow
          //    connection handleWithSyncHandler syncHandler
          //connection handleWithAsyncHandler asyncHandler
          connection handleWith route
        }}.run()

    handleWith(flow)的参数应该是Flow[HttpRequest,HttpResponse,_]才对呀?这个我们先看看RouteResult对象: 

    /**
     * The result of handling a request.
     *
     * As a user you typically don't create RouteResult instances directly.
     * Instead, use the methods on the [[RequestContext]] to achieve the desired effect.
     */
    sealed trait RouteResult extends javadsl.server.RouteResult
    
    object RouteResult {
      final case class Complete(response: HttpResponse) extends javadsl.server.Complete with RouteResult {
        override def getResponse = response
      }
      final case class Rejected(rejections: immutable.Seq[Rejection]) extends javadsl.server.Rejected with RouteResult {
        override def getRejections = rejections.map(r ⇒ r: javadsl.server.Rejection).toIterable.asJava
      }
    
      implicit def route2HandlerFlow(route: Route)(implicit
        routingSettings: RoutingSettings,
                                                   parserSettings:   ParserSettings,
                                                   materializer:     Materializer,
                                                   routingLog:       RoutingLog,
                                                   executionContext: ExecutionContext = null,
                                                   rejectionHandler: RejectionHandler = RejectionHandler.default,
                                                   exceptionHandler: ExceptionHandler = null): Flow[HttpRequest, HttpResponse, NotUsed] =
        Route.handlerFlow(route)
    }

    这里有个隐式转换route2HandlerFlow把Route转换成Flow[HttpRequest,HttpResponse,NotUsed],问题解决了。

    从type Route=RequestContext => Future[RouteResult]可以看到:Route就是一个把RequestContext转换成Future[RouteResult]的函数。RequestContext实质上封装了个Request以及对Request进行操作的环境、配置和工具:

    /**
     * This class is not meant to be extended by user code.
     *
     * Immutable object encapsulating the context of an [[akka.http.scaladsl.model.HttpRequest]]
     * as it flows through a akka-http Route structure.
     */
    @DoNotInherit
    trait RequestContext {
    
      /** The request this context represents. Modelled as a `val` so as to enable an `import ctx.request._`. */
      val request: HttpRequest
    
      /** The unmatched path of this context. Modelled as a `val` so as to enable an `import ctx.unmatchedPath._`. */
      val unmatchedPath: Uri.Path
    
      /**
       * The default ExecutionContext to be used for scheduling asynchronous logic related to this request.
       */
      implicit def executionContext: ExecutionContextExecutor
    ...
    }

    Route是一种可组合组件。我们可以用简单的Route组合成更多层次的Route。下面是组合Route的几种方式:

    1、Route转化:对输入的request,输出的response进行转化处理后把实际运算托付给下一层内部(inner)Route

    2、筛选Route:只容许符合某种条件的Route通过并拒绝其它不符合条件的Route

    3、链接Route:假如一个Route被拒绝,尝试下一个Route。这个是通过 ~ 操作符号实现的

    在Akka-http的routing DSL里这些Route组合操作是通过Directive实现的。Akka-http提供了大量现成的Directive,我们也可以自定义一些特殊功能的Directive,详情可以查询官方文件或者api文件。

    Directive的表达形式如下:

    dirname(arguments) { extractions =>
      ... // 内层inner route
    }

    下面是Directive的一些用例: 

    下面的三个route效果相等:

    val route: Route = { ctx =>
      if (ctx.request.method == HttpMethods.GET)
        ctx.complete("Received GET")
      else
        ctx.complete("Received something else")
    }
    
    val route =
      get {
        complete("Received GET")
      } ~
      complete("Received something else")
      
    val route =
      get { ctx =>
        ctx.complete("Received GET")
      } ~
      complete("Received something else")

    下面列出一些Directive的组合例子:

    val route: Route =
      path("order" / IntNumber) { id =>
        get {
          complete {
            "Received GET request for order " + id
          }
        } ~
        put {
          complete {
            "Received PUT request for order " + id
          }
        }
      }
    
    def innerRoute(id: Int): Route =
      get {
        complete {
          "Received GET request for order " + id
        }
      } ~
      put {
        complete {
          "Received PUT request for order " + id
        }
      }
    val route: Route = path("order" / IntNumber) { id => innerRoute(id) }
    
    val route =
      path("order" / IntNumber) { id =>
        (get | put) { ctx =>
          ctx.complete(s"Received ${ctx.request.method.name} request for order $id")
        }
      }
    
    val route =
      path("order" / IntNumber) { id =>
        (get | put) {
          extractMethod { m =>
            complete(s"Received ${m.name} request for order $id")
          }
        }
      }
    
    val getOrPut = get | put
    val route =
      path("order" / IntNumber) { id =>
        getOrPut {
          extractMethod { m =>
            complete(s"Received ${m.name} request for order $id")
          }
        }
      }
    
    val route =
      (path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) =>
        complete(s"Received ${m.name} request for order $id")
      }
    
    val orderGetOrPutWithMethod =
      path("order" / IntNumber) & (get | put) & extractMethod
    val route =
      orderGetOrPutWithMethod { (id, m) =>
        complete(s"Received ${m.name} request for order $id")
      }

    上面例子里的~ & | 定义如下:

    object RouteConcatenation extends RouteConcatenation {
    
      class RouteWithConcatenation(route: Route) {
        /**
         * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
         * chance to act upon the request.
         */
        def ~(other: Route): Route = { ctx ⇒
          import ctx.executionContext
          route(ctx).fast.flatMap {
            case x: RouteResult.Complete ⇒ FastFuture.successful(x)
            case RouteResult.Rejected(outerRejections) ⇒
              other(ctx).fast.map {
                case x: RouteResult.Complete               ⇒ x
                case RouteResult.Rejected(innerRejections) ⇒ RouteResult.Rejected(outerRejections ++ innerRejections)
              }
          }
        }
      }
    }
    
      /**
       * Joins two directives into one which runs the second directive if the first one rejects.
       */
      def |[R >: L](that: Directive[R]): Directive[R] =
        recover(rejections ⇒ directives.BasicDirectives.mapRejections(rejections ++ _) & that)(that.ev)
    
      /**
       * Joins two directives into one which extracts the concatenation of its base directive extractions.
       * NOTE: Extraction joining is an O(N) operation with N being the number of extractions on the right-side.
       */
      def &(magnet: ConjunctionMagnet[L]): magnet.Out = magnet(this)

    我们可以从上面这些示范例子得出结论:Directive的组合能力是routing DSL的核心。来看看Directive的组合能力是如何实现的。Directive类定义如下:

    //#basic
    abstract class Directive[L](implicit val ev: Tuple[L]) {
    
      /**
       * Calls the inner route with a tuple of extracted values of type `L`.
       *
       * `tapply` is short for "tuple-apply". Usually, you will use the regular `apply` method instead,
       * which is added by an implicit conversion (see `Directive.addDirectiveApply`).
       */
      def tapply(f: L ⇒ Route): Route
      ...
    }
      /**
       * Constructs a directive from a function literal.
       */
      def apply[T: Tuple](f: (T ⇒ Route) ⇒ Route): Directive[T] =
        new Directive[T] { def tapply(inner: T ⇒ Route) = f(inner) }
    
      /**
       * A Directive that always passes the request on to its inner route (i.e. does nothing).
       */
      val Empty: Directive0 = Directive(_(()))
    ...
      implicit class SingleValueModifiers[T](underlying: Directive1[T]) extends AnyRef {
        def map[R](f: T ⇒ R)(implicit tupler: Tupler[R]): Directive[tupler.Out] =
          underlying.tmap { case Tuple1(value) ⇒ f(value) }
    
        def flatMap[R: Tuple](f: T ⇒ Directive[R]): Directive[R] =
          underlying.tflatMap { case Tuple1(value) ⇒ f(value) }
    
        def require(predicate: T ⇒ Boolean, rejections: Rejection*): Directive0 =
          underlying.filter(predicate, rejections: _*).tflatMap(_ ⇒ Empty)
    
        def filter(predicate: T ⇒ Boolean, rejections: Rejection*): Directive1[T] =
          underlying.tfilter({ case Tuple1(value) ⇒ predicate(value) }, rejections: _*)
      }
    }

    注意implicit ev: Tuple[L]是给compiler的证例,它要求Tuple[L]存在于可视域。Akka-http提供了所有22个TupleXX[L]的隐形实例。再注意implicit class singleValueModifiers[T]:它提供了多层Directive的自动展平,能够实现下面的自动转换结果:

    Directive1[T] = Directive[Tuple1[T]]
    Directive1[Tuple2[M,N]] = Directive[Tuple1[Tuple2[M,N]]] = Directive[Tuple2[M,N]]
    Directive1[Tuple3[M,N,G]] = ... = Directive[Tuple3[M,N,G]]
    Directive1[Tuple4[M1,M2,M3,M4]] = ... = Directive[Tuple4[M1,M2,M3,M4]]
    ...
    Directive1[Unit] = Directive0

    Directive1,Directive0:

      type Directive0 = Directive[Unit]
      type Directive1[T] = Directive[Tuple1[T]]

    下面是这几种Directive的使用模式:

      dirname { route }                  //Directive0
      dirname[L] { L => route }          //Directive1[L]
      dirname[T] { (T1,T2...) => route}  //Directive[T]

    任何类型值到Tuple的自动转换是通过Tupler类实现的:

    /**
     * Provides a way to convert a value into an Tuple.
     * If the value is already a Tuple then it is returned unchanged, otherwise it's wrapped in a Tuple1 instance.
     */
    trait Tupler[T] {
      type Out
      def OutIsTuple: Tuple[Out]
      def apply(value: T): Out
    }
    
    object Tupler extends LowerPriorityTupler {
      implicit def forTuple[T: Tuple]: Tupler[T] { type Out = T } =
        new Tupler[T] {
          type Out = T
          def OutIsTuple = implicitly[Tuple[Out]]
          def apply(value: T) = value
        }
    }
    
    private[server] abstract class LowerPriorityTupler {
      implicit def forAnyRef[T]: Tupler[T] { type Out = Tuple1[T] } =
        new Tupler[T] {
          type Out = Tuple1[T]
          def OutIsTuple = implicitly[Tuple[Out]]
          def apply(value: T) = Tuple1(value)
        }
    }

    我的理解是:Route里Directive的主要功能可以分成两部分:一是如程序菜单拣选,二是对Request,Response,Entity的读写。我们把第二项功能放在以后的讨论里,下面就提供一些RestApi的菜单拣选样例:

    trait UsersApi extends JsonMappings{
      val usersApi =
        (path("users") & get ) {
           complete (UsersDao.findAll.map(_.toJson))
        }~
        (path("users"/IntNumber) & get) { id =>
            complete (UsersDao.findById(id).map(_.toJson))
        }~
        (path("users") & post) { entity(as[User]) { user =>
            complete (UsersDao.create(user).map(_.toJson))
          }
        }~
        (path("users"/IntNumber) & put) { id => entity(as[User]) { user =>
            complete (UsersDao.update(user, id).map(_.toJson))
          }
        }~
        (path("users"/IntNumber) & delete) { userId =>
          complete (UsersDao.delete(userId).map(_.toJson))
        }
    }
    
    trait CommentsApi extends JsonMappings{
      val commentsApi =
        (path("users"/IntNumber/"posts"/IntNumber/"comments") & get ) {(userId, postId) =>
           complete (CommentsDao.findAll(userId, postId).map(_.toJson))
        }~
          (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & get) { (userId, postId, commentId) =>
            complete (CommentsDao.findById(userId, postId, commentId).map(_.toJson))
        }~
          (path("comments") & post) { entity(as[Comment]) { comment =>
            complete (CommentsDao.create(comment).map(_.toJson))
          }
        }~
          (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & put) { (userId, postId, commentId) => entity(as[Comment]) { comment =>
            complete (CommentsDao.update(comment, commentId).map(_.toJson))
          }
        }~
          (path("comments"/IntNumber) & delete) { commentId =>
            complete (CommentsDao.delete(commentId).map(_.toJson))
        }
    }
    
    trait PostsApi extends JsonMappings{
      val postsApi =
        (path("users"/IntNumber/"posts") & get){ userId =>
          complete (PostsDao.findUserPosts(userId).map(_.toJson))
        }~
        (path("users"/IntNumber/"posts"/IntNumber) & get) { (userId,postId) =>
          complete (PostsDao.findByUserIdAndId(userId, postId).map(_.toJson))
        }~
        (path("users"/IntNumber/"posts") & post) { userId => entity(as[Post]) { post =>
          complete (PostsDao.create(post).map(_.toJson))
        }}~
        (path("users"/IntNumber/"posts"/IntNumber) & put) { (userId, id) => entity(as[Post]) { post =>
          complete (PostsDao.update(post, id).map(_.toJson))
        }}~
        (path("users"/IntNumber/"posts"/IntNumber) & delete) { (userId, postId) =>
          complete (PostsDao.delete(postId).map(_.toJson))
        }
    }
    
      val routes =
        pathPrefix("v1") {
          usersApi ~
          postsApi ~
          commentsApi
        } ~ path("")(getFromResource("public/index.html"))

     

     

     

     

     

     

  • 相关阅读:
    zookeeper安装
    Xmemcached的FAQ和性能调整建议
    memcache命令
    xmemcached的使用
    从硬币游戏学习敏捷开发
    memcached安装
    系统负载能力浅析
    kubernetes1.18安装metrics-server服务
    启动docker run 报错:iptables No chain/target/match by that name
    docker容器内apt更换国内阿里源
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/7722924.html
Copyright © 2011-2022 走看看