zoukankan      html  css  js  c++  java
  • Pig用户自定义函数(UDF)转

    原文地址:http://blog.csdn.net/zythy/article/details/18326693

    我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。

    用户自定义函数

    什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。

    Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。

    自定义过滤函数

    我们仍然以先前的代码为例:

    records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);

    valid_records = filter records by temperature!=999;

    第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:

    valid_records = filter records by isValid(temperature);

    这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:

    packagecom.oserp.pigudf;

    importjava.io.IOException;

    importorg.apache.pig.FilterFunc;

    importorg.apache.pig.data.Tuple;

     

    public class IsValidTemperature extends FilterFunc {

             @Override

             public Boolean exec(Tuple tuple) throws IOException {            

                       Object object = tuple.get(0);

                       int temperature = (Integer)object;            

                       return temperature != 999;

             }

    }

    接下来,我们需要:

    1)  编译代码并打包成jar文件,比如pigudf.jar。

    2)  通过register命令将这个jar文件注册到pig环境:

    register/home/user/hadoop_jar/pigudf.jar //参数为jar文件的本地路径

    此时,我们就可以用以下语句调用这个函数:

    valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

    dump valid_records;

    看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:

    define isValid com.oserp.pigudf.IsValidTemperature();

    valid_records = filter records by isValid(temperature);

    dump valid_records;

    回到代码,我们可发现:

    1)  需要定义一个继承自FilterFunc的类。

    2)  重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。

    3)  调用时,需要使用类的全名。(当然你可以自定义别名)

    4)  更多的验证需要读者自行在函数中添加,比如判断是否为null等等。

    备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。

    自定义运算函数(Eval function)

    仍然以前面的数据文件为例:

    1990 21

    1990 18

    1991 21

    1992 30

    1992 999

    1990 23

    假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:

    温度                            分类

    x>=30                          hot

    x>=10 and x<30        moderate

    x<10                                      cool

    则我们可以定义以下函数,代码如下:

    packagecom.oserp.pigudf;

    importjava.io.IOException;

    importorg.apache.pig.EvalFunc;

    importorg.apache.pig.data.Tuple;

     

    public class GetClassification extends EvalFunc<String> {

             @Override

             public String exec(Tuple tuple) throws IOException {               

                       Object object = tuple.get(0);

                       int temperature = (Integer)object;

                      

                       if (temperature >= 30){

                                return "Hot";

                       }

                       else if(temperature >=10){

                                return "Moderate";

                       }

                       else {

                                return "Cool";

                       }                

             }

    }

    依次输入以下Pig语句:

    records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);

    register /home/user/hadoop_jar/pigudf.jar;

    valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

    result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);

    dump result;

    输出结果如下:

    (1990,Moderate)

    (1990,Moderate)

    (1991,Moderate)

    (1992,Hot)    

    (1990,Moderate)

    代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。

    有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:

    DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');

    B = FOREACH A GENERATE trim(name);

    其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

    自定义加载函数

    我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)

    一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:

    Thisis a map a reduce program

    mapreduce partition combiner

    我们希望load后能得到如下形式的数据,每个单词一个tuple:

    (This)

    (is)

    (a)

    (map)

    (a)

    (reduce)

     

    先看代码:

    package com.oserp.pigudf;

    import java.io.IOException;

    import java.util.ArrayList;

    import java.util.List;

    import org.apache.hadoop.io.Text;

    importorg.apache.hadoop.mapreduce.InputFormat;

    import org.apache.hadoop.mapreduce.Job;

    importorg.apache.hadoop.mapreduce.RecordReader;

    importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import org.apache.pig.LoadFunc;

    importorg.apache.pig.backend.executionengine.ExecException;

    importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

    import org.apache.pig.data.BagFactory;

    import org.apache.pig.data.DataBag;

    import org.apache.pig.data.Tuple;

    import org.apache.pig.data.TupleFactory;

     

     

    public class WordCountLoadFunc extends LoadFunc {

            

             private RecordReader reader;

             TupleFactorytupleFactory = TupleFactory.getInstance();

             BagFactorybagFactory = BagFactory.getInstance();

            

             @Override        

             public InputFormatgetInputFormat() throws IOException {

                       return new TextInputFormat();

             }      

     

             @Override

             public Tuple getNext() throws IOException {

                        

                       try {

                                // 当读取到分区数据块的末尾时,返回null表示数据已读取完

                                if (!reader.nextKeyValue()){

                                         return null;

                                         }

                                Textvalue = (Text)reader.getCurrentValue();

                                Stringline = value.toString();

                                String[]words =  line.split("\s+"); // 断词

                               

                                // 因为getNext函数只能返回一个tuple,

                                // 而我们希望每个单词一个单独的tuple,

                                // 所以我们将多个tuple放到一个bag里面,

                                // 然后返回一个包含一个bag的tuple。

                                // 注:这只是一个用于演示用法的示例,实际中这样使用不一定合理。

                                List<Tuple>tuples = new ArrayList<Tuple>();                    

                                Tupletuple = null;

                                for (String word : words) {

                                         tuple= tupleFactory.newTuple();

                                         tuple.append(word);

                                         tuples.add(tuple);

                                         }

                               

                                DataBagbag = bagFactory.newDefaultBag(tuples);

                                Tupleresult = tupleFactory.newTuple(bag);

                               

                                return result;

                       }

                       catch (InterruptedException e) {

                                throw new ExecException(e);

                       }

                      

             }

     

             @Override

             public void prepareToRead(RecordReader reader,PigSplit arg1)

                                throws IOException {

                       this.reader = reader;

             }

     

             @Override

             public void setLocation(String location, Job job) throws IOException {

                       FileInputFormat.setInputPaths(job,location);          

             }

     

    }

    依次执行以下命令:

    1)       records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});

    2)       flatten_records= foreach records generate flatten($0);

    3)       grouped_records= group flatten_records by words::w;

    4)       result= foreach grouped_records generate group,COUNT(flatten_records);

    5)       final_result= order result by $1 desc,$0;

    6)       dumpfinal_result;

    显示结果如下:

    (a,2)                  

    (map,2)            

    (reduce,2)        

    (This,1)             

    (combiner,1)   

    (is,1)                           

    (partition,1)    

    (program,1)     

    注意schema的定义格式:(words:bag{word:(w:chararray)})

  • 相关阅读:
    [ERR] Node 10.211.55.8:7001 is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0.
    PAT A1137 Final Grading (25 分)——排序
    PAT A1136 A Delayed Palindrome (20 分)——回文,大整数
    PAT A1134 Vertex Cover (25 分)——图遍历
    PAT A1133 Splitting A Linked List (25 分)——链表
    PAT A1132 Cut Integer (20 分)——数学题
    PAT A1130 Infix Expression (25 分)——中序遍历
    PAT A1142 Maximal Clique (25 分)——图
    PAT A1141 PAT Ranking of Institutions (25 分)——排序,结构体初始化
    PAT A1140 Look-and-say Sequence (20 分)——数学题
  • 原文地址:https://www.cnblogs.com/hadoop-dev/p/5913864.html
Copyright © 2011-2022 走看看