zoukankan      html  css  js  c++  java
  • spark对elasticsearch增删查改

    新建一个 dataframe ,插入到索引 _index/_type ,直接调用 saveToEs ,让 _id 为自己设定的 id

    import org.elasticsearch.spark.sql._
    def main(args: Array[String]): Unit = {
    
    val spark = getSparkSession()
    val dataFrame = spark.createDataFrame(Seq(
      (1, 1, "2", "5"),
      (2, 2, "3", "6"),
      (3, 2, "36", "69")
    )).toDF("id", "label", "col1", "col2")
    dataFrame.saveToEs("_index/_type",Map("es.mapping.id" -> "id"))
    }
    
    //配置spark
    def getSparkSession(): SparkSession = {
    val masterUrl = "local"
    val appName = "ttyb"
    val sparkconf = new SparkConf()
      .setMaster(masterUrl)
      .setAppName(appName)
      .set("es.nodes", "es的IP")
      .set("es.port", "9200")
    val Spark = SparkSession.builder().config(sparkconf).getOrCreate()
    Spark
    }
    

    目前 spark 没有开放删除的 API ,所以删除只能用命令行:

    curl -XDELETE 'http://es的IP:9200/_index/_type/_id'
    

    根据时间范围查询,其中 query 可以为空,代表不以任何查询条件查询:

    val startTime = "1519660800000"
    val endTime = "1519747200000"
    val query = "{"query":{"range":{"recordtime":{"gte":" + startTime + ","lte":" + endTime + "}}}}"
    val tableName = "_index/_type"
    val botResultData = spark.esDF(tableName, query)
    

    例如需要将 id=3col1 改成 4col2 改成 7,可以新建一个 dataframe ,按照 id 储存,这样 elasticsearch 就会自动覆盖相同 id 下的数据:

    val dataFrame1 = spark.createDataFrame(Seq(
      (3, 2, "4", "7")
    )).toDF("id", "label", "col1", "col2")
    dataFrame1.saveToEs("_index/_type",Map("es.mapping.id" -> "id"))
    
  • 相关阅读:
    delphi AlphaControls
    MATLAB 中NORM运用
    matlab画图形函数 semilogx
    fir2(n,f,m)
    离散系统频响特性函数freqz()
    snr ber Eb/N0之间的区别与联系
    MATLAB中白噪声的WGN和AWGN函数的使用
    matlab 功率谱分析
    用matlab实现同一个序列重复N倍
    Stem函数绘图
  • 原文地址:https://www.cnblogs.com/TTyb/p/8507231.html
Copyright © 2011-2022 走看看