zoukankan      html  css  js  c++  java
  • 如何给Apache Pig自定义UDF函数?

    近日由于工作所需,需要使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。 



    一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子: 

    你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。 


    本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试通过: 
    我们先来看下定义一个UDF扩展类,需要几个步骤: 

    序号 步骤 说明
    1 在eclipse里新建一个java工程,并导入pig的核心包 java项目
    2 新建一个包,继承特定的接口或类,重写自定义部分 核心业务
    3 编写完成后,使用ant打包成jar 编译时需要pig依赖,但不用把pig的jar包打入UDF中
    4 把打包完成后的jar上传到HDFS上 pig运行时候需要加载使用
    5 在pig脚本里,注册我们自定义的udf的jar包 注入运行时环境
    6 编写我们的核心业务pig脚本运行 测试是否运行成功


    项目工程截图如下: 

     



    核心代码如下: 

    Java代码  收藏代码
    1. package com.pigudf;  
    2.   
    3. import java.io.IOException;  
    4.   
    5. import org.apache.pig.EvalFunc;  
    6. import org.apache.pig.data.Tuple;  
    7. import org.apache.pig.impl.util.WrappedIOException;  
    8. /** 
    9.  * 自定义UDF类,对字符串转换大写 
    10.  * @author qindongliang 
    11.  * */  
    12. public class MyUDF extends EvalFunc<String> {  
    13.   
    14.     @Override  
    15.     public String exec(Tuple input) throws IOException {  
    16.           
    17.          //判断是否为null或空,就跳过  
    18.         if(input==null||input.size()==0){  
    19.             return null;  
    20.         }  
    21.         try{  
    22.             //获取第一个元素  
    23.             String str=(String) input.get(0);  
    24.             //转成大写返回  
    25.             return str.toUpperCase();  
    26.               
    27.         }catch(Exception e){  
    28.             throw WrappedIOException.wrap("Caught exception processing input row ",e);  
    29.         }  
    30.     }  
    31.       
    32.   
    33. }  



    关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式): 

    Java代码  收藏代码
    1. grunt> cat s.txt  
    2. zhang san,12  
    3. Song,34  
    4. long,34  
    5. abC,12  
    6. grunt>   




    我们在看下,操作文件和jar包是放在一起的: 

    Java代码  收藏代码
    1. grunt> ls  
    2. hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295  
    3. hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36  
    4. grunt>   



    最后,我们看下pig脚本的定义: 

    Pig代码  收藏代码
    1. --注册自定义的jar包  
    2. REGISTER pudf.jar;   
    3. --加载测试文件的数据,逗号作为分隔符  
    4. a = load 's.txt' using PigStorage(',');     
    5. --遍历数据,对name列转成大写  
    6. b =  foreach a generate com.pigudf.MyUDF((chararray)$0);   
    7. --启动MapReduce的Job进行数据分析  
    8. dump b  


    最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功: 

    Java代码  收藏代码
    1. Counters:  
    2. Total records written : 4  
    3. Total bytes written : 64  
    4. Spillable Memory Manager spill count : 0  
    5. Total bags proactively spilled: 0  
    6. Total records proactively spilled: 0  
    7.   
    8. Job DAG:  
    9. job_1419419533357_0147  
    10.   
    11.   
    12. 2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!  
    13. 2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS  
    14. 2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.  
    15. 2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  
    16. 2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  
    17. (ZHANG SAN,12)  
    18. (SONG,34)  
    19. (LONG,34)  
    20. (ABC,12)  


    结果没问题,我们的UDF加载执行成功,如果我们还想将我们的输出结果直接写入到HDFS上,可以在pig脚本的末尾,去掉dump命令,加入 
    store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。

  • 相关阅读:
    Interview with BOA
    Java Main Differences between HashMap HashTable and ConcurrentHashMap
    Java Main Differences between Java and C++
    LeetCode 33. Search in Rotated Sorted Array
    LeetCode 154. Find Minimum in Rotated Sorted Array II
    LeetCode 153. Find Minimum in Rotated Sorted Array
    LeetCode 75. Sort Colors
    LeetCode 31. Next Permutation
    LeetCode 60. Permutation Sequence
    LeetCode 216. Combination Sum III
  • 原文地址:https://www.cnblogs.com/qindongliang/p/4195531.html
Copyright © 2011-2022 走看看