zoukankan      html  css  js  c++  java
  • [转] Akka实战:构建REST风格的微服务

    [From] http://www.yangbajing.me/2015/11/27/akka%E5%AE%9E%E6%88%98%EF%BC%9A%E6%9E%84%E5%BB%BArest%E9%A3%8E%E6%A0%BC%E7%9A%84%E5%BE%AE%E6%9C%8D%E5%8A%A1/

    Akka实战:构建REST风格的微服务

    2015-11-27

    使用Akka-Http构建REST风格的微服务,服务API应尽量遵循REST语义,数据使用JSON格式交互。在有错误发生时应返回:{"errcode":409,"errmsg":"aa is invalid,the ID is expected to be bb"} 类似的JSON错误消息。

    代码:

    https://github.com/yangbajing/akka-action

    http://git.oschina.net/yangbajing/akka-action

    代码

    首先来看看代码文件结构:

      

    ├── ApiRoute.scala
    
    ├── App.scala
    
    ├── ContextProps.scala
    
    ├── book
    
    │   ├── Book.scala
    
    │   ├── BookContextProps.scala
    
    │   ├── BookRoute.scala
    
    │   └── BookService.scala
    
    └── news
    
        ├── News.scala
    
        ├── NewsContextProps.scala
    
        ├── NewsRoute.scala
    
        └── NewsService.scala

    通过名字可以看出,App.scala是启动程序,以Route结尾的是API路由定义文件,Service结尾的就是服务实现代码了。ContextProps结尾的是服务与路由交互的上下文属性部分,Service的将会在ContextProps中实例化并传给各个Route。

    从目录结构上看,程序是按功能模块进行划分的。book相关的路由、服务、实体都定义在book包下。相应的,与news相关的代码则写于news包。

    首先来看看程序的启动文件,App.scala

      

    def main(args: Array[String]): Unit = {
    
      Files.write(Paths.get("app.pid"), Utils.getPid.getBytes(Utils.CHARSET))
    
      val contextProps = new ContextProps
    
      val bindingFuture = Http().bindAndHandle(ApiRoute(contextProps), "0.0.0.0", 3333)
    
      bindingFuture.onComplete {
    
        case Success(binding) =>
    
          logger.info(binding.toString)
    
        case Failure(e) =>
    
          logger.error(e.getLocalizedMessage, e)
    
      }
    
    }

    定义akka-http绑定的host和port,设置ContextProps,并把它传给ApiRoute。App.scala的代码还是很简单的,接下来看看ApiRoute的实现。

    // 定义一个Health Check API,用户第3方工具(如:Nginx/Tengine)验证服务是否正常运行
    
    val healthCheck =
    
      path("health_check") {
    
        get { ctx =>
    
          logger.debug(ctx.request.toString)
    
          ctx.complete(HttpEntity.Empty)
    
        }
    
      }
    
    import me.yangbajing.akkaaction.util.JsonSupport._
    
    val customExceptionHandler = ExceptionHandler {
    
      case e: MessageException =>
    
        extractRequest { req =>
    
          val msg =
    
            s"""
    method: ${req.method}
    
               |uri: ${req.uri}
    
               |headers:
    
               |	${req.headers.mkString("
    	")}
    
               |$e""".stripMargin
    
          if (e.errcode > 500) logger.error(msg, e)
    
          else logger.warn(msg)
    
          complete(
    
            StatusCodes.getForKey(e.errcode) getOrElse StatusCodes.InternalServerError,
    
            JObject("errcode" -> JInt(e.errcode), "errmsg" -> JString(e.errmsg)))
    
        }
    
      case e: Exception =>
    
        extractRequest { req =>
    
          logger.error(req.toString, e)
    
          complete(
    
            StatusCodes.InternalServerError,
    
            JObject("errcode" -> JInt(500), "errmsg" -> JString(e.getLocalizedMessage)))
    
        }
    
    }
    
    def apply(props: ContextProps)(implicit ec: ExecutionContextExecutor, mat: Materializer) =
    
      handleExceptions(customExceptionHandler) {
    
        pathPrefix("api") {
    
          healthCheck ~
    
            NewsRoute(props) ~
    
            BookRoute(props)
    
        }
    
      }

    代码有一点长,现在分别解说。

    customExceptionHandler

    自定义的异常处理器,主要用于把自定义异常和系统异常转换成JSON消息输出,并设置相对应的HTTP状态码。

    apply

    apply方法定义了实现的API路由,由代码可以看到news、book两个模块的路由分别由NewsRoute和BookRoute两个文件定义。把相同功能的路由、服务、实体定义在同一个逻辑上下文(包)中,个人认为是一种更好的微服务实践。

    book模块详解

    book
    
    ├── Book.scala
    
    ├── BookContextProps.scala
    
    ├── BookRoute.scala
    
    └── BookService.scala

    Book:实体

    BookContextProps:上下文属性,服务将在此实例化。并把接口混入ContextProps中。

    BookRoute:API路由定义

    BookService:服务功能实现

    BookRotue定义

    pathPrefix("book") {
    
      pathEnd {
    
        post {
    
          entity(as[Book]) { book =>
    
            onSuccess(props.bookService.persist(book)) { result =>
    
              complete(StatusCodes.Created, result)
    
            }
    
          }
    
        }
    
      } ~
    
        path(Segment) { bookId =>
    
          get {
    
            complete(props.bookService.findOneById(bookId))
    
          } ~
    
            put {
    
              entity(as[Book]) { book =>
    
                complete(props.bookService.updateById(bookId, book))
    
              }
    
            } ~
    
            delete {
    
              complete(props.bookService.deleteById(bookId).map(id => Map("id" -> id)))
    
            }
    
        }

    Akka-Http提供了高级routing DSL,可以很自然的定义出树型结构的RESTful风格的API。由代码可见,定义了4个API。分别对应insert、select、update、delete操作,由post、get、put和delete4个指令实现对应操作的HTTP方法。

    pathPrefix、pathEnd和path3个路径定义指令的区别在于pathPrefix代表由它定义的路径还并未终结,在它下面还有子路径。而path则代表它已经是最终的路径了,pathEnd是用于在使用了pathPrefix的情况下也可以直接访问由pathPrefix指定的路径。

    Segment用于把由path定义的路径抽取成一个参数(bookId)。除了Segment用于抽取一个字符串类型,还有IntNumber和LongNumber用于抽取路径为Int或Long类型。

    entity指令用于抽取HTTP请求的body部分,并通过as[T]方法将其自动解析为指定类型。这里使用到了akka提供的Unmarshaller特性。这里通过JsonSupport里定义的json4sUnmarshaller将用户请求提交的JSON字符串映射到Book类型。

    implicit def json4sUnmarshaller[A: Manifest](implicit mat: Materializer): FromEntityUnmarshaller[A] =
    
      Unmarshaller.byteStringUnmarshaller
    
        .forContentTypes(MediaTypes.`application/json`)
    
        .mapWithCharset { (data, charset) =>
    
          val input = if (charset == HttpCharsets.`UTF-8`) data.utf8String else data.decodeString(charset.nioCharset().name)
    
          jsonSerialization.read(input)
    
        }
    
    implicit def json4sMarshaller[A <: AnyRef]: ToEntityMarshaller[A] =
    
      Marshaller.StringMarshaller.wrap(ContentType(MediaTypes.`application/json`, HttpCharsets.`UTF-8`))(v =>
    
        jsonSerialization.write[A](v))

    自然json4sMarshaller则是把T类型的对象映射为JSON字符串响应给请求方。

    BookService

    再来看看BookService服务的实现。

    def updateById(bookId: String, book: Book)(implicit ec: ExecutionContext): Future[Book] = Future {
    
       if (bookId != book.id)
    
         throw MeConflictMessage(s"${book.id} is invalid,the ID is expected to be $bookId")
    
       val newBooks = BookService.books.filterNot(_.id == bookId)
    
       if (newBooks.size == BookService.books.size)
    
         throw MeNotFoundMessage(s"$bookId not found")
    
       BookService.books ::= book
    
       book
    
     }
    
     def persist(book: Book)(implicit ec: ExecutionContext): Future[Book] = Future {
    
       if (BookService.books.exists(_.id == book.id))
    
         throw MeConflictMessage(s"${book.id} exsits")
    
       BookService.books ::= book
    
       book
    
     }
    
     def deleteById(bookId: String)(implicit ec: ExecutionContext): Future[String] = Future {
    
       val newBooks = BookService.books.filterNot(_.id == bookId)
    
       if (newBooks.size == BookService.books.size)
    
         throw MeNotFoundMessage(s"$bookId not found")
    
       BookService.books = newBooks
    
       bookId
    
     }
    
     def findOneById(bookId: String)(implicit ec: ExecutionContext): Future[Book] = Future {
    
       BookService.books.find(_.id == bookId).getOrElse(throw MeNotFoundMessage(s"$bookId not found"))
    
     }

    看到每个方法的返回值都被定义成了Future[T],akka-http是一个基于akka-actor和akka-stream的异步HTTP工具集,使用Future可以提供整个系统的响应。我们这里直接使用Future来模拟异步访问(数据库操作)。

    在每个方法中,我们校验参数是否有效。若校验失败则直接抛出自定义异常。Future函数将捕获异常,由之前定义的customExceptionHandler自定义异常处理器来将自定义异常转换成JSON消息发送给调用方,并设置匹配的HTTP状态码。

    测试

    百闻不如一试,下载代码实际操作下(下载地址在文章开头)。

    运行程序:

     ./sbt

    akka-action > runMain me.yangbajing.akkaaction.restapi.App

    依次执行docs/scripts/restapi目录下的测试脚本,查看各请求下REST API的返回值(需要系统安装curl)。

    ./get-book-aa.sh:正常返回ID为aa的书

    ./get-book-bb.sh:查找ID为bb的书返回404

    ./post-book.sh:创建一本ID为bb书,返回201

    ./get-book-bb.sh:正确返回ID为bb的书

    ./put-book.hs:正确更新ID为bb的书

    ./put-book-invalid.sh:无效的更新ID为aa的书,返回409

    ./delete-book-aa.sh:成功的删除ID为aa的书

    ./get-book-aa.sh:再次查找ID为aa的书返回404

    ./delete-book-aa.sh:再次删除ID为aa的书时返回404

    总结

    akka-http是一个很有意思的HTTP工具库,它完整的实现了客户端和服务端编程工具,还支持WebScoket。基于akka-actor和akka-stream,提供了高并发的异步编程模型。我们可以很快捷的实现出一个响应式(Reactive)Web Service。其提供的routing DSL可方便的定义出一套树型结构的API,很自然的匹配到RESTful风格的API设计。

  • 相关阅读:
    docker常用软件安装及使用
    生成base64位图片验证码
    项目经理的特殊需求,对象的移动,
    集成微信支付的代码。兼容小程序,扫码,app,公众号。h5 支付 ,以及 服务商提现
    nginx请求转发配置
    nginx的conf文件,两种配置方式,第一种无ssl证书,第二种有ssl证书。
    Springboot集成WebSocket通信全部代码,即扣即用。
    Centos7上安装docker
    mysql 5.7 的 /etc/my.cnf
    【图嵌入】DeepWalk 和 Node2Vec
  • 原文地址:https://www.cnblogs.com/pekkle/p/7922239.html
Copyright © 2011-2022 走看看