zoukankan      html  css  js  c++  java
  • Pig用户自定义函数(UDF)转

    原文地址:http://blog.csdn.net/zythy/article/details/18326693

    我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。

    用户自定义函数

    什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。

    Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。

    自定义过滤函数

    我们仍然以先前的代码为例:

    records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);

    valid_records = filter records by temperature!=999;

    第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:

    valid_records = filter records by isValid(temperature);

    这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:

    packagecom.oserp.pigudf;

    importjava.io.IOException;

    importorg.apache.pig.FilterFunc;

    importorg.apache.pig.data.Tuple;

     

    public class IsValidTemperature extends FilterFunc {

             @Override

             public Boolean exec(Tuple tuple) throws IOException {            

                       Object object = tuple.get(0);

                       int temperature = (Integer)object;            

                       return temperature != 999;

             }

    }

    接下来,我们需要:

    1)  编译代码并打包成jar文件,比如pigudf.jar。

    2)  通过register命令将这个jar文件注册到pig环境:

    register/home/user/hadoop_jar/pigudf.jar //参数为jar文件的本地路径

    此时,我们就可以用以下语句调用这个函数:

    valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

    dump valid_records;

    看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:

    define isValid com.oserp.pigudf.IsValidTemperature();

    valid_records = filter records by isValid(temperature);

    dump valid_records;

    回到代码,我们可发现:

    1)  需要定义一个继承自FilterFunc的类。

    2)  重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。

    3)  调用时,需要使用类的全名。(当然你可以自定义别名)

    4)  更多的验证需要读者自行在函数中添加,比如判断是否为null等等。

    备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。

    自定义运算函数(Eval function)

    仍然以前面的数据文件为例:

    1990 21

    1990 18

    1991 21

    1992 30

    1992 999

    1990 23

    假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:

    温度                            分类

    x>=30                          hot

    x>=10 and x<30        moderate

    x<10                                      cool

    则我们可以定义以下函数,代码如下:

    packagecom.oserp.pigudf;

    importjava.io.IOException;

    importorg.apache.pig.EvalFunc;

    importorg.apache.pig.data.Tuple;

     

    public class GetClassification extends EvalFunc<String> {

             @Override

             public String exec(Tuple tuple) throws IOException {               

                       Object object = tuple.get(0);

                       int temperature = (Integer)object;

                      

                       if (temperature >= 30){

                                return "Hot";

                       }

                       else if(temperature >=10){

                                return "Moderate";

                       }

                       else {

                                return "Cool";

                       }                

             }

    }

    依次输入以下Pig语句:

    records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);

    register /home/user/hadoop_jar/pigudf.jar;

    valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

    result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);

    dump result;

    输出结果如下:

    (1990,Moderate)

    (1990,Moderate)

    (1991,Moderate)

    (1992,Hot)    

    (1990,Moderate)

    代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。

    有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:

    DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');

    B = FOREACH A GENERATE trim(name);

    其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

    自定义加载函数

    我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)

    一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:

    Thisis a map a reduce program

    mapreduce partition combiner

    我们希望load后能得到如下形式的数据,每个单词一个tuple:

    (This)

    (is)

    (a)

    (map)

    (a)

    (reduce)

     

    先看代码:

    package com.oserp.pigudf;

    import java.io.IOException;

    import java.util.ArrayList;

    import java.util.List;

    import org.apache.hadoop.io.Text;

    importorg.apache.hadoop.mapreduce.InputFormat;

    import org.apache.hadoop.mapreduce.Job;

    importorg.apache.hadoop.mapreduce.RecordReader;

    importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import org.apache.pig.LoadFunc;

    importorg.apache.pig.backend.executionengine.ExecException;

    importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

    import org.apache.pig.data.BagFactory;

    import org.apache.pig.data.DataBag;

    import org.apache.pig.data.Tuple;

    import org.apache.pig.data.TupleFactory;

     

     

    public class WordCountLoadFunc extends LoadFunc {

            

             private RecordReader reader;

             TupleFactorytupleFactory = TupleFactory.getInstance();

             BagFactorybagFactory = BagFactory.getInstance();

            

             @Override        

             public InputFormatgetInputFormat() throws IOException {

                       return new TextInputFormat();

             }      

     

             @Override

             public Tuple getNext() throws IOException {

                        

                       try {

                                // 当读取到分区数据块的末尾时,返回null表示数据已读取完

                                if (!reader.nextKeyValue()){

                                         return null;

                                         }

                                Textvalue = (Text)reader.getCurrentValue();

                                Stringline = value.toString();

                                String[]words =  line.split("\s+"); // 断词

                               

                                // 因为getNext函数只能返回一个tuple,

                                // 而我们希望每个单词一个单独的tuple,

                                // 所以我们将多个tuple放到一个bag里面,

                                // 然后返回一个包含一个bag的tuple。

                                // 注:这只是一个用于演示用法的示例,实际中这样使用不一定合理。

                                List<Tuple>tuples = new ArrayList<Tuple>();                    

                                Tupletuple = null;

                                for (String word : words) {

                                         tuple= tupleFactory.newTuple();

                                         tuple.append(word);

                                         tuples.add(tuple);

                                         }

                               

                                DataBagbag = bagFactory.newDefaultBag(tuples);

                                Tupleresult = tupleFactory.newTuple(bag);

                               

                                return result;

                       }

                       catch (InterruptedException e) {

                                throw new ExecException(e);

                       }

                      

             }

     

             @Override

             public void prepareToRead(RecordReader reader,PigSplit arg1)

                                throws IOException {

                       this.reader = reader;

             }

     

             @Override

             public void setLocation(String location, Job job) throws IOException {

                       FileInputFormat.setInputPaths(job,location);          

             }

     

    }

    依次执行以下命令:

    1)       records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});

    2)       flatten_records= foreach records generate flatten($0);

    3)       grouped_records= group flatten_records by words::w;

    4)       result= foreach grouped_records generate group,COUNT(flatten_records);

    5)       final_result= order result by $1 desc,$0;

    6)       dumpfinal_result;

    显示结果如下:

    (a,2)                  

    (map,2)            

    (reduce,2)        

    (This,1)             

    (combiner,1)   

    (is,1)                           

    (partition,1)    

    (program,1)     

    注意schema的定义格式:(words:bag{word:(w:chararray)})

  • 相关阅读:
    eclipse maven POM 第一行提示异常Could not initialize class org.apache.maven.plugin.war.util.WebappStructureSerializer
    Tomcat搭建HTTP文件下载服务器
    Hui datetimepicker 插件不显示左右箭头问题
    【Python】拷贝&列表
    【Python】Python赋值引用、浅拷贝、深拷贝
    【Python】列表的复制
    【Python】电脑安装了多个版本Python,如何切换;
    【Python】定义类、创建类实例
    【Python】白名单方式净化
    c++ 对外提供头文件的模式
  • 原文地址:https://www.cnblogs.com/hadoop-dev/p/5913864.html
Copyright © 2011-2022 走看看