  • Akka(37): Http: Client-side operation modes

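       Besides the Connection-Level and Host-Level APIs, Akka HTTP offers a third, very convenient client mode: the Request-Level API. It hides the notion of a connection from the caller (internally the requests still go through a shared connection pool), so singleRequest can be called at any time to talk to the server. The following examples show how singleRequest is used: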

      (for {
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))
        message <- Unmarshal(response.entity).to[String]
      } yield message).andThen {
        case Success(msg) => println(s"Received message: $msg")
        case Failure(err) => println(s"Error: ${err.getMessage}")
      }.andThen {case _ => sys.terminate()}

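    This is a GET: Http().singleRequest sends the HttpRequest to the server URI and returns the HttpResponse. Every call in the chain returns a Future, so a for-comprehension keeps all of the actual work inside the Future context. The next example uploads data from the client: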

     (for {
        entity <- Marshal("Wata hell you doing?").to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))
        message <- Unmarshal(response.entity).to[String]
      } yield message).andThen {
        case Success(msg) => println(s"Received message: $msg")
        case Failure(err) => println(s"Error: ${err.getMessage}")
      }.andThen {case _ => sys.terminate()}

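    This one is a PUT. We first have to build the payload, a RequestEntity. The conversion Marshal(...).to[RequestEntity] also returns a Future (here a Future[RequestEntity]), so it fits into the same for-comprehension. Note the andThen calls: andThen registers a side-effecting callback and returns a Future carrying the same result as the upstream one, so several callbacks can be chained without changing the outcome. Concise as they are, these two examples give us no way to inspect the response status code, or to tell a failed data conversion apart from a failed request. It is better to split the process into two parts: first obtain the response, then process it, which includes checking the status and converting the data: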

      case class Item(id: Int, name: String, price: Double)
    
      def getItem(itemId: Int): Future[HttpResponse] = for {
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
      } yield response
    
      def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {
        futResp.andThen {
          case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
            Unmarshal(entity).to[T]
              .onComplete {
                case Success(t) => println(s"Got response entity: ${t}")
                case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
              }
          case Success(_) => println("Exception in response!")
          case Failure(err) => println(s"Response Failed: ${err.getMessage}")
        }
      }
      extractEntity[Item](getItem(13))

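    extractEntity[Item](getItem(13)) now covers the whole process: a failed request, an unexpected status code and a failed unmarshalling are each reported separately. The same pattern works for a PUT: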

      def putItem(item: Item): Future[HttpResponse] =
       for {
        reqEntity <- Marshal(item).to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
       } yield response
    
      extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))
         .andThen { case _ => sys.terminate()}
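
The examples so far lean on two properties of andThen: the callbacks run in the order they were chained, and the Future handed back still carries the original result. The following is a minimal sketch of that behaviour using only the Scala standard library; AndThenDemo and its log are illustrative names, not part of the article's code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AndThenDemo {
  // Returns the value of the future plus the side effects in the order they ran.
  def run(): (Int, List[String]) = {
    val log = new ConcurrentLinkedQueue[String]()

    val result: Future[Int] = Future(21 * 2)
      .andThen {
        case Success(v) => log.add(s"first callback saw $v")
        case Failure(e) => log.add(s"first callback saw failure: ${e.getMessage}")
      }
      // This is the slot the article uses for sys.terminate().
      .andThen { case _ => log.add("second callback ran") }

    // The callbacks return Boolean, yet the future still yields the Int.
    val value = Await.result(result, 5.seconds)
    val effects = Iterator.continually(log.poll()).takeWhile(_ != null).toList
    (value, effects)
  }
}
```

One caveat: the ordering only covers what the callback itself does. Work that a callback starts and does not wait for, such as the nested onComplete inside extractEntity, may still be running when the next andThen fires.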

    As in the previous installments, the automatic data format conversion relies on the Marshalling support:

    import de.heikoseeberger.akkahttpjson4s.Json4sSupport
    import org.json4s.jackson
    ...
    trait JsonCodec extends Json4sSupport {
      import org.json4s.DefaultFormats
      import org.json4s.ext.JodaTimeSerializers
      implicit val serilizer = jackson.Serialization
      implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
    }
    object JsConverters extends JsonCodec
    ...
      import JsConverters._
    
      implicit val jsonStreamingSupport = EntityStreamingSupport.json()
        .withParallelMarshalling(parallelism = 8, unordered = false)

    When the data exchange needs finer control, the Host-Level API is a better fit. Below we build a small client-side utility library on top of it:

    class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                      (implicit sys: ActorSystem, mat: ActorMaterializer) {
    
      import sys.dispatcher
    
      private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
        Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)
    //single request
      def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {
        Source.single(req -> 1)
          .via(cnnPool)
          .runWith(Sink.head).flatMap {
          case (Success(resp), _) => Future.successful(resp)
          case (Failure(fail), _) => Future.failed(fail)
        }
      }
    //sequence of requests
      def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
        // toList rather than toMap: a Map would collapse equal requests into a single entry
        Source(reqs.zipWithIndex.toList)
          .via(cnnPool)
          .runFold(SortedMap[Int, Future[HttpResponse]]()) {
            case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
            case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))
          }.flatMap { m => Future.sequence(m.values) }
      }
    }
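
The ordering trick in orderedResponses can be shown without Akka: tag every request with its position, let the answers arrive in any order, collect them in a SortedMap keyed by that position, and read the values back. The sketch below uses only the Scala standard library; simulatePool is a hypothetical stand-in for the connection pool and the request strings are made up. It also shows why the tagged pairs are safer in a list than in a Map, where equal requests collapse into one key.

```scala
import scala.collection.immutable.SortedMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderedByIndexDemo {
  // Hypothetical stand-in for the pool: answers come back in reverse order,
  // each still carrying the index it was sent with.
  def simulatePool(tagged: List[(String, Int)]): List[(String, Int)] =
    tagged.reverse.map { case (req, idx) => (s"response to $req", idx) }

  def orderedResponses(reqs: List[String]): Future[List[String]] = {
    val byIndex = simulatePool(reqs.zipWithIndex)
      .foldLeft(SortedMap.empty[Int, Future[String]]) {
        case (m, (resp, idx)) => m + (idx -> Future.successful(resp))
      }
    // SortedMap iterates in key order, so the request order is restored here.
    Future.sequence(byIndex.values.toList)
  }

  def demo(): List[String] =
    Await.result(orderedResponses(List("GET /item/3", "GET /item/5", "GET /item/7")), 5.seconds)

  // Two equal requests: the list keeps both pairs, the Map keeps only one.
  val duplicates: List[(String, Int)] = List("GET /item/3", "GET /item/3").zipWithIndex
  val keptByList: Int = duplicates.size
  val keptByMap: Int = duplicates.toMap.size
}
```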

    The following is a safer pattern: a queue buffers the requests, which deals with the sender and the receiver running at different rates:

    class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                              (qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)
                      (implicit sys: ActorSystem, mat: ActorMaterializer) {
      import sys.dispatcher
      private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =
        Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)
    
      val queue =
        Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)
          .via(cnnPool)
          .to(Sink.foreach({
            case ((Success(resp), p)) => p.success(resp)
            case ((Failure(e), p))    => p.failure(e)
          })).run()
    
      def queueRequest(request: HttpRequest): Future[HttpResponse] = {
          val responsePromise = Promise[HttpResponse]()
          queue.offer(request -> responsePromise).flatMap {
            case QueueOfferResult.Enqueued    => responsePromise.future
            case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
            case QueueOfferResult.Failure(ex) => Future.failed(ex)
            case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
          }
      }
    }
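
The core idea of QueuedRequestsClient is that every request travels together with a Promise: the caller gets the promise's Future straight away, and the consuming side completes the promise when the response is available. The sketch below models just that idea with a bounded java.util.concurrent queue in place of Source.queue. QueuedWorker, submit and processOne are hypothetical names, and rejecting the element when the queue is full only approximates OverflowStrategy.dropNew.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Future, Promise}

object PromiseQueueDemo {
  final class QueuedWorker(capacity: Int) {
    private val queue = new ArrayBlockingQueue[(String, Promise[String])](capacity)

    // Producer side: enqueue the request with a fresh promise, or fail fast when full.
    def submit(request: String): Future[String] = {
      val promise = Promise[String]()
      if (queue.offer(request -> promise)) promise.future
      else Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    }

    // Consumer side: take one queued request, if any, and complete its promise.
    def processOne(): Unit =
      Option(queue.poll()).foreach { case (request, promise) =>
        promise.success(s"response to $request")
      }
  }
}
```

With a capacity of 1, a second submit before any processOne call fails immediately, while the first Future stays pending until the consumer gets to it. This is the decoupling the queue provides.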

    Here is how these utility functions are used:

      val settings = ConnectionPoolSettings(sys)
        .withMaxConnections(8)
        .withMaxOpenRequests(8)
        .withMaxRetries(3)
        .withPipeliningLimit(4)
      val pooledClient = new PooledClient("localhost",8011,settings)
    
      def getItemByPool(itemId: Int): Future[HttpResponse] = for {
        response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
      } yield response
    
      extractEntity[Item](getItemByPool(13))
    
      def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {
        val reqs = itemIds.map { id =>
          HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")
        }
        val rets = (for {
          responses <- pooledClient.orderedResponses(reqs)
        } yield responses)
        rets
      }
      val futResps = getItemsByPool(List(3,5,7))
    
      futResps.andThen {
        case Success(listOfResps) => {
          listOfResps.foreach { r =>
            r match {
              case HttpResponse(StatusCodes.OK, _, entity, _) =>
                Unmarshal(entity).to[Item]
                  .onComplete {
                    case Success(t) => println(s"Got response entity: ${t}")
                    case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
                  }
              case _ => println("Exception in response!")
            }
          }
        }
        case _ => println("Failed to get list of responses!")
      }
    
      val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()
    
    
      def putItemByQueue(item: Item): Future[HttpResponse] =
        for {
          reqEntity <- Marshal(item).to[RequestEntity]
          response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
        } yield response
    
      extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))
        .andThen { case _ => sys.terminate()}

    Here is the complete source code for this discussion:

    Server code:

    import akka.actor._
    import akka.stream._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    
    import de.heikoseeberger.akkahttpjson4s.Json4sSupport
    import org.json4s.jackson
    trait JsonCodec extends Json4sSupport {
      import org.json4s.DefaultFormats
      import org.json4s.ext.JodaTimeSerializers
      implicit val serilizer = jackson.Serialization
      implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
    }
    object JsConverters extends JsonCodec
    
    
    object TestServer extends App with JsonCodec {
      implicit val httpSys = ActorSystem("httpSystem")
      implicit val httpMat = ActorMaterializer()
      implicit val httpEC = httpSys.dispatcher
    
      import JsConverters._
    
      case class Item(id: Int, name: String, price: Double)
      val messages = path("message") {
        get {
          complete("hello, how are you?")
        } ~
        put {
          entity(as[String]) {msg =>
            complete(msg)
          }
        }
      }
      val items =
        (path("item" / IntNumber) & get) { id =>
           get {
             complete(Item(id, s"item#$id", id * 2.0))
           }
        } ~
          (path("item") & put) {
            entity(as[Item]) {item =>
              complete(item)
            }
         }
    
      val route = messages ~ items
    
      val (host, port) = ("localhost", 8011)
    
      val bindingFuture = Http().bindAndHandle(route,host,port)
    
    
      println(s"Server running at $host $port. Press RETURN to exit ...")
    
      scala.io.StdIn.readLine()
    
      bindingFuture.flatMap(_.unbind())
        .onComplete(_ => httpSys.terminate())
    
    }

    Client source code:

    import akka.actor._
    import akka.http.scaladsl.settings.ConnectionPoolSettings
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    
    import scala.util._
    import de.heikoseeberger.akkahttpjson4s.Json4sSupport
    import org.json4s.jackson
    
    import scala.concurrent._
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.http.scaladsl.unmarshalling._
    import akka.http.scaladsl.marshalling.Marshal
    
    import scala.collection.SortedMap
    import akka.http.scaladsl.common._
    
    trait JsonCodec extends Json4sSupport {
      import org.json4s.DefaultFormats
      import org.json4s.ext.JodaTimeSerializers
      implicit val serilizer = jackson.Serialization
      implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
    }
    object JsConverters extends JsonCodec
    
    class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                      (implicit sys: ActorSystem, mat: ActorMaterializer) {
    
      import sys.dispatcher
    
      private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
        Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)
    
      def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {
        Source.single(req -> 1)
          .via(cnnPool)
          .runWith(Sink.head).flatMap {
          case (Success(resp), _) => Future.successful(resp)
          case (Failure(fail), _) => Future.failed(fail)
        }
      }
    
      def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
        // toList rather than toMap: a Map would collapse equal requests into a single entry
        Source(reqs.zipWithIndex.toList)
          .via(cnnPool)
          .runFold(SortedMap[Int, Future[HttpResponse]]()) {
            case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
            case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))
          }.flatMap { m => Future.sequence(m.values) }
      }
    }
    class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                              (qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)
                      (implicit sys: ActorSystem, mat: ActorMaterializer) {
      import sys.dispatcher
      private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =
        Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)
    
      val queue =
        Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)
          .via(cnnPool)
          .to(Sink.foreach({
            case ((Success(resp), p)) => p.success(resp)
            case ((Failure(e), p))    => p.failure(e)
          })).run()
    
      def queueRequest(request: HttpRequest): Future[HttpResponse] = {
          val responsePromise = Promise[HttpResponse]()
          queue.offer(request -> responsePromise).flatMap {
            case QueueOfferResult.Enqueued    => responsePromise.future
            case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
            case QueueOfferResult.Failure(ex) => Future.failed(ex)
            case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
          }
      }
    }
    object ClientRequesting extends App {
      import JsConverters._
    
      implicit val sys = ActorSystem("sysClient")
      implicit val mat = ActorMaterializer()
      implicit val ec = sys.dispatcher
    
      implicit val jsonStreamingSupport = EntityStreamingSupport.json()
        .withParallelMarshalling(parallelism = 8, unordered = false)
    
      case class Item(id: Int, name: String, price: Double)
    
      def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {
        futResp.andThen {
          case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
            Unmarshal(entity).to[T]
              .onComplete {
                case Success(t) => println(s"Got response entity: ${t}")
                case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
              }
          case Success(_) => println("Exception in response!")
          case Failure(err) => println(s"Response Failed: ${err.getMessage}")
        }
      }
    
      
      (for {
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))
        message <- Unmarshal(response.entity).to[String]
      } yield message).andThen {
        case Success(msg) => println(s"Received message: $msg")
        case Failure(err) => println(s"Error: ${err.getMessage}")
      }  //.andThen {case _ => sys.terminate()}
    
    
      (for {
        entity <- Marshal("Wata hell you doing?").to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))
        message <- Unmarshal(response.entity).to[String]
      } yield message).andThen {
        case Success(msg) => println(s"Received message: $msg")
        case Failure(err) => println(s"Error: ${err.getMessage}")
      } //.andThen {case _ => sys.terminate()}
    
    
      def getItem(itemId: Int): Future[HttpResponse] = for {
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
      } yield response
    
      extractEntity[Item](getItem(13))
    
      def putItem(item: Item): Future[HttpResponse] =
       for {
        reqEntity <- Marshal(item).to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
       } yield response
    
      extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))
         //.andThen { case _ => sys.terminate()}  // left disabled: the pooled and queued requests below still need the ActorSystem
      
      val settings = ConnectionPoolSettings(sys)
        .withMaxConnections(8)
        .withMaxOpenRequests(8)
        .withMaxRetries(3)
        .withPipeliningLimit(4)
      val pooledClient = new PooledClient("localhost",8011,settings)
    
      def getItemByPool(itemId: Int): Future[HttpResponse] = for {
        response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
      } yield response
    
      extractEntity[Item](getItemByPool(13))
    
      def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {
        val reqs = itemIds.map { id =>
          HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")
        }
        val rets = (for {
          responses <- pooledClient.orderedResponses(reqs)
        } yield responses)
        rets
      }
      val futResps = getItemsByPool(List(3,5,7))
    
      futResps.andThen {
        case Success(listOfResps) => {
          listOfResps.foreach { r =>
            r match {
              case HttpResponse(StatusCodes.OK, _, entity, _) =>
                Unmarshal(entity).to[Item]
                  .onComplete {
                    case Success(t) => println(s"Got response entity: ${t}")
                    case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
                  }
              case _ => println("Exception in response!")
            }
          }
        }
        case _ => println("Failed to get list of responses!")
      }
    
      val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()
      
      def putItemByQueue(item: Item): Future[HttpResponse] =
        for {
          reqEntity <- Marshal(item).to[RequestEntity]
          response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
        } yield response
    
      extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))
        .andThen { case _ => sys.terminate()}
      
    
    }

     

     

     

     

     

  • Original article: https://www.cnblogs.com/tiger-xc/p/7878579.html