zoukankan      html  css  js  c++  java
  • Akka(42): Http:身份验证

       当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候,除数据之外我们还需要传递一些附加信息,例如对数据的具体处理方式。这类自定义的附加消息可以放在HTTP的raw header里传递:客户端把消息写入HttpRequest的raw header,服务端再用Akka-http提供的header筛选Directive(如optionalHeaderValueByName)把它读取出来。客户端的写法如下:

      import akka.http.scaladsl.model.headers._
      // POST to the rows endpoint, carrying the custom "action" header that tells
      // the server what to do with the uploaded data.
      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = "http://localhost:8011/rows"
      ).addHeader(RawHeader("action", "insert:county"))

    在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:

                 // Read the optional custom "action" header sent by the client.
                 optionalHeaderValueByName("action") {
                   case Some(action) =>
                     entity(asSourceOf[County]) { source =>
                       // Drain the uploaded rows, collecting every county name in arrival order.
                       val futofNames: Future[Vector[String]] =
                         source.runFold(Vector.empty[String])((acc, b) => acc :+ b.name)
                       // Reply only once the whole upload has been consumed; a failure while
                       // reading the stream now fails the request instead of being dropped.
                       onSuccess(futofNames) { _ =>
                         complete(s"Received rows for $action")
                       }
                     }
                   case None => complete("No action specified!")
                 }

    Akka-http通过与Credentials相关的Directive(如authenticateBasicAsync和authorizeAsync)提供了身份验证(authentication)和权限控制(authorization)功能。在客户端可以用下面的方法提供自己的用户身份信息:

      import akka.http.scaladsl.model.headers._
      // Same upload request as before, now also carrying HTTP Basic credentials.
      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = "http://localhost:8011/rows"
      ).addHeader(RawHeader("action", "insert:county"))
       .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

    服务端对客户端的身份验证处理方法如下:

      import akka.http.scaladsl.server.directives.Credentials
      /** Asynchronous Basic-auth check.
        *
        * Yields `Some(User(id))` when credentials were supplied and their secret
        * matches the demo password, `None` otherwise (including when no
        * credentials were supplied).
        */
      def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
        // Execution context for the verification work.
        // NOTE(review): the dispatcher name must match one defined in the
        // application's configuration - confirm the spelling against application.conf.
        val verifyEc = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
        credentials match {
          case provided: Credentials.Provided =>
            // Verification may be slow in a real system (e.g. a database lookup),
            // so it runs inside a Future on the dedicated dispatcher.
            Future {
              Some(User(provided.identifier)).filter(_ => provided.verify("p4ssw0rd"))
            }(verifyEc)
          case _ =>
            Future.successful(None)
        }
      }
    
      /** An authenticated user, identified by login name. */
      final case class User(name: String)

      /** Login names that are allowed to upload rows. */
      val validUsers = Set("john","peter","tiger","susan")

      /** Authorization check: completes with `true` when the user's name is in `validUsers`.
        *
        * The answer is already computed, so `Future.successful` is used and no
        * execution context (and therefore no dispatcher lookup) is needed.
        */
      def hasAdminPermissions(user: User): Future[Boolean] =
        Future.successful(validUsers.contains(user.name))

    下面是Credential-Directive的使用方法:

             // Authenticate with HTTP Basic, then authorize the user, before accepting the upload.
             // Uses the authenticator and permission check defined in the snippets above.
             authenticateBasicAsync(realm = "secure site", myUserPassAuthenticator) { user =>
               authorizeAsync(_ => hasAdminPermissions(user)) {
                 withoutSizeLimit {
                   handleExceptions(postExceptionHandler) {
                     optionalHeaderValueByName("action") {
                       case Some(action) =>
                         entity(asSourceOf[County]) { source =>
                           // Drain the uploaded rows, collecting every county name in arrival order.
                           val futofNames: Future[Vector[String]] =
                             source.runFold(Vector.empty[String])((acc, b) => acc :+ b.name)
                           // Reply only once the whole upload has been consumed, so a failure
                           // while reading the stream is routed to the exception handler.
                           onSuccess(futofNames) { _ =>
                             complete(s"Received rows for $action sent from $user")
                           }
                         }
                       case None => complete(s"$user did not specify action for uploaded rows!")
                     }
                   }
                 }
               }
             }

    下面是本次讨论的示范代码:

    客户端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import scala.util._
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import akka.http.scaladsl.common.EntityStreamingSupport
    import akka.http.scaladsl.model._
    import spray.json._
    
    /** Bundles spray-json marshalling support with the default JSON protocol. */
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

    /** Row model used by the client, together with its JSON format. */
    object Converters extends MyFormats {
      final case class County(id: Int, name: String)
      // Explicit type on the implicit so resolution does not depend on inference order.
      implicit val countyFormat: RootJsonFormat[County] = jsonFormat2(County)
    }
    
    /** Demo client: uploads a stream of `County` rows to `POST http://localhost:8011/rows`,
      * tagging the request with a custom `action` header and HTTP Basic credentials,
      * then prints the server's reply.
      */
    object HttpClientDemo extends App {
      import Converters._

      implicit val sys = ActorSystem("ClientSys")
      implicit val mat = ActorMaterializer()
      implicit val ec = sys.dispatcher

      implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

      import akka.util.ByteString
      import akka.http.scaladsl.model.HttpEntity.limitableByteSource

      // Five sample rows to upload.
      val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}

      /** Renders one county as the bytes of its JSON text. */
      def countyToByteString(c: County) = {
        ByteString(c.toJson.toString)
      }
      val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

      // Byte stream that becomes the request body.
      val rowBytes = limitableByteSource(source via flowCountyToByteString)

      import akka.http.scaladsl.model.headers._
      // The "action" header tells the server how to treat the rows; the credentials identify the caller.
      val request = HttpRequest(HttpMethods.POST,uri = "http://localhost:8011/rows")
        .addHeader(RawHeader("action","insert:county"))
        .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

      val data = HttpEntity(
        ContentTypes.`application/json`,
        rowBytes
      )

      /** Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
        *
        * @param request    request template (method, URI, headers)
        * @param dataEntity entity to attach as the request body
        * @return the future response, unchanged by the reporting side effect
        */
      def uploadRows(request: HttpRequest, dataEntity: RequestEntity): scala.concurrent.Future[HttpResponse] = {
        val futResp = Http(sys).singleRequest(
          request.copy(entity = dataEntity)
        )
        futResp
          .andThen {
            case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
              // Print the reply body as it streams in.
              entity.dataBytes.map(_.utf8String).runForeach(println)
            case Success(r@HttpResponse(code, _, _, _)) =>
              println(s"Upload request failed, response code: $code")
              // Drain the unread body so the connection is not left stalled.
              r.discardEntityBytes()
            // No further Success case: the pattern above already matches every response.
            case Failure(err) => println(s"Upload failed: ${err.getMessage}")
          }
      }


      uploadRows(request,data)

      // Keep the process alive until ENTER is pressed so the async upload can finish.
      scala.io.StdIn.readLine()

      sys.terminate()

    }

    服务端:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka._
    import akka.http.scaladsl.common._
    import spray.json.DefaultJsonProtocol
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import scala.concurrent._
    import akka.http.scaladsl.server._
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.model._
    
    /** Bundles spray-json marshalling support with the default JSON protocol. */
    trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

    /** Row model, its JSON format, and the sample rows served by `GET /rows`. */
    object Converters extends MyFormats {
      final case class County(id: Int, name: String)
      // Five sample rows streamed back to clients.
      val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
      // Explicit type on the implicit so resolution does not depend on inference order.
      implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
    }
    
    /** Demo server for streamed row uploads protected by HTTP Basic authentication.
      *
      *  - `GET /rows` streams the sample counties as JSON.
      *  - `POST /rows` requires valid Basic credentials and an authorized user, reads the
      *    optional `action` header, consumes the uploaded `County` rows and then replies.
      */
    object HttpServerDemo extends App {

      import Converters._

      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher


      implicit val jsonStreamingSupport = EntityStreamingSupport.json()
        .withParallelMarshalling(parallelism = 8, unordered = false)

      /** Turns any `RuntimeException` raised while handling an upload into a 500 reply. */
      def postExceptionHandler: ExceptionHandler =
        ExceptionHandler {
          case _: RuntimeException =>
            extractRequest { req =>
              // Drain whatever is left of the request body before replying.
              req.discardEntityBytes()
              complete(StatusCodes.InternalServerError -> "Upload Failed!")
            }
        }

      /** An authenticated user, identified by login name. */
      case class User(name: String)

      import akka.http.scaladsl.server.directives.Credentials

      /** Asynchronous Basic-auth check.
        *
        * Yields `Some(User(id))` when credentials were supplied and their secret matches
        * the demo password, `None` otherwise.
        */
      def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
        // NOTE(review): the dispatcher name must match one defined in the application's
        // configuration - confirm the spelling against application.conf.
        val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
        credentials match {
          case p @ Credentials.Provided(id) =>
            // Verification may be slow in a real system (e.g. a database lookup), so it is
            // run on the dedicated dispatcher, passed explicitly rather than via implicits.
            Future {
              if (p.verify("p4ssw0rd")) Some(User(id))
              else None
            }(blockingDispatcher)
          case _ => Future.successful(None)
        }
      }

      /** Login names that are allowed to upload rows. */
      val validUsers = Set("john","peter","tiger","susan")

      /** Authorization check: `true` when the user's name is in `validUsers`.
        *
        * The result is already computed, so no execution context is required.
        */
      def hasPermissions(user: User): Future[Boolean] =
        Future.successful(validUsers.contains(user.name))

      val route =
        path("rows") {
          get {
            complete {
              source
            }
          } ~
            post {
              authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
                authorizeAsync(_ => hasPermissions(user)) {
                  withoutSizeLimit {
                    handleExceptions(postExceptionHandler) {
                      optionalHeaderValueByName("action") {
                        case Some(action) =>
                          entity(asSourceOf[County]) { source =>
                            // Drain the uploaded rows, collecting every county name in arrival order.
                            val futofNames: Future[Vector[String]] =
                              source.runFold(Vector.empty[String])((acc, b) => acc :+ b.name)
                            // Reply only once the whole upload has been consumed, so a failure
                            // while reading the stream is routed to postExceptionHandler.
                            onSuccess(futofNames) { _ =>
                              complete(s"Received rows for $action sent from $user")
                            }
                          }
                        case None => complete(s"$user did not specify action for uploaded rows!")
                      }
                    }
                  }
                }
              }
            }
        }

      val (port, host) = (8011,"localhost")

      val bindingFuture = Http().bindAndHandle(route,host,port)

      println(s"Server running at $host $port. Press any key to exit ...")

      // Block until ENTER is pressed, then unbind and shut the actor system down.
      scala.io.StdIn.readLine()

      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())

    }

     

     

     

  • 相关阅读:
    MySQL练习题
    MySql基础操作
    解决使用IDEA启动Tomcat成功但localhost:8080无法访问的问题
    1417. 重新格式化字符串--来源:力扣(LeetCode)
    字符消除
    Comsol中Absolute Pressure的解释
    气体流量与质量流率换算
    FileZilla MLSD错误:连接超时、读取目录列表失败
    Avalon总线的地址对齐与NIOS编程
    同步复位和异步复位--好文章就是要记录下来
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8033715.html
Copyright © 2011-2022 走看看