zoukankan      html  css  js  c++  java
  • kettle 4.4源代码分析Transformation

    1.1. 相关的类和接口

    1.1.1. JobEntryTrans

    实现了JobEntryInterfaceexecute()方法,被job运行。由JobEntryTrans实例化Trans,并运行。

    1.1.2. TransGraph

    当点击trans面板的run时。由TransGraph实例化Trans。并运行。

    Trans主要成员有:

    private TransMeta transMeta;

    private Repository repository;

       private Job parentJob;

    private Trans parentTrans;

       private List<RowSetrowsets;

    private List<StepMetaDataCombi> steps

    当中最重要的是rowsetssteps

    rowsets保存了全部hop相应的行元数据和数据信息。

    List<StepMetaDataCombi> steps封装了一个step的主要内容。

    1.1.3. TransMeta

    描写叙述了整个Trans的元数据信息。 基本的属性成员有:

        private List<StepMeta>           steps;

    private List<TransHopMeta>       hops;

    private String              name;

    private Result            previousResult;上一个jobentry的运行结果。

        private List<RowMetaAndData> resultRows;这次trans运行后的数据结果。

        private List<ResultFile>     resultFiles;            

    resultRows成员将作为result比部分返回多行的元数据和数据(假设有的话)须要返回数据结果时。

    把resultRows增加Result结果的rows列表,并返回。

    1.1.4. StepMetaDataCombi

    提取了一个step所需的主要信息。

    public class StepMetaDataCombi

    {

        public StepMeta stepMeta;

        public String stepname;

        public int    copy;

        public StepInterface     step;

        public StepMetaInterface meta;

        public StepDataInterface data;

    }

    1.1.5. TransHopMeta

    描写叙述hop信息。

    1.1.6. StepMeta

    描写叙述step的公有基本信息(stepidstepname),对于每个详细的step。由成员变量StepMetaInterface step来描写叙述。

    1.1.7. StepInterface

    主要成员函数:

    processRow()对一行的数据处理。

    putRow()把处理后的数据放入下一个stepinputrowsets中。

    1.1.8. StepBase

    实现了StepInterface是各step详细实现类的基类。完毕了公用的处理函数。如putRow()。可是对于更详细的processRow()StepBase的子类中。StepBase的主要成员有

    public ArrayList<RowSet>  inputRowSets。outputRowSets

    StepBase的子类每次从inputRowSets中取出一行数据,向outputRowSets中写入一行数据。

    1.1.9. StepDataInterface

    step相关的数据信息。比方行的元数据信息。StepMetaInterface的实现类是与详细step相关的元数据信息,与StepMeta配合使用。共同描写叙述详细step的元数据信息。

    1.1.10. RowSet

     

    RowSet类中包括源step。目标step和由源向目标发送的一个rowMeta和一组data。当中data数据是以行为单位的队列(queArray)。一个RowSet作为此源stepoutputrowsets的一部分。

    同一时候作为目标stepinputRowsets一部分。源Step每次向队列中写一行数据。目标step每次从队列中读取一行数据。

    1.1.11. RowMetaAndData

    public class RowMetaAndData implements Cloneable{   

    private RowMetaInterface rowMeta;//行的元数据,描写叙述了每行的数据名字,数据类型。

    private Object[]         data;//数据

    }

    1.2. 运行过程概述

    Trans的运行机制是搭建一个结构,使得每个step可以从自己的inputRowsets读。处理一行,将结果输出到自己的outputRowsets中。

     

     

    注意:一个rowset对象既属于前一个step成员outputRowsets的一部分,也属于后一个对象的inputRowsets的一部分。全部的rowset信息都在Trans对象中以List形式维护。

    1.3. Trans运行过程时序图

    因为trans能够有TransGraph实例化。也能够由JobEntryTrans实例化。

    但基本过程是一样的,先实例化TransMeta,再实例化Trans,终于调用transstart方法。

    TransGraph实例化例如以下图所看到的:

     

    JobEntryTrans实例化,例如以下图所看到的:

     

    1.4. Trans代码解释

    1.4.1. JobEntryTransexecute( )

    首先获取元数据。然后以此作为參数实例化trans

    TransMeta transMeta = getTransMeta(rep);

    ……

    Trans trans = new Trans(transMeta);

    ……

    trans.execute(args);

    1.4.2. Transexecute( )

    详细运行前须要进行准备工作

    public void execute(String[] arguments) throws KettleException{

             prepareExecution(arguments);

             startThreads();

         }

    1.4.3. Trans的prepareExecution()

    搭建下面结构结构。

    (1)、对每个step依据hop信息进行找到下一个step或多个step

    (2)、对于每个this stepnex tstep生成一个RowSet对象,作为缓存供this step写,同一时候供next step读取数据。

    (3)、把此RowSet对象增加到TransList<RowSet>成员中保存。

    List<StepMeta> hopsteps=transMeta.getTransHopSteps(false);

    得到step列表

    ……

    对每个step进行例如以下设置

    for (int i=0;i<hopsteps.size();i++)

    {

    StepMeta thisStep=hopsteps.get(i);

    ……

    //找到下一个step的列表

    List<StepMeta> nextSteps = transMeta.findNextSteps(thisStep);

    int nrTargets = nextSteps.size();

    for (int n=0;n<nrTargets;n++)

    {

    StepMeta nextStep = nextSteps.get(n);

    …… 对于每个hop信息生成RowSet,并设置RowSet。把

     

     int thisCopies = thisStep.getCopies(); 处理源step的次数


            int nextCopies = nextStep.getCopies(); 处理目标step的次数


                       for (int c=0;c<nrCopies;c++)     nrCopies   依据 上面的thiscopies和nextcopies得到
        {
        RowSet rowSet;    //定义rowset  用来存放元数据   对于每个hop信息生成RowSet,并设置RowSet

        switch(transMeta.getTransformationType()) {
        case Normal:
         // This is a temporary patch until the batching rowset has proven to be working in all situations.
         // Currently there are stalling problems when dealing with small amounts of rows.
         //
         Boolean batchingRowSet = ValueMeta.convertStringToBoolean(System.getProperty(Const.KETTLE_BATCHING_ROWSET));
         if (batchingRowSet!=null && batchingRowSet.booleanValue()) {
           rowSet = new BlockingBatchingRowSet(transMeta.getSizeRowset());        
         } else {
           rowSet = new BlockingRowSet(transMeta.getSizeRowset());
         }
         break;
         
        case SerialSingleThreaded: 
         rowSet = new SingleRowRowSet(); 
         break;
         
                  case SingleThreaded: 
                    rowSet = new QueueRowSet(); 
                    break;
           .................         

    rowsets.add(rowSet);                   最后的得到rowset


                } 

    ……

    (4)、依据TransMetastep信息生成对应的StepMetaDataCombi(即steps)信息,加到steps列表中。

    StepMetaDataCombi combi = new StepMetaDataCombi();

    combi.stepname = stepMeta.getName();

    combi.copy     = c;

    combi.stepMeta = stepMeta;

    combi.meta = stepMeta.getStepMetaInterface();

    StepDataInterface data = combi.meta.getStepData();

    combi.data = data;

    ……

    StepInterface step=combi.meta.getStep(stepMeta, data, c, transMetathis);

    在step初始化时,会把Trans中的List<RowSet>的对应的rowset增加到stepinputRowSets,和outputRowSets中。

    combi.step = step;

    steps.add(combi);

    1.4.4. TransstartThreads( )

    打开了全部的step线程,核心代码例如以下:

         for (int i=0;i<steps.size();i++){

          steps.get(i).step.start(); 

         }

    1.5. Step运行

    实现StepInterface的不同的step各个功能个不一样。可是它们之间也有一定的规律性。下图仅仅列举了两个step,(TextInput)文本输入和Uniquerow(去重)

     

    1.5.1. 启动

    每个详细的step启动线程时。自己主动调用run函数,它们统一调用基类的静态方法

    public void run(){

         BaseStep.runStepThread(thismetadata);

    }

    1.5.2. 处理

    基类BaseStep採取了统一的处理方式。调用子类processRow以行为单位处理,核心代码例如以下。

    while (stepInterface.processRow(meta, data) && !stepInterface.isStopped());

    processRow( )通用过程是:调用基类BaseStep 的getRow( )得到数据,对一行数据处理,处理之后调用基类putRow( )方法数据保存至outputRowSets(即next stepinputRowSets

    1.5.3. 元数据与数据关系。

      Trans中的ETL过程(每一个step)以行为单位处理,当中行的元数据信息RowMeta和数据信息统一保存在RowSet对象中。

    RowSetRowMeta的成员的调试结果例如以下。可见rowMeta储存了每列数据的名称和类型。第一列列名flag,数据是长度为1String;第二列列名id

     

     

    RowSet的数据信息在queArray队列中,调试结果例如以下:能够看出第一个数据元素是一个Object包括了3列,数据内容为(N1a…)

     

    版权声明:本文博主原创文章。博客,未经同意不得转载。

  • 相关阅读:
    iBatis——自动生成DAO层接口提供操作函数(详解)
    【Spring Boot项目】Win7+JDK8+Tomcat8环境下的War包部署
    MySQL使用小记
    DB迁移:从SQL Server 2005到MySQL
    【文章学习】监控网页卡顿、崩溃
    为什么执行x in range(y)如此之快
    python笔试题(三)
    python笔试题(二)
    python笔试题(-)
    rest-framework(2)
  • 原文地址:https://www.cnblogs.com/bhlsheji/p/4908428.html
Copyright © 2011-2022 走看看