zoukankan      html  css  js  c++  java
  • [db] hadoop && pig

    hadoop && pig

    Hadoop

    最近需要用到hadoop操作,发现hadoop的官网真的良心的,没那么多废话,直接说明白怎么用,而且还是中文的,简单粗暴啊!!!

    hadoop document

    在mapreduce中,map的输出是具有自动排序功能的!!!

    在reduce阶段 ,0-33%阶段是 shuffle 阶段,就是根据键值来将本条记录发送到指定的reducer,这个阶段应该是在map还没有完全完成的时候就已经开始了,提高了程序的执行效率。34%-65%阶段是sort阶段,就是reduce根据收到的键值进行排序。map阶段也会发生排序,map的输出结果是以键值为顺序排序后输出,可以通过只有map阶段处理的输出来验证。66%-100%阶段是处理阶段,这个阶段才是真正的处理阶段,如果程序卡在66%,那就是reduce程序有问题了。

    pig

    另外还有一个pig语言,是一种过程语言,类似于mysql(这玩意也不熟~尴尬)。
    总结下pig的用法,顺带实验例子。

    这个网站也不错

    pig数据类型

    • double | float | long | int | chararry
      | bytearray
    • tuple | bag | map
      tuple类似matlab cell,元素类型可以不同('haha',1)
      bag相当于tuple的集合,用{}表示,{('haha',1),('hehe',2)}
      field是列数据标识
      map相当于hash表,key为chararray,value为任意类型

    运行和注释

    运行:
    本地模式:pig -x local
    集群模式:pig -x mapreduce 或者 pig
    批处理pig文件,上两行命令后接pig文件名,pig xx.pig
    注释:
    行注释 --
    段注释 /**/
    

    pig latin

    >> cat 1.txt
    a 1 2 3 4.2 9.8
    a 3 0 5 3.5 2.1
    b 7 9 9 - -
    a 7 9 9 2.6 6.2
    a 1 2 5 7.7 5.9
    a 1 2 3 1.4 0.2
    

    []代表可选项
    pig命令不区分大小写

    LOAD & Schema

    load = LOAD 'data_path' [USING function] [AS schema]
    A = LOAD '1.txt' USING PigStorage(' ') AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
    

    将1.txt的每行数据用' '分割,分装到对应的col1,col2的列名进行数据解析,如果没有指定,可以用$0 $n来索引。

    pig默认PigStorage()读入本地磁盘或者hadoop路径数据,org.apache.hcatalog.pig.HCatLoader()读取hive表;()中的是分隔符。

    Pig的schema运行我们指定relation为特定的结构,为字段指定名称和类型。如果省略类型信息,则默认为bytearray,也可以完全不指定schema。

    但是不指定SCHEMA有时候会出问题,所以建议在load时写上,实在用不到的话用A,B,C...啥的随便写一些就好

    describe A;
    out:
    A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}
    A = LOAD '1.txt' USING PigStorage(' ') AS (col1, col2, col3, col4, col5, col6);
    out:
    A: {col1: bytearray,col2: bytearray,col3: bytearray,col4: bytearray,col5: bytearray,col6: bytearray}
    A = LOAD '1.txt' USING PigStorage(' ');
    out:
    Schema for A unknown.
    
    hive

    Hive的组件HCatalog为解决这个问题提供了一种方案,通过提供Hive metastore的访问接口,Pig查询可以通过名字来引用Schema。

    pig -useHCatalog
    records = LOAD "records" USING org.apache.hcatalog.pig.HCatLoader();
    
    

    GROUP

    B = GROUP A BY (col2, col3, col4);
    out:
    ((1,2,3),{(a,1,2,3,1.4,0.2),(a,1,2,3,4.2,9.8)})
    ((1,2,5),{(a,1,2,5,7.7,5.9)})
    ((3,0,5),{(a,3,0,5,3.5,2.1)})
    ((7,9,9),{(a,7,9,9,2.6,6.2),(b,7,9,9,,)})
    

    用(col2, col3, col4)对A分组,然后按组将每条tuple汇聚成一个bag.B:{group:(col2,col3,col4),A:bag:{tuple,tuple}}

    group分组操作,将数据分组为group_col:bag,第一个字段被命名为'group',第二个字段是bag,包含'group'对应的值的所有tuple集合。

    FOREACH

    C = FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);
    out:
    ((1,2,3),2.8,5.0)
    ((1,2,5),7.7,5.9)
    ((3,0,5),3.5,2.1)
    ((7,9,9),2.6,6.2)
    其中(1.4+4,2)/2=2.8
    

    foreach是遍历每个组中的tuple,并对其进行处理。

    AVG(平均),COUNT(计数),MIN,MAX基本和excel的缩写一致。

    C:{group:(col2,col3,col4),double,double}.

    一般foreach和generate是一块使用的,在数据较大时,建议尽早使用foreach generate过滤掉多余信息以减少数据交换。

    FILTER

    d = filter A by $0 == 'a';
    out:
    (a,1,2,3,4.2,9.8)
    (a,3,0,5,3.5,2.1)
    (a,7,9,9,2.6,6.2)
    (a,1,2,5,7.7,5.9)
    (a,1,2,3,1.4,0.2)
    

    按照要求过滤数据,可以使用and or连接多个条件,滤除无用信息null 0 -1...

    CONCAT & SUBSTRING

    B = FOREACH A GENERATE CONCAT($0, (chararray)$1,(chararray)$2,(chararray)$3);
    out:
    (a123)
    (a305)
    (b799)
    (a799)
    (a125)
    (a123)
    
    C = foreach B generate (chararray)SUBSTRING($0,0,2);
    out:
    (a1)
    (a3)
    (b7)
    (a7)
    (a1)
    (a1)
    

    concat拼接两字符串,substring按长度截取字符串[0,2)左闭右开区间。

    order

    c = ORDER b BY cnt ASC;
    ASC升序	DESC降序
    

    显示和存储

    DUMP C;
    STORE C INTO 'output_path'
    

    DUMP显示,上边所有的out部分。store是存储。output_path必须是不存在的路径,pig自己新建。

    JOIN,UNION,COGROUP,CROSS

    a.txt:
    (2,Tie)
    (4,Coat)
    (3,Hat)
    (1,Scarf)
    b.txt:
    (Joe,2)
    (Hank,4)
    (Ali,0)
    (Eve,3)
    (Hank,2)
    A = LOAD 'a.txt' USING PigStorage(',');
    B = LOAD 'b.txt' USING PigStorage(',');
    

    JOIN

    C = JOIN A BY $0, B BY $1;
    out:
    (2,Tie,Hank,2)
    (2,Tie,Joe,2)
    (3,Hat,Eve,3)
    (4,Coat,Hank,4)
    

    根据key得到行加入。inner join,一般用A小表join B大表,起到部分过滤的作用。

    还有一个left join: left outer

    UNION

    D = UNION A, B;
    out:
    (Joe,2)
    (Hank,4)
    (Ali,0)
    (Eve,3)
    (Hank,2)
    (2,Tie)
    (4,Coat)
    (3,Hat)
    (1,Scarf)
    

    可以对不同字段数的数据集进行union操作。

    COGROUP

    E = COGROUP A BY $0, B BY $1;
    E = COGROUP A BY $0, B BY $1 outer;
    out:
    (0,{},{(Ali,0)})
    (1,{(1,Scarf)},{})
    (2,{(2,Tie)},{(Hank,2),(Joe,2)})
    (3,{(3,Hat)},{(Eve,3)})
    (4,{(4,Coat)},{(Hank,4)})
    F = COGROUP A BY $0 inner, B BY $1;
    out:
    (1,{(1,Scarf)},{})
    (2,{(2,Tie)},{(Hank,2),(Joe,2)})
    (3,{(3,Hat)},{(Eve,3)})
    (4,{(4,Coat)},{(Hank,4)})
    

    输出一组嵌套的tuple结构。COGROUP为每个不同的key生成一个tuple。每个tuple的第一个字段就是key。其他字段是各个关系中匹配该键值的元组所组成的 bag。第一个bag中是A中的匹配tuple,第二个bag是B的,没有匹配的则为空{}。
    COGROUP的默认类型outer连接。

    CROSS

    F = CROSS A, B;
    out:
    (1,Scarf,Hank,2)
    (1,Scarf,Eve,3)
    (1,Scarf,Ali,0)
    (1,Scarf,Hank,4)
    (1,Scarf,Joe,2)
    (3,Hat,Hank,2)
    (3,Hat,Eve,3)
    (3,Hat,Ali,0)
    (3,Hat,Hank,4)
    (3,Hat,Joe,2)
    (4,Coat,Hank,2)
    (4,Coat,Eve,3)
    (4,Coat,Ali,0)
    (4,Coat,Hank,4)
    (4,Coat,Joe,2)
    (2,Tie,Hank,2)
    (2,Tie,Eve,3)
    (2,Tie,Ali,0)
    (2,Tie,Hank,4)
    (2,Tie,Joe,2)
    

    CROSS 笛卡尔积。会将第一个关系中的每个元组和第二中的所有元组进行连接。这个操作的输出结果的大小是输入关系的大小的乘积。

    功能code块

    1)两张数据表中某个字段的交集、差集

    a.txt
    uidk,12,3
    hfd,132,99
    bbN,463,231
    UFD,13,10
    b.txt
    908,uidk,888
    345,hfd,557
    28790,re,00000
    A = LOAD 'a.txt' USING PigStorage(',') AS (acol1:chararray, acol2:int, acol3:int);
    B = LOAD 'b.txt' USING PigStorage(',') AS (bcol1:int, bcol2:chararray, bcol3:int);
    C = COGROUP A BY acol1, B BY bcol2;
    
    out:
    (re,{},{(28790,re,0)})
    (UFD,{(UFD,13,10)},{})
    (bbN,{(bbN,463,231)},{})
    (hfd,{(hfd,132,99)},{(345,hfd,557)})
    (uidk,{(uidk,12,3)},{(908,uidk,888)})
    
    DESCRIBE C;
    out:
    C: {group: chararray,A: {(acol1: chararray,acol2: int,acol3: int)},B: {(bcol1: int,bcol2: chararray,bcol3: int)}}
    
    D = filter C by NOT IsEmpty(A) ;
    D = filter C by IsEmpty(B);
    out:
    (UFD,{(UFD,13,10)},{})
    (bbN,{(bbN,463,231)},{})
    
    D = filter C by NOT IsEmpty(A) AND NOT IsEmpty(B);
    (hfd,{(hfd,132,99)},{(345,hfd,557)})
    (uidk,{(uidk,12,3)},{(908,uidk,888)})
    

    2)对表中某个字段的字符串切分

    b.txt
    908,uidk-haha,888
    345,hfd-hehe,557
    28790,re-hehe,00000
    B = LOAD 'b.txt' USING PigStorage(',') AS (bcol1:int, bcol2:chararray, bcol3:int);
    A = FOREACH B GENERATE FLATTEN(STRSPLIT(bcol2,'-',2)) AS (name,joy);
    out:
    (uidk,haha)
    (hfd,hehe)
    (re,hehe)
    A = FOREACH B GENERATE FLATTEN(STRSPLIT(bcol2,'-',2).$0) AS name;
    

    STRSPLIT('原始字符串', '分隔符(默认空格,特殊字符分割使用反斜杠 )', '限制返回的个数(超出舍弃)')

    1. 使用UDF来hash字段
      多记录一步,在UDF的返回值,如果不是字符串就不要用chararray的类型,因为在大数据量的时候会出问题。
    cat func_hash.py 
    
    @outputSchema('hash_product:int')
    def get_hash(product):
        if len(product) >= 50:
            return 0
        return hash(product)  
    
    pig:
    REGISTER 'hdfs://ns3/hadoop_path/func_hash.py' USING jython AS myUdfs;
    DEFINE tohash myUdfs.get_hash();
    A = FOREACH A   GENERATE 
    		b        AS b,
    		tohash(c)     AS c;
    

    Reference

    https://www.w3cschool.cn/apache_pig/
    https://www.zybuluo.com/BrandonLin/note/449340
    http://blog.csdn.net/bingduanlbd/article/details/52049683
    http://blog.csdn.net/zythy/article/details/18426347
    http://blackproof.iteye.com/blog/1791980
    http://www.360doc.com/content/15/0520/20/13670635_472030452.shtml
    http://blog.csdn.net/gg584741/article/details/51712242
    https://www.codelast.com/原创pig中的一些基础概念总结/
    http://www.aboutyun.com/thread-6713-1-1.html
    https://wenku.baidu.com/view/d530e025c5da50e2524d7fa7.html
    http://blog.csdn.net/duchang110/article/details/17781865

  • 相关阅读:
    链家网各城市二手房价格
    mitmproxy 配置
    Python操作APP -- Appium-Python-Client
    Appium连接模拟器
    adb server version (xx) doesn't match this client (xx); killing...
    Appnium 环境搭建
    KeyError: 'xxx does not support field: _id'
    Python执行JS -- PyExecJS库
    Python -- CSV文件读写
    Git的基本使用 -- 分支管理
  • 原文地址:https://www.cnblogs.com/zhanxiage1994/p/7991992.html
Copyright © 2011-2022 走看看