zoukankan      html  css  js  c++  java
  • pig基础知识总结

    Pig Latin UDF语句

    REGISTER   在Pig运行时环境中注册一个JAR文件

    DEFINE      为UDF、流式脚本或命令规范新建别名

     

    Pig Latin命令类型

    kill    中止某个MapReduce任务

    exec 在一个新的Grunt shell程序中以批处理模式运行一个脚本

    run   在当前Grunt外壳程序中运行程序

    quit  退出解释器

    set   设置Pig选项

     

    模式(Schema

    Pig的一个关系可以有一个关联的模式,模式为关系的字段指定名称和类型。Pig的这种模式声明方式和SQL数据库要求数据加载前必须先声明模式截然不同,Pig设计的目的是用于分析不包含数据类型信息的纯文本输入文件的。但是尽量定义模式,会让程序运行地更高效。

    缺点:在查询中声明模式的方式是灵活的,但不利于模式重用。每个查询中维护重复出现的模式会很困难。处理这一问题的办法是写自己的加载函数来封装模式。

    SQL数据库在加载数据时,会强制检查表模式中的约束。在pig中,如果一个值无法被强制转换为模式中申明的类型,pig会用空值null代替,显示一个空位。大数据集普遍都有被损坏的值、无效值或意料之外的值,简单的方法是过滤掉无效值:

    grunt>good_records = filter recordsby temperature is not null;

    另一种技巧是使用SPLIT操作把数据划分成好和坏两个关系,然后在分别进行分析:

    grunt> split records intogood_records if temperature is not null,

                                       bad_records if temperature isnull;

    grunt> dump good_records;

    在Pig中,不用为数据流中的每个新产生的关系声明模式。大多数情况下,Pig能够根据关系操作的输入关系的模式来确定输出结果的模式。有些操作不改变模式,如Limit。而Union会自动生成新的模式。

    重新定义一个关系的模式用带as子句的FOREACH…GENERATE操作来定义输入关系的一部分或全部字段的模式

    Pig的函数分为计算函数,过滤函数,加载函数和存储函数。

    计算函数: AVG, COUNT, CONCAT, COUNTSTAR,DIFF, MAX, MIN, SIZE, SUM, TOKENIZE

    过滤函数:IsEmpty

    加载/存储函数:PigStorage,BinStorage, BinaryStorage, TextLoader, PigDump

     

    内连接的实现:

    COGROUP,inner和flatten组合使用相当于实现了内连接:

    G = COGROUP A by $0 innner, B by $1inner;

    H = foreach G generate flatten($1),flatten($2)

    // H和join A by $0, B by $1相同

     

     

     

    foreach…generate…
    用于重新生成模式中的数据名和数据类型。默认保持原有名字,但是如果存在表达式,则无数据名,只有数据类型;而两个不兼容类型计算后,生成数据类型unknown,必须在generate 变量后使用as关键词定义别名;

          

     

    #用于map查找;

    .用于tuple(元组)投影;

    +对于bag不适用;

    下面的语句将执行不了

    A= load 'foo'as (x:chararray, y:int, z:int);

    B = group A by x; -- produces bag Acontaining all the records for a given value ofx

    C = foreach B generate SUM(A.y + A.z);

    因为A.y A.z都是bag,符号+对于bag不适用。

    正确的做法如下

    A= load 'foo'as (x:chararray, y:int, z:int);

    A1 = foreach A generate x, y + z as yz;

    B = group A1 by x;

    C = foreach B generate SUM(A1.yz);

     

     

     

    GROUP

    Group后返回一个包结构,输出两个字段:key和包含聚集记录的bag。Key的别名为group,bag名为操作表名。

    group Table all 是将所有数据归为一组,名为all{所有数据}

     

     

    按多个字段group,就是按一个tuple,如group table by(a1,a2)。则返回结构中,key为“group:(a1:int,a2:int)”,而bag名为操作表名。这里key是按a1和a2组合值进行分组,实际存在的组合值,进行归并。不是先按a1,再按a2。

    文档a.txt:

    a 12 3 4.2 9.8

    a 30 5 3.5 2.1

    b 79 9 - -

    a 79 9 2.6 6.2

    a 12 5 7.7 5.9

    a 12 3 1.4 0.2

    进行pig

    A =LOAD'a.txt'AS(col1:chararray, col2:int, col3:int,col4:int, col5:double, col6:double);

    B =GROUPA BY(col2, col3, col4);

    C =FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);

    DUMPC;

    ((1,2,3),2.8,5.0)

    ((1,2,5),7.7,5.9)

    ((3,0,5),3.5,2.1)

    ((7,9,9),2.6,6.2)

    按照A的第234列,对A进行分组。pig会找出所有第234列的组合,并按照升序进行排列,然后将它们与对应的包A整合起来,得到如下的数据结构:

    1

    B: {group: (col2: int,col3: int,col4: int),A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}}

    组合(123)对应了两行数据,组合(799)也对应了两行数据:

    1

    2

    3

    4

    ((1,2,3),{(a,1,2,3,4.2,9.8),(a,1,2,3,1.4,0.2)})

    ((1,2,5),{(a,1,2,5,7.7,5.9)})

    ((3,0,5),{(a,3,0,5,3.5,2.1)})

    ((7,9,9),{(b,7,9,9,,),(a,7,9,9,2.6,6.2)})

     

     

     

    Group all就是整体分成一个组,bag名为all

    --double_distinct.pig

    divs = load 'NYSE_dividends'as(exchange:chararray, symbol:chararray);

    grpd = group divs all;

    uniq = foreach grpd{

               exchanges      = divs.exchange;

               uniq_exchanges = distinct exchanges;

               symbols        = divs.symbol;

               uniq_symbols   = distinct symbols;

               generate COUNT(uniq_exchanges), COUNT(uniq_symbols);

    };

    注意:按all打包,作用于包grpd中每一行,是用的divs的去重,统计出一个数,放入包中。因为all包中已经不能用了

     

    所以,要count当前表的列数,就需要,先t2=group table1 all, 然后在进行foreach t2generate count(table1.col).注意,此时的table1是bag名字,不是表名。

    数列数,要先group新表,然后用用bag名字统计。

    cogroup:
    按多个关系中的字段进行分组。分别打包成bag,共同组成一个keygroup
    按指定的共有列,分别各自组成一个bag,放到一个group组中。C = cogroup A by id, B by id;那么,id的每个值作为group的key,其后是A,B各自的bag,保存对应此key的所有记录,各自的一个bag保存对应key的所有记录,每个记录是一个元组,两个bag作为key的group。

    注意:key值为null的数据都被归为同一类,这一点和group相同,和join不同。

    假设有以下两个数据文件:

    01

    02

    03

    04

    05

    06

    07

    08

    09

    10

    [root@localhost pig]$ cat a.txt

    uidk  12  3

    hfd 132 99

    bbN 463 231

    UFD 13  10

     

    [root@localhost pig]$ cat b.txt

    908 uidk  888

    345 hfd 557

    28790 re  00000

    现在我们用pig做如下操作及得到的结果为:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    grunt> A = LOAD 'a.txt' AS (acol1:chararray, acol2:int, acol3:int);

    grunt> B = LOAD 'b.txt' AS (bcol1:int, bcol2:chararray, bcol3:int);

    grunt> C = COGROUP A BY acol1, B BY bcol2;

    grunt> DUMP C;

    (re,{},{(28790,re,0)})

    (UFD,{(UFD,13,10)},{})

    (bbN,{(bbN,463,231)},{})

    (hfd,{(hfd,132,99)},{(345,hfd,557)})

    (uidk,{(uidk,12,3)},{(908,uidk,888)})

     

     

     

    join

    join是等值连接,即自然连接。Pig只支持等值连接,非等值连接用cross。

    daily = load 'NYSE_daily'as(exchange, symbol, date, open, high, low, close,

               volume, adj_close);

    divs  =load 'NYSE_dividends' as (exchange, symbol, date, dividends);

    jnd   =join daily by (symbol, date), divs by (symbol, date);

    连接后,每个表各自的属性列,用”::”指定,如daily::symbol。非共有属性可省略。

     

    外连接,outer join:

    分 left,right,full,共3种。用null补充缺失字段。

    左外连接左边的数据全部保留,其他填充null

    右外连接右边的数据全部保留,其他填充null。

    全外连接Full out join,两边全保留。Full关键字不可省略。

    自身和自身join,要加载两次

    --leftjoin.pig

    daily = load 'NYSE_daily'as(exchange, symbol, date, open, high, low, close,

               volume, adj_close);

    divs  =load 'NYSE_dividends' as (exchange, symbol, date, dividends);

    jnd   =join daily by (symbol, date) left outer, divs by (symbol, date);

    自身加载两次:

    --selfjoin.pig

    -- For each stock, find all dividends thatincreased between two dates

    divs1    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,

                    date:chararray, dividends);

    divs2    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,

                    date:chararray, dividends);

    jnd      = join divs1 by symbol, divs2 by symbol;

    increased = filter jnd by divs1::date <divs2::date and

                    divs1::dividends <divs2::dividends;

     

     

     

    数据多的表(或叫关系),放右边!!降低内存使用率,调高效率。

    JOIN用到的key所对应的记录最多的关系(relation)排在最后,如:

    D = JOIN A BY col1, B BY col1, C BY col1;

    因为,pig进行join过程:map阶段标记每个记录的map,按key进行shuffle,相同值收集到一起,按map标记排序,左边输入数据先放入缓存,然后右边进来后,每一条去交叉运算,逐条比对。生成一条。

     

     

    Pig函数使用:

    自身udf注册才能用。引用加包名和函数

    register 'your_path_to_piggybank/piggybank.jar';

    divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,

                    date:chararray, dividends:float);

    backwards = foreach divsgenerate

                    org.apache.pig.piggybank.evaluation.string.Reverse(symbol);

     

    define 定义别名,调用方便

    --define_constructor_args.pig

    register 'acme.jar';

    define convertcom.acme.financial.CurrencyConverter('dollar', 'euro');

    divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,

                    date:chararray, dividends:float);

    backwards = foreach divsgenerate convert(dividends);

     

     

     

    调用python的UDF,必须声明using jython(用这个编译),并给定一个命名空间 as ball。即

    Register ‘fuction.py’using jython as ball

    调用的时候用ball.fuction(int param1,floatparam2)

    这里,jython.jar解释器所在路径,必须提前放到类路径中,设置PIG_CLASSPATH指定。

     

    Java函数调用:

    Pig正常调用java函数,也是注册后,define别名调用,同上。如果函数有形参,在define时需要声明,叫构造函数参数,特殊的是define时不近给出形参,还要给出输出类型。多个形参在调查中?

     

    静态java函数,pig用invoker调用。有无参数都可,但pig不支持函数返回值重载,因为每种类型有一个对应调用方法,InvokeForInt、InvokeForLong、InvokeForFloat、InvokeForDouble、InvokeForString

    需要两个构造参数,一个完整包名,类名,函数名;另一个是参数。

    托管java中的静态函数(效率较低)

    --invoker.pig

    define hexInvokeForString('java.lang.Integer.toHexString', 'int');

    divs  =load 'NYSE_daily' as (exchange, symbol, date, open, high, low,

               close, volume, adj_close);

    nonnull = filter divs by volume is not null;

    inhex = foreach nonnull generate symbol,hex((int)volume);

    如果函数的参数是一个数组,那么传递过去的是一个bag

     

    define stdevInvokeForDouble('com.acme.Stats.stdev', 'double[]');

    A = load 'input' as (id: int, dp:double);

    B = group A by id;

    C = foreach B generate group, stdev(A.dp);

     

     

     

     

     

    Pig 正则表达式:

    A =LOAD'a.txt'AS(col1: int, col2: chararray);

    B =FILTER A BYcol2 matches '.*//.*\.qq\.com/.*'; 

    DUMPB;

    Matches col2正则匹配,使用ava格式的正则表达式匹配规则。

    表示任意字符,表示字符出现任意次数; 进行了转义,表示匹配 这个字符;就是表示匹配 这个字符。

    注意,引号中转义字符  需要两个才能表示, \. 就是正则 ,即匹配 字符。所以匹配数字应该用这种写法:d匹配数字,用\d),B = FILTER A BY (col matches '\d.*');

     

    Pig中map数量通过设置输入文件大小最小值,来限制

    如:set mapred.min.split.size 2147483648; set的参数的单位是字节,所以2G=2*1024*1024*1024=2147483648。小于2G的文件将被作为一个split输入,从而一个小于2G的文件将只有一个map。假设Pig job是一个纯mapjob输出数量会明显减少。


  • 相关阅读:
    常用颜色
    在VS2010中打开VS2012的项目
    vs2012 断点不能调试
    Setup Factory 打包.netframework 2.0
    Access 中数据库操作时提示from子句语法错误
    vs2012 .netFramwork2.0发布到xp
    c# access插入null值
    Visual Studio安装卸载模板
    Codeforces 455D
    ACdream 1157 (cdq分治)
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205441.html
Copyright © 2011-2022 走看看