zoukankan      html  css  js  c++  java
  • search(6)- elastic4s-CRUD

        如果我们把ES作为某种数据库来使用的话,必须熟练掌握ES的CRUD操作。在这之前先更正一下上篇中关于检查索引是否存在的方法:elastic4s的具体调用如下:

      //删除索引
      val rspExists = client.execute(indexExists("company")).await
      if (rspExists.result.exists)
         client.execute(deleteIndex("company")).await

    在下面我们还会示范如何检查一条记录(document)是否存在的方法。

    先示范新建一条记录。一般来讲数据库表都有个唯一字段,最好用ES里的id来代表,否则ES会自动产生一个唯一id,那么随机读取get时就会很不方便。如果新插入的记录id已经在表里存在,ES会替换新的内容,不会产生异常。可以在elastic4s里使用createOnly(true)来强制产生重复id异常:

    import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
    import com.sksamuel.elastic4s.akka._
    import akka.actor._
    
    import scala.concurrent.ExecutionContext.Implicits.global
    object Lesson05 extends App {
      import com.sksamuel.elastic4s.ElasticDsl._
    
      private implicit lazy val system: ActorSystem = ActorSystem()
      val akkaClient = AkkaHttpClient(AkkaHttpClientSettings(List("130.1.1.234:9200")))
      val client = ElasticClient(akkaClient)
    
      val publisher = indexInto("company").id("c00001")
        .fields(
          "code" -> "c00001",
          "name" -> "人民出版社",
          "biztype" -> "出版社",
          "addr" -> Map(
            "district" -> "北京市东城区",
            "address" -> "朝阳门内大街166号"
          ),
          "regdate" -> "1963-02-18",
          "contact" -> "65122634@163.com"
        ).createOnly(true)
    
      val pubExists = client.execute(exists("company","c00001")).await
      if (pubExists.isSuccess) {
        val createPub = client.execute(publisher).await
    
        if (createPub.isSuccess) {
          val pub = client.execute(get("company", "c00001").fetchSourceContext(true)).await
          println(s"${pub.result.sourceAsMap}")
        } else println(s"${createPub.error.reason}")
      } else println(s"${pubExists.error.reason}")
    
      val dstributor = indexInto("company").id("c00002")
        .fields(
          "code" -> "c00002",
          "name" -> "新华文轩出版传媒股份有限公司",
          "biztype" -> "出版发行",
          "addr" -> Map(
            "district" -> "四川省成都市锦江区",
            "address" -> "金石路239号4栋1层1号"
          ),
          "regdate" -> "2005-03-09",
          "contact" -> "52635286@qq.com"
        ).createOnly(true)
    
      val grpExists = client.execute(exists("company","c00002")).await
      if (grpExists.isSuccess) {
        val createGroup = client.execute(dstributor).await
    
        if (createGroup.isSuccess) {
          val dstr = client.execute(get("company", "c00002").fetchSourceContext(true)).await
          println(s"${dstr.result.sourceAsMap}")
        } else println(s"${createGroup.error.reason}")
      } else println(s"${grpExists.error.reason}")
    
      val mget = client.execute(multiget(
        get("company","c00001"),
        get("company","c00002")
      )).await
      if(mget.isSuccess)
        mget.result.items.foreach(i => println(s"${i.sourceAsMap}"))
      else println(s"${mget.error.reason}")
    
      scala.io.StdIn.readLine()
      system.terminate()
      client.close()
    }

    上面示范了不同类型字段的填写方式,特别是nested字段如addr。每插入一条新记录就用get进行一次验证,输出显示:

    HashMap(name -> 人民出版社, regdate -> 1963-02-18, contact -> 65122634@163.com, code -> c00001, addr -> Map(district -> 北京市东城区, address -> 朝阳门内大街166号), biztype -> 出版社)
    HashMap(name -> 新华文轩出版传媒股份有限公司, regdate -> 2005-03-09, contact -> 52635286@qq.com, code -> c00002, addr -> Map(district -> 四川省成都市锦江区, address -> 金石路239号4栋1层1号), biztype -> 出版发行)
    HashMap(name -> 人民出版社, regdate -> 1963-02-18, contact -> 65122634@163.com, code -> c00001, addr -> Map(district -> 北京市东城区, address -> 朝阳门内大街166号), biztype -> 出版社)
    HashMap(name -> 新华文轩出版传媒股份有限公司, regdate -> 2005-03-09, contact -> 52635286@qq.com, code -> c00002, addr -> Map(district -> 四川省成都市锦江区, address -> 金石路239号4栋1层1号), biztype -> 出版发行)

    上面提到过,如果我们想把ES当作普通的数据库来使用的话,还是要备齐了CRUD功能。具体操作按照关系数据库方式围绕着唯一键id进行。比如,我们可以用id来检查记录是否已经存在:

      val pubExists = client.execute(exists("company","c00001")).await
      if (pubExists.isSuccess) {...}

    上面我们示范了针对索引的create,read操作。下面讨论一下update:update 可分单笔或批次两类,分别为:updateById, updateByQuery,很明显:updateByQuery是以query作为目标筛选条件的成批update操作。与上面的create操作一样,我们还是需要考虑唯一键id,这个可以在updateById操作里处理:当目标id存在时,用update请求里的字段值更新对应的字段。如目标id不存在的话就把update请求里的字段值当作新记录内容插入:

    import com.sksamuel.elastic4s.http.JavaClient
    import com.sksamuel.elastic4s.requests.common.RefreshPolicy
    import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
    import scala.concurrent.ExecutionContext.Implicits.global
    object Lesson06 extends App {
      import com.sksamuel.elastic4s.ElasticDsl._
    
      val esjava = JavaClient(ElasticProperties("http://localhost:9200"))
      val client = ElasticClient(esjava)
    
      val doc1 = updateById("company","t00001")
          .docAsUpsert(
            Map(
              "code" -> "t00001",
              "name" -> "test company1"
            )
          )
    
      val doc2 = updateById("company","t00002")
        .docAsUpsert(
          Map(
            "code" -> "t00002",
            "name" -> "test company2"
          )
        )
    
      val doc3 = updateById("company","t00003")
        .docAsUpsert(
          Map(
            "code" -> "t00003",
            "name" -> "test company3"
          )
        )
    
      val updateAll = for {
        _ <- client.execute(doc1)
        _ <- client.execute(doc2)
        _ <- client.execute(doc3)
      } yield()
      updateAll.await
    
      val getResults = client.execute(multiget(
         get("company","t00001").fetchSourceInclude("code","name"),
         get("company","t00002").fetchSourceInclude("code","name"),
         get("company","t00003").fetchSourceInclude("code","name")
       )
      ).await
    
      getResults.result.items.foreach(i => println(i.sourceAsMap))
    
      client.close()
    }

    成批更新比较麻烦,因为通常每条记录的更新都可能涉及到当前记录的字段值,或作为判断条件,或为更新值,我们需要使用并处理当前记录中某些字段。这就需要在数据层面运行某些计算方法,可以用脚本语言来实现这样的功能,如下:

      import com.sksamuel.elastic4s.requests.script.Script
      val script = "ctx._source.fullname = ctx._source.code+' '+ctx._source.name"
      val updateByQ = updateIn("company")
        .query(matchQuery("name","test"))
        .script(Script(script,Some("painless")))
    
      val qupResult = client.execute(updateByQ).await
    
      val getResults = client.execute(multiget(
         get("company","t00001"),
         get("company","t00002"),
         get("company","t00003")
       )
      ).await
    
      getResults.result.items.foreach(i => println(i.sourceAsMap))

    与update一样,delete也分单个或成批删除模式。delete by Id 示例如下:

      (for {
        _ <- client.execute(delete("t00001").from("company"))
        _ <- client.execute(deleteByQuery("company", "t00002"))
      } yield()).await

    delete by Query 用法如下:

      import com.sksamuel.elastic4s.Index._
      client.execute(
        deleteByQuery(toIndex("company"),
          termQuery("code","t00003"))
      ).await
  • 相关阅读:
    HDU1285-确定比赛名次(拓扑排序)
    ftp sftp
    Python with 用法
    odoo 非root用户运行不成功
    linux 删除软连接
    vscode wsl php
    WSL 修改默认登录用户为root
    WSL ssh服务自启动
    odoo 获取model的所有字段
    odoo 在"动作"("Action")菜单中添加子菜单, 点击子菜单弹窗自定义form
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/12747840.html
Copyright © 2011-2022 走看看