zoukankan      html  css  js  c++  java
  • Pig系统分析(8)-Pig可扩展性

    本文是Pig系统分析系列中的最后一篇了,主要讨论怎样扩展Pig功能。不仅介绍Pig本身提供的UDFs扩展机制,还从架构上探讨Pig扩展可能性。

    补充说明:前些天同事发现twitter推动的Pig On Spark项目:Spork,准备研究下。

    UDFs

    通过UDFs(用户自己定义函数),能够自己定义数据处理方法,扩展Pig功能。实际上,UDFS除了使用之前须要register/define外。和内置函数没什么不同。

    主要的EvalFunc

    以内置的ABS函数为例:

    public class ABS extends EvalFunc<Double>{
        /**
         * java level API
         * @param input expectsa single numeric value
         * @return output returns a single numeric value, absolute value of the argument
         */
        public Double exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0)
                return null;
     
            Double d;
            try{
                d = DataType.toDouble(input.get(0));
            } catch (NumberFormatException nfe){
                System.err.println("Failed to process input; error -" + nfe.getMessage());
                return null;
            } catch (Exception e){
                throw new IOException("Caught exception processing input row", e);
            }
            return Math.abs(d);
        }
        ……
        public Schema outputSchema(Schema input) ;
        public List<FuncSpec> getArgToFuncMapping() throws FrontendException;
     
    }
    1. 函数都继承EvalFunc接口,泛型參数Double代表返回类型。

    2. exec方法:输入參数类型为元组,代表一行记录。
    3. outputSchema方法:用于处理输入和输出Schema
    4. getArgToFuncMapping:用于支持各种数据类型重载。

    聚合函数

    EvalFuc方法也能实现聚合函数,这是由于group操作对每一个分组都返回一条记录,每组中包括一个Bag,所以exec方法中迭代处理Bag中记录就可以。

    以Count函数为例:

    public Long exec(Tuple input) throws IOException {
        try {
            DataBag bag = (DataBag)input.get(0);
            if(bag==null)
                return null;
            Iterator it = bag.iterator();
            long cnt = 0;
            while (it.hasNext()){
                Tuple t = (Tuple)it.next();
                if (t != null && t.size() > 0 && t.get(0) != null )
                    cnt++;
            }
            return cnt;
        } catch (ExecException ee) {
            throw ee;
        } catch (Exception e) {
            int errCode = 2106;               
            String msg = "Error while computing count in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    Algebraic 和Accumulator 接口

    如前所述,具备algebraic性质的聚合函数在Map-Reduce过程中能被Combiner优化。直观来理解,具备algebraic性质的函数处理过程能被分为三部分:initial(初始化,处理部分输入数据)、intermediate(中间过程,处理初始化过程的结果)和final(收尾,处理中间过程的结果)。

    比方COUNT函数,初始化过程为count计数操作。中间过程和收尾为sum求和操作。更进一步。假设函数在这三个阶段中都能进行同样的操作,那么函数具备distributive性质。比方SUM函数。

    Pig提供了Algebraic 接口:

    public interface Algebraic{
        /**
         * Get the initial function.
         * @return A function name of f_init. f_init shouldbe an eval func.
         * The return type off_init.exec() has to be Tuple
         */
        public String getInitial();
     
        /**
         * Get the intermediatefunction.
         * @return A function name of f_intermed. f_intermedshould be an eval func.
         * The return type off_intermed.exec() has to be Tuple
         */
        public String getIntermed();
     
        /**
         * Get the final function.
         * @return A function name of f_final. f_final shouldbe an eval func parametrized by
         * the same datum as the evalfunc implementing this interface.
         */
        public String getFinal();
    }

    当中每一个方法都返回EvalFunc实现类的名称。

    继续以COUNT函数为例,COUNT实现了Algebraic接口。针对下面语句:

    input= load 'data' as (x, y);
    grpd= group input by x;
    cnt= foreach grpd generate group, COUNT(input);
    storecnt into 'result';
    Pig会重写MR运行计划:

    Map
    load,foreach(group,COUNT.Initial)
    Combine
    foreach(group,COUNT.Intermediate)
    Reduce
    foreach(group,COUNT.Final),store
    
    Algebraic 接口通过Combiner优化降低传输数据量,而Accumulator接口则关注的是内存使用量。UDF实现Accumulator接口后,Pig保证全部key相同的数据(通过Shuffle)以增量的形式传递给UDF(默认pig.accumulative.batchsize=20000)。相同。COUNT也实现了Accumulator接口。

    /* Accumulator interface implementation */
        private long intermediateCount = 0L;
        @Override
        public void accumulate(Tuple b) throws IOException {
           try {
               DataBag bag = (DataBag)b.get(0);
               Iterator it = bag.iterator();
               while (it.hasNext()){
                    Tuple t = (Tuple)it.next();
                    if (t != null && t.size() > 0 && t.get(0) != null) {
                        intermediateCount += 1;
                    }
                }
           } catch (ExecException ee) {
               throw ee;
           } catch (Exception e) {
               int errCode = 2106;
               String msg = "Error while computing min in " + this.getClass().getSimpleName();
               throw new ExecException(msg, errCode, PigException.BUG, e);          
           }
        }
     
        @Override
        public void cleanup() {
           intermediateCount = 0L;
        }
        @Override
        /*
        *当前key都被处理完之后被调用
        */
        public Long getValue() {
           return intermediateCount;
        }

    前后端数据传递

    通过UDFs构造函数传递数据是最简单的方法。然后通过define语句定义UDF实例时指定构造方法參数。但有些情况下。比方数据在执行期才产生,或者数据不能用String格式表达,这时候就得使用UDFContext了。

    UDF通过getUDFContext方法获取保存在ThreadLoacl中的UDFContext实例。

    UDFContext包括下面信息:

    1. jconf:Hadoop Configuration。

    2. clientSysProps:系统属性。
    3. HashMap<UDFContextKey,Properties> udfConfs:用户自己保存的属性,当中UDFContextKey由UDF类名生成。

    UDFs运行流程


    Pig架构可扩展性

    Pig哲学之三——Pigs Live Anywhere。

    理论上。Pig并不被限定执行在Hadoop框架上,有几个能够參考的实现和提议。

    1. Pigen。Pig on Tez。https://github.com/achalsoni81/pigeon,架构图例如以下:
    2. Pig的后端抽象层:https://wiki.apache.org/pig/PigAbstractionLayer

      眼下已经实现了PigLatin执行在Galago上。

      http://www.galagosearch.org/

    參考资料

    Pig官网:http://pig.apache.org/

    Pig paper at SIGMOD 2008:Building a High Level DataflowSystem on top of MapReduce:The Pig Experience

    Programming.Pig:Dataflow.Scripting.with.Hadoop(2011.9).Alan.Gates

  • 相关阅读:
    360网盘书籍分享
    oracle11g字符集问题之一
    order by 的列名不能参数化,要拼sql
    oracle11g的冷热备份
    Spring 事务管理的使用
    Spring 事务管理的API
    事务总结
    Excel 单元格中内容的换行
    手动配置IP地址
    MyBatis 三剑客
  • 原文地址:https://www.cnblogs.com/wzzkaifa/p/6805110.html
Copyright © 2011-2022 走看看