zoukankan      html  css  js  c++  java
  • xml in hadoop ETL with pig summary

    项目中需要把source为xml的文件通过flume放置到hdfs,然后通过MR导入到vertica中去,我之前做过简单的

    尝试,是通过pig的piggybank的xmlloader然后Regex_extract来提取结点属性做的,但问题是我之前只取了一

    层结点的属性,没有把不同层次结点关联起来,这有三四层,结构比较复杂,我需要重新整理思路.

    这种方式很可能走不通,因为piggybank里面regex_extract的正则和传统的正则还是有些异同的.常常会

    因为正则写的不合适经常返回空元组.

    我是一个c# guy,又不会用纯java写MR,所以就进一步搜索了google.查找相关资料.

    1.把XML先转成avro的形式,然后再使用pig 进行转换,可以减少CPU的利用率和IO的情况,具体可以参见slides:

    http://www.slideshare.net/Hadoop_Summit/bose-june26-405pmroom230cv3-24148869

    不清楚Avro,所以个人尚未尝试过此种处理方式.

    2.使用StreamingXMLLoader,来处理复杂的巨大的xml文件.

    可以参见这篇文章How to feed XML to your Pig

    http://www.tuicool.com/articles/vEJbUj

    The class is in a third party project :Mortar.

    这是一个第三方的工程,我也不清楚,所以也放弃了.

    3.使用Mahout的相关类.不清楚,未使用过.但搜索出来说mahout中相关的文档格式处理可以做这件事.

    4.使用pig的piggybank中xmlloader,但xmlloader用起来虽然简单,但其思路也简单,就是传入标签,然后获取

    此标签下的所有原生的XML,这对于此标签下有更多的复杂元素时,进一步处理起来还是很复杂的,如果是

    简单的,只有一两层的话,可以使用此法取到标签,然后再使用regex_extract来用正则获取下层的元素.

    我在项目中一开始尝试的就是这个.

    作为一个C# guy,我没有用java写过有意义的程序,最多就是hello world,所以当我决定使用java来写pig

    UDF的时候,

    在java项目中导入第三方jar包:

    http://blog.csdn.net/justinavril/article/details/2783182

    我在浏览器中打开pig 官方文档页面和pig udf manual页面作为参考,最后再从查阅pig自带的示例脚本中得到启发.

    整个学习的过程中,我还参照了一本书:programming pig,通过ppurl下载的.然后完成了以下的内容.

    我还是通过xmlloader来获取最基本的结点,然后可以通过regex_extract来获取其属性.

    然后把它作为字符串传入两个自定义的UDF,每一个udf返回的类型都是databag,然后通过flatten函数扁平化,

    作为另外两张表的输入,传入到vertica.  

    以上是需求的分析,其实使用C# or java 通过 dom和xpath来作还是非常简单的.

    java工程中我写了三个类,两个类分别实现了两个UDF,都是返回databag,重写了返回的schema.

    第三个类是一个测试驱动,里面有一个main函数来调用写的UDF,测试有没有异常.

    java项目开发完毕后,需要编译和打包,以下代码是示例,注意编译的时候也需要把pig...jar加进去.

    编译成类并打包:

    [hadoop@namenode test]$ ls

    GetDataLogs.java GetStepNodes.java

    [hadoop@namenode test]$ javac -cp ../pig-0.12.0.jar GetStepNodes.java

    [hadoop@namenode test]$ javac -cp ../pig-0.12.0.jar GetDataLogs.java

    [hadoop@namenode test]$ ls

    GetDataLogs.class GetDataLogs.java GetStepNodes.class GetStepNodes.java

    [hadoop@namenode test]$ cd ..

    [hadoop@namenode src]$ ls

    pig-0.12.0.jar test

    [hadoop@namenode src]$ jar -cf test.jar test

    [hadoop@namenode src]$ ls

    pig-0.12.0.jar test test.jar

    以下是pig latin代码示例,仅供参考:

    register /home/hadoop/workspace/test/src/test.jar
    
    register /home/hadoop/pig-0.12.0/contrib/piggybank/java/piggybank.jar
    
    xml = load '/FFA/FFA_TEST.xml' using org.apache.pig.piggybank.storage.XMLLoader('CIMProjectResults') as(testrun:chararray);
    
    stepnodes = foreach xml generate flatten(test.GetStepNodes(testrun));
    
    dump stepnodes;
    
    datalognodes = foreach xml generate flatten(test.GetDataLogs(testrun));
    
    dump datalognodes;
    View Code

     JAVA示例代码如下:

    package test;
    
    import java.io.IOException;
    import java.io.StringReader;
    
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple; 
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    
    import javax.xml.parsers.*;
    
    import org.w3c.dom.*;
    import org.xml.sax.InputSource;
    import org.xml.sax.SAXException;
    
    import javax.xml.xpath.*;
    
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.data.DataType;
    
    public class GetDataLogs extends EvalFunc<DataBag>{
        
        TupleFactory mTupleFactory = TupleFactory.getInstance();
        BagFactory   mBagFactory   = BagFactory.getInstance();
        
        private DataBag GetDataLogNodes(Tuple input) throws SAXException, IOException, ParserConfigurationException, XPathExpressionException
        {
            DataBag  output = mBagFactory.newDefaultBag();
            try {
                    String xml = input.get(0).toString();
                  
                    DocumentBuilderFactory domFactory = DocumentBuilderFactory.newInstance();
                    domFactory.setNamespaceAware(true);
                    DocumentBuilder builder = domFactory.newDocumentBuilder();
                    Document doc = builder.parse(new InputSource(new StringReader(xml)));
                    
                    XPathFactory factory = XPathFactory.newInstance();
                    XPath xpath = factory.newXPath();
                    
                    XPathExpression expr = xpath.compile("CIMProjectResults/Sequence");
                    Object result = expr.evaluate(doc, XPathConstants.NODESET);
                    NodeList nodes = (NodeList) result;        
                    
                    NodeList stepNodes = nodes.item(0).getChildNodes();
                    if(stepNodes == null) return null;
                    for(int i=0;i<stepNodes.getLength();i++)
                    {
                        if(stepNodes.item(i).getNodeType()!=Node.ELEMENT_NODE) continue;
                        NodeList stepchildnodes = stepNodes.item(i).getChildNodes();
                        NamedNodeMap stepAttrs = stepNodes.item(i).getAttributes();
                        if(stepAttrs == null) continue;
                        if(stepchildnodes == null) continue;
                        String TestResult = "";
                        String TableName = "";
                    
                        
                        for(int k=0;k<stepchildnodes.getLength();k++)
                        {
                            if(stepchildnodes.item(k).getNodeName()=="DataLog")
                            {
                                NamedNodeMap dlattrs = stepchildnodes.item(k).getAttributes();
                                if(dlattrs==null) continue;
                                TestResult = dlattrs.getNamedItem("TestResult").getNodeValue()==null?"":dlattrs.getNamedItem("TestResult").getNodeValue();
                                TableName = dlattrs.getNamedItem("TableName").getNodeValue()==null?"":dlattrs.getNamedItem("TableName").getNodeValue();
                                break;
                            }
                        }
                        
                        for(int k=0;k<stepchildnodes.getLength();k++)
                        {
                            Tuple t = null;
                            NodeList testparmnodes = null;
                            if(stepchildnodes.item(k).getNodeType() == Node.ELEMENT_NODE)
                            {
                                 if(stepchildnodes.item(k).getNodeName()=="TestParms")
                                 {
                                    testparmnodes = stepchildnodes.item(k).getChildNodes();
                                    if(testparmnodes==null)
                                    {
                                        ////////////////////////
                                        t = mTupleFactory.newTuple(9);
                                        t.set(0,stepAttrs.getNamedItem("StepName").getNodeValue()==null?"":stepAttrs.getNamedItem("StepName").getNodeValue());
                                        t.set(1,stepAttrs.getNamedItem("StepNumber").getNodeValue()==null?"":stepAttrs.getNamedItem("StepNumber").getNodeValue());
                                        t.set(2, TestResult);
                                        t.set(3,TableName);
                                        output.add(t);
                                        continue;
                                    }
                                    for(int l=0; l<testparmnodes.getLength();l++)
                                    {
                                        if(testparmnodes.item(l).getNodeType()!=Node.ELEMENT_NODE) continue;
                                        t = mTupleFactory.newTuple(9);
                                        t.set(0,stepAttrs.getNamedItem("StepName").getNodeValue()==null?"":stepAttrs.getNamedItem("StepName").getNodeValue());
                                        t.set(1,stepAttrs.getNamedItem("StepNumber").getNodeValue()==null?"":stepAttrs.getNamedItem("StepNumber").getNodeValue());
                                        t.set(2, TestResult);
                                        t.set(3,TableName);
                                        NamedNodeMap testparmnnm = testparmnodes.item(l).getAttributes();
                                        if (testparmnnm==null) {output.add(t);continue;}
                                        t.set(4,testparmnnm.getNamedItem("Name").getNodeValue()==null?"":testparmnnm.getNamedItem("Name").getNodeValue());
                                        t.set(5,testparmnnm.getNamedItem("Value").getNodeValue()==null?"":testparmnnm.getNamedItem("Value").getNodeValue());
                                        t.set(6,testparmnnm.getNamedItem("Optional").getNodeValue()==null?"":testparmnnm.getNamedItem("Optional").getNodeValue());
                                        t.set(7,testparmnnm.getNamedItem("Set").getNodeValue()==null?"":testparmnnm.getNamedItem("Set").getNodeValue());
                                        t.set(8,testparmnnm.getNamedItem("Key").getNodeValue()==null?"":testparmnnm.getNamedItem("Key").getNodeValue());
                                        output.add(t);
                                    }    //l
                                }//else if
                            }//if
                        }//k
                     }//i
                    
            } catch (ExecException e) {
                e.printStackTrace();
            }
            return output;
        }
        
        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 )
                return null;
            try{
                return GetDataLogNodes(input);
            }catch(Exception e){
                e.printStackTrace();
                return null;
            }
        }
        
        @Override
        /**
         * This method gives a name to the column.
         * @param input - schema of the input data
         * @return schema of the input data
         */
        public Schema outputSchema(Schema input) {
    
             Schema bagSchema = new Schema();
             bagSchema.add(new Schema.FieldSchema("StepName", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("StepNumber", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("TestResult", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("TableName", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("Name", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("Value", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("Optional", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("Set", DataType.CHARARRAY));
             bagSchema.add(new Schema.FieldSchema("Key", DataType.CHARARRAY));
             
             try{
                return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), 
                                                        bagSchema, DataType.BAG));
             }catch (FrontendException e){
                return null;
             }
    }
    }
    View Code

    现在的pig script 只是load了一个文件,而事实上是有成千上万上文件,pig里面的代码是可以写成参数的,

    然后传递参数过去。

    log = load '$input' as (schema.....)

    lmt =limit log $size;

    dump lmt;

    当你调用的时候可以使用:pig -parm input =....-parm size=4  mypigfilename.pig

    你可以在shell里面使用。

     pig 里面一次不是只能处理一个文件,而是可以处理一个或多个目录下面的多个相同数据结构的文件.可以使用通配符,例如

    xmlfiles/*.xml

       

    Looking for a job working at Home about MSBI
  • 相关阅读:
    理解inode
    贝叶斯公式与拼写检查器
    《C程序设计语言》第四章 函数和程序结构
    MIT《计算机科学与编程导论》课堂笔记
    很牛的牛顿迭代法
    开发一个小工具重温C#经典问题
    斯坦福《编程方法学》环境搭建及常见问题
    看Sybase官方手册学索引工作原理
    学习编程的方法、软件和工具
    大师里奇留给了我们什么
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/3464600.html
Copyright © 2011-2022 走看看