场景描述:前段时间,将实时数据通过kafka+flume+morphline的方式接入到solr中。新进来的数据已经可以在solr中看到了,但是以前的历史数据还没有导入solr。
CDH提供利用MapReduceIndexerTool来将HDFS的数据导入到solr。
历史数据格式类似如下按年/月/日保存在HDFS上每天一个文件:
-/user/data/2016
-11
-1
-data.txt
-2
-data.txt
-12
-1
-data.txt
-2
-data.txt
文件的格式为一行一行的json。
思路:
先对2016目录下的所有子目录遍历文件,
再对文件进行批量的索引操作。
使用命令:(jar包在/opt/cloudera/parcels/CDH/jars下)
hadoop jar search-mr-1.0.0-cdh5.8.0-job.jar org.apache.solr.hadoop.HdfsFindTool -find hdfs://cdh-master/user/kafkadata/eventCount/2016/11 -type f | sudo -u xuyali hadoop --config /etc/hadoop/conf.solrindexer/ jar search-mr-1.0.0-cdh5.8.0-job.jar org.apache.solr.hadoop.MapReduceIndexerTool --log4j log4j.properties --morphline-file morphline.conf --zk-host cdh-master:2181/solr --collection event_count_records --output-dir hdfs://cdh-master/user/hdfs/test/ --verbose --go-live --input-list -
参考:cdh官方文档——batch indexing solr
*注意:官方文档中用的配置是mapreduce1,可以用yarn的客户端配置来代替该配置。
morphline.conf
SOLR_LOCATOR : { # Name of solr collection collection : event_count_records # ZooKeeper ensemble #CDH的专有写法,开源版本不支持。 zkHost : "$ZK_HOST" } morphlines : [ { id : morphline1 importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { readLine { charset : UTF-8 } } {setValues:{_attachment_body : "@{message}"}} {java:{ imports:"import java.io.*;import org.kitesdk.morphline.base.Fields;" code:""" String message=(String)record.getFirstValue(Fields.ATTACHMENT_BODY); if(message.contains("'")) { return true; } InputStream inputStream = new ByteArrayInputStream(message.getBytes()); record.removeAll(Fields.ATTACHMENT_BODY); record.put(Fields.ATTACHMENT_BODY, inputStream); return child.process(record); """ }} { #Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json readJson{} } { #读出来的json字段必须转换成filed才能被solr索引到 extractJsonPaths { flatten:true paths:{ account:/account accountName:/accountName subaccount:/subaccount subaccountName:/subaccountName eventTime:/timestamp eventType:/eventType eventTags:"/eventTags[]/name" #按UTC时间存timestamp eventTimeInMinuteUTC_tdt:/timestamp #按China时间存timestamp eventTimeInMinuteChina_tdt:/timestamp #按UTC时间存timestamp eventTimeInHourUTC_tdt:/timestamp #_tdt后缀会被动态识别为日期类型的索引字段 #按不同时间间隔存索引以增加查询性能 } } } #转换long型时间为Date格式 {convertTimestamp { field : eventTimeInMinuteChina_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'" outputTimezone : Asia/Shanghai }} {convertTimestamp { field : eventTimeInMinuteUTC_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'" outputTimezone : UTC }} {convertTimestamp { field : eventTimeInHourUTC_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'" outputTimezone : UTC }} #kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存 {toString { field : _attachment_body }} #为每一条记录生成一个UUID {generateUUID { field : id }} sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } #对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。 #将数据导入到solr中 {loadSolr {solrLocator : ${SOLR_LOCATOR}}} ] } ]
log4j.properties:
log4j.rootLogger=WARN, A1 log4j.logger.org.apache.flume.sink=INFO #log4j.logger.org.apache.flume.sink.solr=DEBUG log4j.logger.org.apache.solr=INFO #log4j.logger.org.apache.solr.hadoop=DEBUG log4j.logger.org.kitesdk.morphline=TRACE #log4j.logger.org.apache.solr.morphline=DEBUG log4j.logger.org.apache.solr.update.processor.LogUpdateProcessor=WARN log4j.logger.org.apache.solr.core.SolrCore=WARN log4j.logger.org.apache.solr.search.SolrIndexSearcher=ERROR # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
map数等于要被批量索引的文件数。
yarn的客户端配置可以作下修改,设置reduce的个数,每个map占用的内存cpu等(map数不能修改)。
任务完成提示:
*批量索引的效率并不一定总是比实时索引高,但优点是不吃solr服务性能——没有调用solr接口,而是直接生成索引文件后移至solr collection目录下。
*调试morphline.conf bug时先用小点的单个文件,如果morphline写的有错,一个文件的任务失败会导致整个任务失败。