zoukankan      html  css  js  c++  java
  • CDH离线数据导入solr:利用MapReduceIndexerTool将json文件批量导入到solr

    场景描述:前段时间,将实时数据通过kafka+flume+morphline的方式接入到solr中。新进来的数据已经可以在solr中看到了,但是以前的历史数据还没有导入solr。

    CDH提供利用MapReduceIndexerTool来将HDFS的数据导入到solr。

    历史数据格式类似如下按年/月/日保存在HDFS上每天一个文件:

    -/user/data/2016

    -11

      -1

        -data.txt

      -2

        -data.txt 

    -12

       -1

        -data.txt

       -2

        -data.txt

    文件的格式为一行一行的json。

    思路:

    先对2016目录下的所有子目录遍历文件,

    再对文件进行批量的索引操作。

    使用命令:(jar包在/opt/cloudera/parcels/CDH/jars下)

    hadoop jar search-mr-1.0.0-cdh5.8.0-job.jar org.apache.solr.hadoop.HdfsFindTool  -find hdfs://cdh-master/user/kafkadata/eventCount/2016/11  -type f | sudo -u xuyali hadoop --config /etc/hadoop/conf.solrindexer/  jar search-mr-1.0.0-cdh5.8.0-job.jar  org.apache.solr.hadoop.MapReduceIndexerTool --log4j log4j.properties  --morphline-file morphline.conf --zk-host cdh-master:2181/solr --collection event_count_records  --output-dir hdfs://cdh-master/user/hdfs/test/ --verbose --go-live  --input-list -

    参考:cdh官方文档——batch indexing solr

    *注意:官方文档中用的配置是mapreduce1,可以用yarn的客户端配置来代替该配置。

    morphline.conf

    SOLR_LOCATOR : {
      # Name of solr collection
      collection : event_count_records
      
      # ZooKeeper ensemble 
      #CDH的专有写法,开源版本不支持。
      zkHost : "$ZK_HOST"
      }
    
    morphlines : [
      {
        id : morphline1
        importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
        
        commands : [  
    {
    readLine {
      charset : UTF-8
    }
    }
    {setValues:{_attachment_body : "@{message}"}}
    
    {java:{
    imports:"import java.io.*;import org.kitesdk.morphline.base.Fields;"
    code:"""
    String message=(String)record.getFirstValue(Fields.ATTACHMENT_BODY);
            if(message.contains("'"))
            {
                return true;
            }
            InputStream   inputStream   =   new   ByteArrayInputStream(message.getBytes());
            record.removeAll(Fields.ATTACHMENT_BODY);
            record.put(Fields.ATTACHMENT_BODY, inputStream);
            return child.process(record);
    """
    }}
    
    {
      #Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json
       readJson{}
    }
    
    {
     #读出来的json字段必须转换成filed才能被solr索引到
    extractJsonPaths {
     flatten:true
     paths:{
    account:/account
    accountName:/accountName
    subaccount:/subaccount
    subaccountName:/subaccountName
    eventTime:/timestamp
    eventType:/eventType
    eventTags:"/eventTags[]/name"
    #按UTC时间存timestamp
    eventTimeInMinuteUTC_tdt:/timestamp
    #按China时间存timestamp
    eventTimeInMinuteChina_tdt:/timestamp
    #按UTC时间存timestamp
    eventTimeInHourUTC_tdt:/timestamp
    #_tdt后缀会被动态识别为日期类型的索引字段
    #按不同时间间隔存索引以增加查询性能
    }
     
    }
    }
    
    #转换long型时间为Date格式
    {convertTimestamp {
      field : eventTimeInMinuteChina_tdt
      inputFormats : ["unixTimeInMillis"]
      inputTimezone : UTC
      outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"
      outputTimezone : Asia/Shanghai
    }}
    
    {convertTimestamp {
      field : eventTimeInMinuteUTC_tdt
      inputFormats : ["unixTimeInMillis"]
      inputTimezone : UTC
      outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"
      outputTimezone : UTC
    }}
    
    {convertTimestamp {
      field : eventTimeInHourUTC_tdt
      inputFormats : ["unixTimeInMillis"]
      inputTimezone : UTC
      outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"
      outputTimezone : UTC
    }}
    
    #kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存
    {toString { field : _attachment_body }}
    
    
    #为每一条记录生成一个UUID
    {generateUUID {
      field : id
    }}
    
    sanitizeUnknownSolrFields {
      solrLocator : ${SOLR_LOCATOR}
    }
    
    #对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。  
    
    #将数据导入到solr中                
          {loadSolr {solrLocator : ${SOLR_LOCATOR}}}
        ]
      }
    ]

    log4j.properties:

    log4j.rootLogger=WARN, A1
    
    log4j.logger.org.apache.flume.sink=INFO
    #log4j.logger.org.apache.flume.sink.solr=DEBUG
    log4j.logger.org.apache.solr=INFO
    #log4j.logger.org.apache.solr.hadoop=DEBUG
    log4j.logger.org.kitesdk.morphline=TRACE
    #log4j.logger.org.apache.solr.morphline=DEBUG
    log4j.logger.org.apache.solr.update.processor.LogUpdateProcessor=WARN
    log4j.logger.org.apache.solr.core.SolrCore=WARN
    log4j.logger.org.apache.solr.search.SolrIndexSearcher=ERROR
    
    # A1 is set to be a ConsoleAppender.
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    
    # A1 uses PatternLayout.
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

    map数等于要被批量索引的文件数。

    yarn的客户端配置可以作下修改,设置reduce的个数,每个map占用的内存cpu等(map数不能修改)。

    任务完成提示:

    image

    *批量索引的效率并不一定总是比实时索引高,但优点是不吃solr服务性能——没有调用solr接口,而是直接生成索引文件后移至solr collection目录下。

    *调试morphline.conf bug时先用小点的单个文件,如果morphline写的有错,一个文件的任务失败会导致整个任务失败。

    如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
    如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
    如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【Arli】。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    使用nodejs消费SAP Cloud for Customer上的Web service
    如何在SAP Cloud for Customer自定义BO中创建访问控制
    SAP云平台运行环境Cloud Foundry和Neo的区别
    SAP成都研究院马洪波:提升学习力,增强竞争力,收获一生乐趣
    SAP Netweaver的负载均衡消息服务器 vs CloudFoundry的App Router
    写在Github被微软收购之际
    在SAP云平台的CloudFoundry环境下消费ABAP On-Premise OData服务
    Java实现 LeetCode 517 超级洗衣机
    Java实现 LeetCode 517 超级洗衣机
    Java实现 LeetCode 517 超级洗衣机
  • 原文地址:https://www.cnblogs.com/arli/p/6248819.html
Copyright © 2011-2022 走看看