zoukankan      html  css  js  c++  java
  • Pig基础学习【持续更新中】

    *本文参考了Pig官方文档以及已有的一些博客,并加上了自己的一些知识性的理解。目前正在持续更新中。*

    Pig作为一种处理大规模数据的高级查询语言,底层是转换成MapReduce实现的,可以作为MapReduce的一种脚本语言,大大简化了对数据操作的代码。

    **

    基本概念和用法

    **:
    1. 关系(relation):即包(bag),是一个元组(tuple)的集合。
    可将关系/包想象成Spark中RDD的概念。
    元组()同Spark中元组的概念。
    2. 简单的文件载入操作:

    A = LOAD 'a.txt' AS (col1:chararray, col2:int);
    B = GROUP A BY (col1);
    DUMP A;
    DUMP B;

    上段代码将a.txt中的内容加载到变量A中,A的数据结构为A:{col1: chararray,col2:int},A就是一个包。
    注意:
    1)这里面=左右两边要有空格,否则报错。
    2)load as,dump等不区分大小写。
    3)load as后面括号里col1和col2都是别名,如果不指定别名,后面的代码可以使用0,1…处理,此处使用别名是为了增强可读性。
    3. 包中数据处理操作:

    C=FOREACH B GENERATE group, AVG(A.col2);

    遍历B中元素产生group,col2平均值

    关系操作

    LOAD
    将数据从文件系统中载入。

    LOAD 'data' [USING function] [AS schema]; 

    USING关键字指定loading function,默认是PigStorage,当然也可以使用用户自己定义的UDF。
    示例:

    A = LOAD 'data' AS (f1:int, f2:int);

    注:schema即模式,可以理解元数据,用”()”括起来。

    DISTINCT
    在关系/包中去重。

    alias = DISTINCT alias [PARTITION BY partitioner] [PARALLEL n]; 

    patitioner:MapReduce中Mapper阶段的Partitioner。
    PARALLEL:可以改变job的并行度,改变了reduce task的数目。
    为了去重,数据首先要排好序。’
    示例:

    X = DISTINCT A

    将包/关系A中重复记录去掉,并赋值给新的包/关系X。
    多维度中,计算几个维度组合下的不重复记录数。
    按照指定几个字段来去重:DISTINCT去除整行重复的数据。该命令会触发reduce过程,同时在Mapper阶段也是会使用Combiner做聚合操作的。
    register:注册udf jar包然后脚本中即可使用。
    set:可在pig脚本的开头来设置job的参数,如set mapred.min.s
    plit.size 2147483648; 设置参数的单位是字节。
    设置pig job的名字,在hadoop jobtracker 页面就能很快找到自己的job:set job.name ‘haha’
    输出LZO压缩格式的文本文件,可借助于elephant-bird,两种方式:
    方式1:

    A =LOAD'input';
    STORE A INTO'output'USING com.twitter.elephantbird.pig.store.LzoPigStorage();

    方式2:
    在pig脚本的最前面加上两句话:

    set mapred.output.compression.codec 'com.hadoop.compression.lzo.LzopCodec';
    set mapred.output.compress 'true';

    这种方式在store的时候不用using,会将文件自动保存为LZO格式。
    加载LZO压缩文件:
    A = LOAD 'output' USING com.twitter.elephantbird.pig.store.LzoPigStorage(',');
    指定了分隔符为逗号,如果不想指定,括号内不加即可。

    STREAM
    把数据送到外部脚本或程序运行。

    alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema] ;

    alias:包
    command:通过反引号 括起来,可以包含参数。
    schema的例子:X = STREAM A THROUGH
    stream.pl` as (f1:int, f2:int, f3:int);
    例如,可以将Python程序嵌入到Pig中使用。
    建立一个Python文件pass.py,示例程序:

    #! /usr/bin/envpython
    import sys
    
    for line insys.stdin:
             (c,n,s) = line.split()  
             if int(s) >= 60:
                       print "%s	%s	%s"%(c,n,s)

    执行以下命令:

    define pass `pass.py` SHIP('/home/user/pass.py');
    records_pass = stream records through pass as(classNo:chararray, studNo:chararray, score:int);
    dump records_pass;

    上面命令中ship命令是将python程序提交到Hadoop集群中。pass后面的pass.py外是反引号。

    FILTER
    基于某种条件从包中选择元组。

    alias = FILTER alias  BY expression;

    expression的返回值是boolean类型,为true则保留该条记录,为false则舍弃该条记录。
    示例:

    X = FILTER A BY (f1 == 8) OR (NOT (f2+f3 > f1));
    
    DUMP X;
    (4,2,1)
    (8,3,4)
    (7,2,5)
    (8,4,3)

    FOREACH
    基于数据列做数据转换。

    alias  = FOREACH { block | nested_block };

    block:alias作为外部包,通过FOREACH…GENERATE操作:
    alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];
    nested_block:alias作为内部包:

    alias = FOREACH nested_alias {
       alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
       GENERATE expression [AS schema] [expression [AS schema]….]
    };

    注:
    1. 内部包使用时要用{}括住内部块。
    2. nested_op允许的操作有CROSS, DISTINCT, FILTER, FOREACH, LIMIT, and ORDER BY;nested_exp:任意的表达式
    3. FOREACH最多只允许二重嵌套。
    示例:

    X = FOREACH B {
            S = FILTER A BY 'xyz';
            GENERATE COUNT (S.$0);
    }

    UDF相关

    DEFINE
    定义一种简称给UDF函数或者streaming命令。

    语法:DEFINE alias {function | [`command` [input] [output] [ship] [cache]] };

    define udf com.sohu.adrd.data.pig.udf.sessionlog.AdPvClickUDF('super','pc','tvadview');
    定义UDF的别名。
    function:UDF函数。udf名字太长的时候可以简化,便于在程序中使用。
    “|”后面的几个参数都是streaming的时候使用的命令。
    command:通过反引号括起来,命令是任何可执行的命令,可以包含参数,如python命令,shell命令等。
    input:输入文件或者标准输入,输入路径可以一个或多个。可以使用using关键字指定序列化器。将输入文件数据序列化成指定序列化格式。
    output:标准输出|标准错误输出|输出文件。一个或多个值。可通过using关键字指定反序列化器。将序列化数据反序列化成原文件数据。
    ship:文件路径。可以将依赖文件从主节点传输到集群中的工作节点。注意,这里是文件,不能是目录。适用于小文件,jar包,二进制文件。

    define x `stream.pl` ship('/work/stream.pl');
    y = stream A through x;

    cache:分布式文件系统中的文件路径名,处理已经被放在计算节点上的大文件。文件而非目录。’dfs_path#dfs_file’

    define y `stream.pl data.gz` ship('/work/stream.pl') cache('/input/data.gz#data.gz');
    x = stream A through y;

    注:
    1. DEFINE可小写。
    2. 以上文件路径名都用单引号括起来。

    REGISTER
    REGISTER后面接jar包,在写pig脚本文件内部使用,这样文件就可以直接使用jar包中的UDF。
    UDF的jar包在Pig脚本中指定的一种方式。
    REGISTER myudf.jar 然后就可以在Pig脚本中使用自己的myudf.jar中的东西。
    另一种注册方式是在执行Pig命令行中加参数 -Dpig.additional.jars的形式指定jar包路径,键值对形式,如果有多个jar,jar之间可用:符号分隔。
    注:REGISTER可小写。

    参考:
    1. Tweeter工程师写的PPT:http://www.slideshare.net/kevinweil/hadoop-pig-and-twitter-nosql-east-2009
    2. 官方文档:http://pig.apache.org/docs/r0.15.0/basic.html

  • 相关阅读:
    1065 01字符串
    poj 1321 棋盘问题
    0608模拟算法试题
    唯一的雪花
    2969 角谷猜想
    0607枚举算法试题
    3162 抄书问题(划分dp)
    8782:乘积最大(划分dp)
    2985:数字组合
    3531:判断整除
  • 原文地址:https://www.cnblogs.com/eva_sj/p/6172252.html
Copyright © 2011-2022 走看看