zoukankan      html  css  js  c++  java
  • 玩转大数据系列之Apache Pig如何与Apache Solr集成(二)

    散仙,在上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。 
    在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点: 

    (一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响 

    (二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。 

    (三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。 


    鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢? 

    (1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。 

    (2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。 

    (3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。 



    下面,散仙就具体说下如何使用Pig和Solr集成? 

    (1)依旧访问这个地址下载源码压缩包。 
    (2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。 
    (3)使用ant重新打包成jar 
    (4)在pig里,注册相关依赖的jar包,并使用索引存储 



    注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:


    SolrOutputFormat函数 

    Java代码  收藏代码
    1. package com.pig.support.solr;  
    2.   
    3.   
    4.   
    5. import java.io.IOException;  
    6. import java.util.ArrayList;  
    7. import java.util.List;  
    8. import java.util.concurrent.Executors;  
    9. import java.util.concurrent.ScheduledExecutorService;  
    10. import java.util.concurrent.TimeUnit;  
    11.   
    12. import org.apache.hadoop.io.Writable;  
    13. import org.apache.hadoop.mapreduce.JobContext;  
    14. import org.apache.hadoop.mapreduce.OutputCommitter;  
    15. import org.apache.hadoop.mapreduce.RecordWriter;  
    16. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
    17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    18. import org.apache.solr.client.solrj.SolrServer;  
    19. import org.apache.solr.client.solrj.SolrServerException;  
    20. import org.apache.solr.client.solrj.impl.CloudSolrServer;  
    21. import org.apache.solr.client.solrj.impl.HttpSolrServer;  
    22. import org.apache.solr.common.SolrInputDocument;  
    23. /** 
    24.  * @author qindongliang 
    25.  * 支持SOlr的SolrOutputFormat 
    26.  * 如果你想了解,或学习更多这方面的 
    27.  * 知识,请加入我们的群: 
    28.  *  
    29.  * 搜索技术交流群(2000人):324714439  
    30.  * 大数据技术1号交流群(2000人):376932160  (已满) 
    31.  * 大数据技术2号交流群(2000人):415886155  
    32.  * 微信公众号:我是攻城师(woshigcs) 
    33.  *  
    34.  * */  
    35. public class SolrOutputFormat extends  
    36.         FileOutputFormat<Writable, SolrInputDocument> {  
    37.   
    38.     final String address;  
    39.     final String collection;  
    40.   
    41.     public SolrOutputFormat(String address, String collection) {  
    42.         this.address = address;  
    43.         this.collection = collection;  
    44.     }  
    45.   
    46.     @Override  
    47.     public RecordWriter<Writable, SolrInputDocument> getRecordWriter(  
    48.             TaskAttemptContext ctx) throws IOException, InterruptedException {  
    49.         return new SolrRecordWriter(ctx, address, collection);  
    50.     }  
    51.   
    52.       
    53.     @Override  
    54.     public synchronized OutputCommitter getOutputCommitter(  
    55.             TaskAttemptContext arg0) throws IOException {  
    56.         return new OutputCommitter(){  
    57.   
    58.             @Override  
    59.             public void abortTask(TaskAttemptContext ctx) throws IOException {  
    60.                   
    61.             }  
    62.   
    63.             @Override  
    64.             public void commitTask(TaskAttemptContext ctx) throws IOException {  
    65.                   
    66.             }  
    67.   
    68.             @Override  
    69.             public boolean needsTaskCommit(TaskAttemptContext arg0)  
    70.                     throws IOException {  
    71.                 return true;  
    72.             }  
    73.   
    74.             @Override  
    75.             public void setupJob(JobContext ctx) throws IOException {  
    76.                   
    77.             }  
    78.   
    79.             @Override  
    80.             public void setupTask(TaskAttemptContext ctx) throws IOException {  
    81.                   
    82.             }  
    83.               
    84.               
    85.         };  
    86.     }  
    87.   
    88.   
    89.     /** 
    90.      * Write out the LuceneIndex to a local temporary location.<br/> 
    91.      * On commit/close the index is copied to the hdfs output directory.<br/> 
    92.      *  
    93.      */  
    94.     static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {  
    95.         /**Solr的地址*/  
    96.         SolrServer server;  
    97.         /**批处理提交的数量**/  
    98.         int batch = 5000;  
    99.           
    100.         TaskAttemptContext ctx;  
    101.           
    102.         List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);  
    103.         ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();  
    104.         /** 
    105.          * Opens and forces connect to CloudSolrServer 
    106.          *  
    107.          * @param address 
    108.          */  
    109.         public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {  
    110.             try {  
    111.                 this.ctx = ctx;  
    112.                 server = new HttpSolrServer(address);  
    113.                   
    114.                 exec.scheduleWithFixedDelay(new Runnable(){  
    115.                     public void run(){  
    116.                         ctx.progress();  
    117.                     }  
    118.                 }, 1000, 1000, TimeUnit.MILLISECONDS);  
    119.             } catch (Exception e) {  
    120.                 RuntimeException exc = new RuntimeException(e.toString(), e);  
    121.                 exc.setStackTrace(e.getStackTrace());  
    122.                 throw exc;  
    123.             }  
    124.         }  
    125.   
    126.           
    127.         /** 
    128.          * On close we commit 
    129.          */  
    130.         @Override  
    131.         public void close(final TaskAttemptContext ctx) throws IOException,  
    132.                 InterruptedException {  
    133.   
    134.             try {  
    135.                   
    136.                 if (docs.size() > 0) {  
    137.                     server.add(docs);  
    138.                     docs.clear();  
    139.                 }  
    140.   
    141.                 server.commit();  
    142.             } catch (SolrServerException e) {  
    143.                 RuntimeException exc = new RuntimeException(e.toString(), e);  
    144.                 exc.setStackTrace(e.getStackTrace());  
    145.                 throw exc;  
    146.             } finally {  
    147.                 server.shutdown();  
    148.                 exec.shutdownNow();  
    149.             }  
    150.               
    151.         }  
    152.   
    153.         /** 
    154.          * We add the indexed documents without commit 
    155.          */  
    156.         @Override  
    157.         public void write(Writable key, SolrInputDocument doc)  
    158.                 throws IOException, InterruptedException {  
    159.             try {  
    160.                 docs.add(doc);  
    161.                 if (docs.size() >= batch) {  
    162.                     server.add(docs);  
    163.                     docs.clear();  
    164.                 }  
    165.             } catch (SolrServerException e) {  
    166.                 RuntimeException exc = new RuntimeException(e.toString(), e);  
    167.                 exc.setStackTrace(e.getStackTrace());  
    168.                 throw exc;  
    169.             }  
    170.         }  
    171.   
    172.     }  
    173. }  





    SolrStore函数 

    Java代码  收藏代码
    1. package com.pig.support.solr;  
    2.   
    3.   
    4.   
    5. import java.io.IOException;  
    6. import java.util.Properties;  
    7.   
    8. import org.apache.hadoop.fs.Path;  
    9. import org.apache.hadoop.io.Writable;  
    10. import org.apache.hadoop.mapreduce.Job;  
    11. import org.apache.hadoop.mapreduce.OutputFormat;  
    12. import org.apache.hadoop.mapreduce.RecordWriter;  
    13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    14. import org.apache.pig.ResourceSchema;  
    15. import org.apache.pig.ResourceSchema.ResourceFieldSchema;  
    16. import org.apache.pig.ResourceStatistics;  
    17. import org.apache.pig.StoreFunc;  
    18. import org.apache.pig.StoreMetadata;  
    19. import org.apache.pig.data.Tuple;  
    20. import org.apache.pig.impl.util.UDFContext;  
    21. import org.apache.pig.impl.util.Utils;  
    22. import org.apache.solr.common.SolrInputDocument;  
    23.   
    24. /** 
    25.  *  
    26.  * Create a lucene index 
    27.  *  
    28.  */  
    29. public class SolrStore extends StoreFunc implements StoreMetadata {  
    30.   
    31.     private static final String SCHEMA_SIGNATURE = "solr.output.schema";  
    32.   
    33.     ResourceSchema schema;  
    34.     String udfSignature;  
    35.     RecordWriter<Writable, SolrInputDocument> writer;  
    36.   
    37.     String address;  
    38.     String collection;  
    39.       
    40.     public SolrStore(String address, String collection) {  
    41.         this.address = address;  
    42.         this.collection = collection;  
    43.     }  
    44.   
    45.     public void storeStatistics(ResourceStatistics stats, String location,  
    46.             Job job) throws IOException {  
    47.     }  
    48.   
    49.     public void storeSchema(ResourceSchema schema, String location, Job job)  
    50.             throws IOException {  
    51.     }  
    52.   
    53.     @Override  
    54.     public void checkSchema(ResourceSchema s) throws IOException {  
    55.         UDFContext udfc = UDFContext.getUDFContext();  
    56.         Properties p = udfc.getUDFProperties(this.getClass(),  
    57.                 new String[] { udfSignature });  
    58.         p.setProperty(SCHEMA_SIGNATURE, s.toString());  
    59.     }  
    60.   
    61.     public OutputFormat<Writable, SolrInputDocument> getOutputFormat()  
    62.             throws IOException {  
    63.         // not be used  
    64.         return new SolrOutputFormat(address, collection);  
    65.     }  
    66.   
    67.     /** 
    68.      * Not used 
    69.      */  
    70.     @Override  
    71.     public void setStoreLocation(String location, Job job) throws IOException {  
    72.         FileOutputFormat.setOutputPath(job, new Path(location));  
    73.     }  
    74.   
    75.     @Override  
    76.     public void setStoreFuncUDFContextSignature(String signature) {  
    77.         this.udfSignature = signature;  
    78.     }  
    79.   
    80.     @SuppressWarnings({ "unchecked", "rawtypes" })  
    81.     @Override  
    82.     public void prepareToWrite(RecordWriter writer) throws IOException {  
    83.         this.writer = writer;  
    84.         UDFContext udc = UDFContext.getUDFContext();  
    85.         String schemaStr = udc.getUDFProperties(this.getClass(),  
    86.                 new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);  
    87.   
    88.         if (schemaStr == null) {  
    89.             throw new RuntimeException("Could not find udf signature");  
    90.         }  
    91.   
    92.         schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));  
    93.   
    94.     }  
    95.   
    96.     /** 
    97.      * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch 
    98.      * @param input 
    99.      * @return 
    100.      */  
    101.     private static String stripNonCharCodepoints(String input) {  
    102.         StringBuilder retval = new StringBuilder(input.length());  
    103.         char ch;  
    104.   
    105.         for (int i = 0; i < input.length(); i++) {  
    106.             ch = input.charAt(i);  
    107.   
    108.             // Strip all non-characters  
    109.             // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]  
    110.             // and non-printable control characters except tabulator, new line  
    111.             // and carriage return  
    112.             if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step  
    113.                                             // 0x10000  
    114.                     ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range  
    115.                     (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef  
    116.                     (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {  
    117.   
    118.                 retval.append(ch);  
    119.             }  
    120.         }  
    121.   
    122.         return retval.toString();  
    123.     }  
    124.   
    125.     @Override  
    126.     public void putNext(Tuple t) throws IOException {  
    127.   
    128.         final SolrInputDocument doc = new SolrInputDocument();  
    129.   
    130.         final ResourceFieldSchema[] fields = schema.getFields();  
    131.         int docfields = 0;  
    132.   
    133.         for (int i = 0; i < fields.length; i++) {  
    134.             final Object value = t.get(i);  
    135.   
    136.             if (value != null) {  
    137.                 docfields++;  
    138.                 doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));  
    139.             }  
    140.   
    141.         }  
    142.   
    143.         try {  
    144.             if (docfields > 0)  
    145.                 writer.write(null, doc);  
    146.         } catch (InterruptedException e) {  
    147.             Thread.currentThread().interrupt();  
    148.             return;  
    149.         }  
    150.   
    151.     }  
    152.   
    153. }  



    Pig脚本如下: 

    Java代码  收藏代码
    1. --注册依赖文件的jar包  
    2. REGISTER ./dependfiles/tools.jar;  
    3.   
    4. --注册solr相关的jar包  
    5. REGISTER  ./solrdependfiles/pigudf.jar;   
    6. REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;  
    7. REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;  
    8. REGISTER  ./solrdependfiles/httpclient-4.3.1.jar  
    9. REGISTER  ./solrdependfiles/httpcore-4.3.jar  
    10. REGISTER  ./solrdependfiles/httpmime-4.3.1.jar  
    11. REGISTER  ./solrdependfiles/noggit-0.5.jar  
    12.   
    13.   
    14. --加载HDFS数据,并定义scheaml  
    15. a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);  
    16.   
    17. --存储到solr中,并提供solr的ip地址和端口号  
    18. store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');  
    19. ~                                                                                                                                                              
    20. ~                                                                        
    21. ~                                 



    配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下: 






    成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等! 

    同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待! 

  • 相关阅读:
    Cesium原理篇:4Web Workers剖析(2)
    Cesium原理篇:4Web Workers剖析
    Cesium原理篇:3最长的一帧之地形(1)
    Cesium原理篇:2最长的一帧之网格划分
    Cesium原理篇:1最长的一帧之渲染调度
    CSS3火焰文字特效制作教程
    一款非常棒的纯CSS3 3D菜单演示及制作教程
    jQuery/CSS3类似阿里巴巴的商品导航菜单实现教程
    CSS3 3D立方体翻转菜单实现教程
    强大!HTML5 3D美女图片旋转实现教程
  • 原文地址:https://www.cnblogs.com/qindongliang/p/4319350.html
Copyright © 2011-2022 走看看