zoukankan      html  css  js  c++  java
  • kafka spark steam 写入elasticsearch的部分问题

    应用版本

    elasticsearch 5.5

    spark 2.2.0

    hadoop 2.7

    依赖包版本

    docker cp /Users/cclient/.ivy2/cache/org.elasticsearch/elasticsearch-spark-20_2.11/jars/elasticsearch-spark-20_2.11-6.0.0-alpha2.jar spark:/usr/spark-2.2.0/jars/

    问题1

    Multiple ES-Hadoop versions detected in the classpath; please use only one

    多了其他依赖包 我的环境多引入了elasticsearch-hadoop-cascading-6.0.0-alpha2.jar 删除即可

    问题2

    an id must be provided if version type or value are set;

    upsert 时必须指定 id 

    "es.mapping.id"->"id"

    问题3

    kafka 存储的是 json 序列化内容,spark 操作中需要反序列化,默认应用的json4s

    map(jsonitem=>{
    implicit val formats = DefaultFormats
    parseJson(jsonitem).extract[ESData]
    }

    ESData 为 case class 若json 字符串不规范,缺少相应字段,则会报错,为该字段设默认值即可

    case class ESData(bool_isEssence : Option[Boolean]=Some(false),text_title : String)

    写入 es 配置官方文档

    https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

    官方示例  

    es.resource.write = my-collection/{media_type}

     index 类型是固定的,经测,同样可以自定义

    如 

    es.resource.write ={media_type}/{media_type}

     

    elasticsearch 存储时根据年月分区

    控制信息都保存在源json数据内,spark 写入时只作反序列化,和index 和 type 映射

  • 相关阅读:
    网络协议 22
    网络协议 21
    网络协议 20
    网络协议 19
    网络协议 18
    网络协议 17
    网络协议 16
    网络协议 15
    网络协议 14
    .net 4.0 中的特性总结(五):并行编程
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/7661753.html
Copyright © 2011-2022 走看看