zoukankan      html  css  js  c++  java
  • 纪录:Solr6.4.2+Flume1.7.0 +morphline+kafka集成

         当前大多数企业版hadoop的solr版本都还停留在solr4.x,由于这个版本的solr本身的bug较多,使用起来会出很多奇怪的问题。如部分更新日期字段失败的问题

         最新的solr版本不仅修复了以前的一些常见bug,还提供了更简便易用的功能,如ManagedSchema替代schema.xml来管理索引的schema。

        由于solr自带的接口和入库工具需要一些定制开发,所以通常用flume来作为数据采集的工具。数据流图如下:

    image

    具体见前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引

    在Cloudera等企业版hadoop中,Solr和Flume已经集成,并能互通。如果你目前的情况是使用Cloudera企业版,请看上面这篇文章。

    然而由于集成的版本跟不上开源社区最新版本,还是很嫌弃的。于是就有了下面的配置最新版本的Solr和Flume互通:

    1.Solr最新版服务部署及入门:

    见solr官网quickstart。

    http://lucene.apache.org/solr/quickstart.html

    说明:创建Solr集合的部分,不是本章重点,所以这里没有介绍。

    另,本例中和前文不同,使用的不是SolrCloud模式,而是单机的Solr。

    2.Flume最新版部署及入门

    下载地址:http://flume.apache.org/download.html

    入门介绍:https://cwiki.apache.org//confluence/display/FLUME/Getting+Started

    详细配置介绍:http://flume.apache.org/FlumeUserGuide.html

    详细配置介绍中,需要关注的是KafkaSource和MorphlineSolrSink。

    最终的flume.conf配置为:

    kafka2solr.sources = source_from_kafka
    kafka2solr.channels = customer_doc_channel
    kafka2solr.sinks = solr_sink1
    
    # For each one of the sources, the type is defined  
    kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
    kafka2solr.sources.source_from_kafka.channels = customer_doc_channel
    kafka2solr.sources.source_from_kafka.batchSize = 100
    kafka2solr.sources.source_from_kafka.useFlumeEventFormat=false
    kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
    kafka2solr.sources.source_from_kafka.kafka.topics = tablecardLogin
    kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = catering_customer_core_070327
    kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=earliest
    
    # Other config values specific to each type of channel(sink or source)  
    # can be defined as well  
    kafka2solr.channels.customer_doc_channel.type = file
    kafka2solr.channels.customer_doc_channel.capacity=10000000
    kafka2solr.channels.customer_doc_channel.checkpointDir = /home/arli/data/flume-ng/customer_doc/checkpoint
    kafka2solr.channels.customer_doc_channel.dataDirs = /home/arli/data/flume-ng/customer_doc/data
    
    kafka2solr.sinks.solr_sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    kafka2solr.sinks.solr_sink1.channel = customer_doc_channel
    kafka2solr.sinks.solr_sink1.batchSize = 5000
    kafka2solr.sinks.solr_sink1.batchDurationMillis = 2000
    kafka2solr.sinks.solr_sink1.morphlineFile = /home/arli/flume-config/morphlines.conf
    kafka2solr.sinks.solr_sink1.morphlineId=morphline1
    kafka2solr.sinks.solr_sink1.isIgnoringRecoverableExceptions=true
    #kafka2solr.sinks.solr_sink1.isProductionMode=true

    3.新建一个Flume配置目录,下面四个文件是比较重要的。

    image

    flume.conf 来自上一节的配置。

    flume-env.sh 来自安装目录conf下的flume-env.sh.template。需要改动。

    log4j.properties 在调试过程中可以开启更低级别的日志打印。

    morphline.conf 参考Morphline的文档:http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

    4.接下来详细介绍上面的后三个配置文件。

    1)flume-env.sh

    image

    需要改动的地方如上:

    #默认的内存是不够的。需要扩大内存。
    export JAVA_OPTS="-Xms100m -Xmx500m -Dcom.sun.management.jmxremote"
    
    #Flume官方下载的包少了一些Solr相关的包,需要把solr的lib目录加到flume的classpath下。
    FLUME_CLASSPATH="/xxx/solr-6.4.2/contrib/morphlines-core/lib/*:/xxxi/solr-6.4.2/dist/*:/xxx/solr-6.4.2/dist/solrj-lib/*:/xxx/solr-6.4.2/server/solr-webapp/webapp/WEB-INF/lib/*"

    2)log4j.properties

    100MB改成10MB,以防打日志太多日志文件过大。

    image

    在调试阶段,加上如下两行会省心很多,调试完再去掉。

    log4j.logger.org.apache.flume.sink.solr=DEBUG
    log4j.logger.org.kitesdk.morphline=TRACE

    3)morphline.conf

    大部分和前文:《json数据处理实战:Kafka+Flume+Morphline+Solr+Hue数据组合索引》雷同。由于我使用的是单机版本的Solr,所以在配置时如下。

    注意solrUrl和solrHomeDir的配置,在官网中没有介绍(因为morphline是cloudera开发并开源的,cloudera的solr默认是solrCloud),但是在源码阅读时可以看到这两个单机solr配置参数。

    SOLR_LOCATOR : {
       solrUrl : "http:\/\/localhost:8983\/solr\/catering_customer_core1"
       solrHomeDir : "/xxx/server/solr/catering_customer_core1/conf"
    }
    
    morphlines : [
      {
        #customer morphline
        id : morphline1
        
        # Import all morphline commands in these java packages and their subpackages.
        # Other commands that may be present on the classpath are not visible to this morphline.
        importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
        
        commands : [                 
        { 
            readJson {}
        } 
        
        {
            tryRules 
            {
                catchExceptions : false
                throwExceptionIfAllRulesFailed : true
                rules : [
                {
                  commands : [
                    { 
                        contains {topic : [tablecardLogin] } 
                    }
                    
                    #field need to be indexed from json.
                    {
                        extractJsonPaths {
                          flatten : false
                          paths : {
                            account:/account         
                            customer_id:/customerId
                            history_signin_dates:/opt_time
                            history_signin_timestamps:/opt_time
                            name:/name
                            sex:/sex
                            }
                        }
                    }
                  ]
                  
                }
                
    
                # if desired, the last rule can serve as a fallback mechanism 
                # for records that don't match any rule:
                {
                        commands : [
                        { logWarn { format : "Ignoring record with unsupported input format: {}", args : ["@{}"] } }
                        { dropRecord {} }    
                        ]
                }
              ]
            }
        }
    
        {
            convertTimestamp {
                field : history_signin_dates
                inputFormats : ["yyyy-MM-dd HH:mm:ss"]
                inputTimezone : Asia/Shanghai
                outputFormat : "yyyy-MM-dd'T'HH:mm:ss'Z/SECOND'"
                outputTimezone : Asia/Shanghai
                }
        }
    
        {
            convertTimestamp {
                field : history_signin_timestamps
                inputFormats : ["yyyy-MM-dd HH:mm:ss"]
                inputTimezone : Asia/Shanghai
                outputFormat : "unixTimeInMillis"
                outputTimezone : UTC
                }
        }
    
        {
            java {
                imports : "import java.util.*;import org.kitesdk.morphline.api.Command;import org.kitesdk.morphline.api.Record;"
                code:     """
                                Object customerId = record.getFirstValue("customer_id");
                                Object account = record.getFirstValue("account");
                                record.put("id", account + "@" + customerId);
                                return child.process(record);
                        """
                }
        }
        
        {sanitizeUnknownSolrFields {solrLocator : ${SOLR_LOCATOR}}}
    
        #将数据导入到solr中
        {loadSolr {solrLocator : ${SOLR_LOCATOR}}}
    
         ]
      }
    ]

    4.Morphline中的sanitizeUnknownSolrFields命令需要有schema.xml才能使用。

    Solr6.4.2的schema默认是用managed-schema文件管理的。如果上面配置中的solrHomeDir目录下没有shema.xml文件,则会报错。

    好在managed-schema和之前schema.xml文件内容几乎一致。执行如下命令即可。

    cp managed-schema schema.xml

    5.解决Flume1.7.0和solr6.4.2的jar包冲突问题。

    Flume1.7在编译时使用的是Solr4.10.1的包,而其中lib目录下,Solrj依赖的httpcore-4.1.3包已与最新的Solrj不兼容,因此在solr目录dist/solrj-lib下找到对应的包然后替换。

    image

    另外还需要清理的两种包:1.Flume的lib目录老的solr版本相关的包,2.若存在kite-morphline-solr-core(因为solr自己发布的版本已经包含了等价的solr-morphline-core包)则需要清理。(由于本文在写作时相应的包都已经清理了,所以记录的不够细节,望见谅。)

    6.启动flume。调试时可以先在控制台启动,去掉最后的&。

    bin/flume-ng agent --conf ~/flume-config/ -f ~/flume-config/flume.conf  -n kafka2solr &

    如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
    如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
    如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【Arli】。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    从Go语言编码角度解释实现简易区块链——打造公链
    CSAPP:位操作实现基本运算
    虚拟机Ubuntu系统无法连接网络解决方案
    CSAPP:逆向工程【二进制炸弹】
    分析一套源代码的代码规范和风格并讨论如何改进优化代码
    【Recorder.js+百度语音识别】全栈方案技术细节
    webpack4.0各个击破(5)—— Module篇
    webpack4.0各个击破(4)—— Javascript & splitChunk
    webpack4.0各个击破(3)—— Assets篇
    javascript基础修炼(4)——UMD规范的代码推演
  • 原文地址:https://www.cnblogs.com/arli/p/6633228.html
Copyright © 2011-2022 走看看