zoukankan      html  css  js  c++  java
  • elasticsearch-hadoop 扩展定制 官方包以支持 update upsert doc

    官方源码地址 https://github.com/elastic/elasticsearch-hadoop

    相关文档 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

    spark to es

    4种操作方式
    index
    update
    upsert
    create

    只支持四种操作,看文档描述,目前的需求只能用 upsert 实现,但官方的包对 upsert 支持不完整

    upsert 现支持
    /*
    * {
    * "script":{
    * "inline": "...",
    * "lang": "...",
    * "params": ...,
    * },
    * "upsert": {...}
    * }
    */


    /*
    * {
    * "doc_as_upsert": true,
    * "doc": {...}
    * }
    */

    并不支持
    /*
    * {
    * "upsert": {},
    * "doc": {...}
    * }
    */

    无奈只好自已动手了

    代码量太大,找到相关的部分,理顺操作逻辑后,改起来就容易多了,最主要的部分在这里

    source code

    把rdd json 化并拼接http 请求

    两种思路

    1传入完整的对像,然后修改 json 解析部分,拼接出请求
    2修改对象结构,json 解析部分不变,拼接出请求

    1的实现相对复杂,工作量很大,且要求对代码项目很熟悉,实现的成本很高
    2的实现很简单

    修改的地方很少,可以参照着自已改,需重新编译

    具体看 commit,也提交到了官方,但代码比较粗暴,很可能通不过,功能优先,官方不采用,可以用的时候再个人修改。

    之后打包,引用打包后的文件。

    另外,程序也要作并要的修改

    写入部分, 对比下就知道要改的地方,很容易

    case class ES_Upsert(kw_index: String, kw_type: String, id: String, date_idate: String, date_udate: String)
    
    case class ES_Doc(date_udate: String)
    
    case class ES_UpsertDoc(upsert: ES_Upsert, doc: ES_Doc)
    
    .saveToEs(Map[String, String](
    "es.resource" -> "{upsert.kw_index}/{upsert.kw_type}",
    "es.nodes" -> es,
    "es.input.json" -> "false",
    "es.nodes.discovery" -> "false",
    "es.update.doc" -> "true",
    "es.nodes.wan.only" -> "true",
    "es.write.operation" -> "upsert",
    "es.mapping.exclude" -> "upsert.kw_index,upsert.kw_type,upsert.id",
    "es.mapping.id" -> "upsert.id"
    ))

    外套一层对象ES_UpsertDoc 字段名称分别为upsert,doc熟悉es的就不用解释吧
    "es.update.doc" -> "true"为 true 才生效。

    "es.resource" -> "{upsert.kw_index}/{upsert.kw_type}",
    "es.mapping.exclude" -> "upsert.kw_index,upsert.kw_type,upsert.id"
    index type field mapping 也要多套一层


    项目示例

    kafka spark streaming elasticsearch

    https://github.com/cclient/elasticsearch-spark-upsert-from-kafka

    ——官方已经拒掉了,主要原因是这个包要在各种数据平台上保证可用,按官方的说法是

    'whether using Map/Reduce or libraries built upon it such as Hive, Pig or Cascading or new upcoming libraries like Apache Spark'

    现在的case只是基于 Spark的,即使在spark上可用,没有在其他平台的测试,不会通过,也没有精力去挨个试,等用的时候自已改吧

  • 相关阅读:
    网站如何做分布式(集群)的大纲
    [转]Bind和Eval的区别详解
    SQL 中游标的并发问题。
    如何利用客户端缓存对网站进行优化?
    Windows的第五种群集方案 CCS
    ICollection 接口的类序列化的问题。
    如何提高网页的效率(上篇)——提高网页效率的14条准则
    石油地质名称解释
    【SQL基础概念】
    DataView/DataRowView
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/7992304.html
Copyright © 2011-2022 走看看