  search(3) - elastic4s-QueryDSL

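      elastic4s is a third-party Scala client library for Elasticsearch ("Elastic4s is a concise, idiomatic, reactive, type safe Scala client for Elasticsearch."). Its DSL lets Scala users build ES requests as program code. A request written as raw JSON text is only checked when the server parses it. A request written in the DSL is checked by the compiler, so many syntactic and semantic mistakes surface at compile time. In general, an elastic4s program follows a fairly direct and simple flow: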

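      // client is an ElasticClient; its construction is omitted here.
      // Index one document into "books" (title: "Ten Ways to Eat Chongqing Hotpot",
      // content: "This book describes the various ways of cooking hotpot").
      client.execute {
        indexInto("books").fields("title" -> "重庆火锅的十种吃法", "content" -> "在这部书里描述了火锅的各种烹饪方式")
      }.await
    
      // Search "books" for documents whose title matches "火锅" (hotpot).
      // A newly indexed document is visible to search only after an index refresh.
      val response = client.execute {
        search("books").matchQuery("title", "火锅")
      }.await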
    
    ...
    
    ...
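The following toy typed query builder in plain Scala makes the compile-time argument concrete. It is a hypothetical sketch, not elastic4s's DSL: MiniDsl, MatchQuery, BoolMust and SearchBody are made-up names. Queries are values of a sealed type that render themselves to JSON. A missing argument or a value of the wrong type is therefore a compile error. A misspelled key inside a JSON string would only be reported by Elasticsearch at run time.

```scala
// Hypothetical mini query DSL, NOT elastic4s's real API. It only shows why
// building a request from typed values lets the compiler catch mistakes that
// a hand-written JSON string would carry all the way to the server.
object MiniDsl {
  // Escape backslashes and double quotes so values can sit inside JSON strings.
  def esc(s: String): String = s.replace("\\", "\\\\").replace("\"", "\\\"")

  sealed trait Query { def toJson: String }

  // {"match":{"<field>":"<text>"}}
  final case class MatchQuery(field: String, text: String) extends Query {
    def toJson: String = s"""{"match":{"${esc(field)}":"${esc(text)}"}}"""
  }

  // {"bool":{"must":[<clause>,<clause>,...]}}: nested queries compose as values.
  final case class BoolMust(clauses: List[Query]) extends Query {
    def toJson: String = clauses.map(_.toJson).mkString("""{"bool":{"must":[""", ",", "]}}")
  }

  // Top-level search body: {"query":<query>}
  final case class SearchBody(query: Query) {
    def toJson: String = s"""{"query":${query.toJson}}"""
  }

  // MatchQuery("title")   // does not compile: missing argument `text`
  // SearchBody("title")   // does not compile: a String is not a Query
}
```

A raw string such as `"""{"qeury":{"match":{"title":"火锅"}}}"""` compiles without complaint. The typo is found only when Elasticsearch rejects the request.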

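    Everything an ES operation does, from building the request to running it, happens inside the function execute(T). Note that in the example above this T can be either an IndexRequest or a SearchRequest: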

       def indexInto(index: Index): IndexRequest
    ...
       def search(index: String): SearchRequest

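    In fact, the T in execute(T) stands for every ES operation type that elastic4s supports. This is made possible by Scala's typeclass pattern. Let's first look at how execute is defined: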

      // Executes the given request type T, and returns an effect of Response[U]
      // where U is particular to the request type.
      // For example a search request will return a Response[SearchResponse].
      def execute[T, U, F[_]](t: T)(implicit
                                    executor: Executor[F],
                                    functor: Functor[F],
                                    handler: Handler[T, U],
                                    manifest: Manifest[U]): F[Response[U]] = {
        val request = handler.build(t)
        val f = executor.exec(client, request)
        functor.map(f) { resp =>
          handler.responseHandler.handle(resp) match {
            case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
            case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error)
          }
        }
      }
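The core of this signature can be reproduced without any HTTP machinery. The sketch below is a simplified, hypothetical model. It assumes a synchronous call in place of F[_]/Functor and drops Manifest. Its types only echo elastic4s names and are not the library's classes. It shows how the compiler picks a Handler[T, U] from the static type of the argument, and how that instance also fixes the response type U.

```scala
// Stripped-down, hypothetical model of the typeclass dispatch behind execute(T).
// No HTTP, no F[_] effect, no Manifest: just enough to show how the implicit
// Handler[T, U] is chosen from T and determines the response type U.
object MiniExecute {
  final case class ElasticRequest(method: String, endpoint: String)
  final case class HttpResponse(statusCode: Int, body: String)

  // Request types and the response types their handlers produce.
  final case class IndexReq(index: String, doc: String)
  final case class SearchReq(index: String, query: String)
  final case class IndexResp(result: String)
  final case class SearchResp(hits: String)

  // The typeclass: build an ElasticRequest from T, decode the reply into U.
  trait Handler[T, U] {
    def build(t: T): ElasticRequest
    def handle(resp: HttpResponse): Either[String, U]
  }

  object Handler {
    // Instances in the companion object are always part of the implicit scope.
    implicit val indexHandler: Handler[IndexReq, IndexResp] = new Handler[IndexReq, IndexResp] {
      def build(t: IndexReq): ElasticRequest = ElasticRequest("POST", s"/${t.index}/_doc")
      def handle(r: HttpResponse): Either[String, IndexResp] =
        if (r.statusCode == 200 || r.statusCode == 201) Right(IndexResp(r.body)) else Left(r.body)
    }

    implicit val searchHandler: Handler[SearchReq, SearchResp] = new Handler[SearchReq, SearchResp] {
      def build(t: SearchReq): ElasticRequest = ElasticRequest("POST", s"/${t.index}/_search")
      def handle(r: HttpResponse): Either[String, SearchResp] =
        if (r.statusCode == 200) Right(SearchResp(r.body)) else Left(r.body)
    }
  }

  // Fake transport standing in for executor.exec: always replies 200 and
  // echoes "METHOD endpoint" so the result shows which request was built.
  private def exec(req: ElasticRequest): HttpResponse =
    HttpResponse(200, s"${req.method} ${req.endpoint}")

  // Same shape as execute: T comes from the argument, U from the Handler found.
  def execute[T, U](t: T)(implicit handler: Handler[T, U]): Either[String, U] =
    handler.handle(exec(handler.build(t)))
}
```

The call `MiniExecute.execute(42)` does not compile, because the implicit search for a Handler[Int, U] fails. The same mechanism lets execute accept every supported request type and reject unsupported ones at compile time.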

    One of the key jobs of this function is building the request, which is done by handler.build(t). handler: Handler[T,U] is an implicit parameter, and Handler is a typeclass:

    /**
      * A [[Handler]] is a typeclass used by the [[ElasticClient]] in order to
      * create [[ElasticRequest]] instances which are sent to the elasticsearch
      * server, as well as returning a [[ResponseHandler]] which handles the
      * response from the server.
      *
      * @tparam T the type of the request object handled by this handler
      * @tparam U the type of the response object returned by this handler
      */
    abstract class Handler[T, U: Manifest] extends Logging {
      def responseHandler: ResponseHandler[U] = ResponseHandler.default[U]
      def build(t: T): ElasticRequest
    }
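A small stand-alone sketch shows how the abstract class splits a concrete default method from an abstract one. Everything in it is hypothetical. The default handler that accepts any 2xx status is an assumption made for this example and does not describe ResponseHandler.default. The override copies the status-code table of the IndexHandler instance shown below.

```scala
// Hypothetical sketch of the Handler shape: responseHandler has a default
// implementation that instances may override, while build is abstract.
object MiniHandler {
  final case class HttpResponse(statusCode: Int, body: String)

  trait ResponseHandler[U] {
    def handle(response: HttpResponse): Either[String, U]
  }

  abstract class Handler[T, U](decode: String => U) {
    // Assumed default for this sketch: any 2xx status is decoded, everything else is an error.
    def responseHandler: ResponseHandler[U] = new ResponseHandler[U] {
      def handle(r: HttpResponse): Either[String, U] =
        if (r.statusCode / 100 == 2) Right(decode(r.body)) else Left(r.body)
    }
    // Abstract: every concrete handler must say how to build its request.
    def build(t: T): String
  }

  final case class IndexReq(index: String)
  final case class SearchReq(index: String)

  // Overrides both members; the status-code table mirrors IndexHandler.
  object IndexHandler extends Handler[IndexReq, String](s => s) {
    override def responseHandler: ResponseHandler[String] = new ResponseHandler[String] {
      def handle(r: HttpResponse): Either[String, String] = r.statusCode match {
        case 201 | 200                   => Right(r.body)
        case 400 | 401 | 403 | 409 | 500 => Left(r.body)
        case _                           => sys.error(r.toString)
      }
    }
    def build(t: IndexReq): String = s"/${t.index}/_doc"
  }

  // Keeps the default responseHandler (the body is pretended to be a hit count).
  object SearchHandler extends Handler[SearchReq, Int](_.trim.toInt) {
    def build(t: SearchReq): String = s"/${t.index}/_search"
  }
}
```

SearchHandler only has to supply build. IndexHandler also replaces the default response handling, as the real IndexHandler does.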

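    This abstract class has two methods. One of them, build(t: T): ElasticRequest, is abstract and must be implemented by every instance. Because handler is an implicit parameter of execute(T), the compiler supplies it automatically whenever a Handler[T,U] instance can be found at the call site (for example, one brought into scope by an import). execute then calls handler.build(t) to build the request. The T here is the same T that was passed to execute(T). Since T can be any ES operation type, each supported operation type needs its own implicit Handler[T,U] instance, and that instance's build(t: T) must construct the ElasticRequest the operation requires. The request types themselves do not extend Handler. Here are the implicit instances needed by the two operation types in the example: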

     implicit object IndexHandler extends Handler[IndexRequest, IndexResponse] {
    
        override def responseHandler: ResponseHandler[IndexResponse] = new ResponseHandler[IndexResponse] {
          override def handle(response: HttpResponse): Either[ElasticError, IndexResponse] = response.statusCode match {
            case 201 | 200                   => Right(ResponseHandler.fromResponse[IndexResponse](response))
            case 400 | 401 | 403 | 409 | 500 => Left(ElasticError.parse(response))
            case _                           => sys.error(response.toString)
          }
        }
    
        override def build(request: IndexRequest): ElasticRequest = {
    
          val (method, endpoint) = request.id match {
            case Some(id) =>
              "PUT" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc/${URLEncoder.encode(id.toString, StandardCharsets.UTF_8.name())}"
            case None =>
              "POST" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc"
          }
    
          val params = scala.collection.mutable.Map.empty[String, String]
          request.createOnly.foreach(
            createOnly =>
              if (createOnly)
                params.put("op_type", "create")
          )
          request.routing.foreach(params.put("routing", _))
          request.parent.foreach(params.put("parent", _))
          request.timeout.foreach(params.put("timeout", _))
          request.pipeline.foreach(params.put("pipeline", _))
          request.refresh.map(RefreshPolicyHttpValue.apply).foreach(params.put("refresh", _))
          request.version.map(_.toString).foreach(params.put("version", _))
          request.ifPrimaryTerm.map(_.toString).foreach(params.put("if_primary_term", _))
          request.ifSeqNo.map(_.toString).foreach(params.put("if_seq_no", _))
          request.versionType.map(VersionTypeHttpString.apply).foreach(params.put("version_type", _))
    
          val body   = IndexContentBuilder(request)
          val entity = ByteArrayEntity(body.getBytes, Some("application/json"))
    
          logger.debug(s"Endpoint=$endpoint")
          ElasticRequest(method, endpoint, params.toMap, entity)
        }
      }
    
    
    ...
    
      implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] {
    
        override def build(request: SearchRequest): ElasticRequest = {
    
          val endpoint =
            if (request.indexes.values.isEmpty)
              "/_all/_search"
            else
              "/" + request.indexes.values
                .map(URLEncoder.encode(_, "UTF-8"))
                .mkString(",") + "/_search"
    
          val params = scala.collection.mutable.Map.empty[String, String]
          request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
          request.searchType
            .filter(_ != SearchType.DEFAULT)
            .map(SearchTypeHttpParameters.convert)
            .foreach(params.put("search_type", _))
          request.routing.map(_.toString).foreach(params.put("routing", _))
          request.pref.foreach(params.put("preference", _))
          request.keepAlive.foreach(params.put("scroll", _))
          request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _))
          request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _))
    
          request.indicesOptions.foreach { opts =>
            IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }
          }
    
          request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))
    
          val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
          ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json"))
        }
      }

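    IndexHandler and SearchHandler above are the implicit Handler[T,U] instances for the index and search operations. Each one's build(t: T) turns the T argument it receives into a request in the format its operation requires: HTTP method, endpoint, URL parameters and JSON body.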
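The endpoint and parameter rules in the two build methods are plain string manipulation, so they can be tried in isolation. This sketch re-implements only those rules:

- PUT with an id vs. POST without one.
- /_all/_search when no index is given.
- Query parameters only for options that are set.

The object and method names are hypothetical, and only a few of the parameters handled above are included.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical helper: re-creates only the endpoint/parameter rules of
// IndexHandler.build and SearchHandler.build, without any HTTP client.
object MiniEndpoints {
  private def enc(s: String): String = URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  // PUT /{index}/_doc/{id} when the caller supplies an id,
  // otherwise POST /{index}/_doc so Elasticsearch generates one.
  def indexEndpoint(index: String, id: Option[String]): (String, String) = id match {
    case Some(i) => "PUT" -> s"/${enc(index)}/_doc/${enc(i)}"
    case None    => "POST" -> s"/${enc(index)}/_doc"
  }

  // No index means /_all/_search; otherwise the encoded names are comma-joined.
  def searchEndpoint(indexes: Seq[String]): String =
    if (indexes.isEmpty) "/_all/_search"
    else indexes.map(enc).mkString("/", ",", "/_search")

  // A parameter is emitted only when its option is set; createOnly = true maps to op_type=create.
  def indexParams(routing: Option[String],
                  refresh: Option[String],
                  createOnly: Option[Boolean]): Map[String, String] =
    Seq(
      createOnly.filter(b => b).map(_ => "op_type" -> "create"),
      routing.map("routing" -> _),
      refresh.map("refresh" -> _)
    ).flatten.toMap
}
```

Because index names and ids are URL-encoded, an index called `my index` with id `a/b` ends up at `/my+index/_doc/a%2Fb`.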

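    I have always felt that not every kind of request is a good fit for the DSL. A JSON body with several levels of nested logical conditions, for example, may be awkward to express in the DSL (a personal concern of mine). So there should be a way to put JSON text directly into the request entity. elastic4s provides one: request types such as IndexRequest, SearchRequest and BulkRequest have a source: Option[String] field that accepts JSON text, as follows: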

    case class IndexRequest(index: Index,
                            ...
                            source: Option[String] = None)
        extends BulkCompatibleRequest {
      ...
      def source(json: String): IndexRequest = copy(source = json.some)
      ...
    }

    case class SearchRequest(indexes: Indexes,
                             ...
                             source: Option[String] = None,
                             ...
                             typedKeys: Option[Boolean] = None) {
      ...
      /**
        * Sets the source of the request as a json string. Note, if you use this method
        * any other body-level settings will be ignored.
        *
        * HTTP query-parameter settings can still be used, eg limit, routing, search type etc.
        *
        * Unlike rawQuery, source is parsed at the "root" level
        * Query must be valid json beginning with '{' and ending with '}'.
        * Field names must be double quoted.
        *
        * NOTE: This method only works with the HTTP client.
        *
        * Example:
        * {{{
        * search in "*" limit 5 source {
        * """{ "query": { "prefix": { "bands": { "prefix": "coldplay", "boost": 5.0, "rewrite": "yes" } } } }"""
        * } searchType SearchType.Scan
        * }}}
        */
      def source(json: String): SearchRequest = copy(source = json.some)
      ...
    }

    Now we can use JSON text directly:

      val json =
        """
          |{
          |  "query" : {
          |    "match" : {"title" : "火锅"}
          |  }
          |}
          |""".stripMargin
      val response = client.execute {
        search("books").source(json)   // same query as search("books").matchQuery("title", "火锅")
      }.await
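The precedence follows SearchHandler.build above: `request.source.getOrElse(...)`. Raw JSON, when present, replaces the DSL-generated body, while URL parameters such as routing are still added. The sketch below models just that rule with a hypothetical, pared-down request type. SearchReq, rawSource and render are illustration-only names, not elastic4s API.

```scala
// Hypothetical, pared-down model of how a raw JSON source interacts with
// DSL-built fields. SearchReq, rawSource and render are NOT elastic4s names.
object MiniSource {
  final case class SearchReq(
      indexes: Seq[String],
      query: Option[(String, String)] = None, // DSL-style match query: field -> text
      routing: Option[String] = None,         // sent as a URL parameter, not in the body
      rawSource: Option[String] = None        // hand-written JSON body
  ) {
    // Analogous to SearchRequest.source(json): keep the JSON text as-is.
    def source(json: String): SearchReq = copy(rawSource = Some(json))
  }

  // Escape backslashes and double quotes for embedding in a JSON string.
  private def esc(s: String): String = s.replace("\\", "\\\\").replace("\"", "\\\"")

  // Body generated from the DSL fields (match_all when no query was given).
  private def dslBody(r: SearchReq): String = r.query match {
    case Some((field, text)) => s"""{"query":{"match":{"${esc(field)}":"${esc(text)}"}}}"""
    case None                => """{"query":{"match_all":{}}}"""
  }

  // Body: the raw source if present, else the DSL body (like source.getOrElse(...)).
  // URL parameters are computed independently, so they apply in both cases.
  def render(r: SearchReq): (String, Map[String, String]) =
    (r.rawSource.getOrElse(dslBody(r)), r.routing.map("routing" -> _).toMap)
}
```

Setting a raw source on a request that already has a DSL query silently discards that query. This matches the Scaladoc's warning that any other body-level settings will be ignored.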
  Original article: https://www.cnblogs.com/tiger-xc/p/12547909.html