zoukankan      html  css  js  c++  java
  • Kettle 4.2源码分析第四讲--KettleJob机制与Database插件简介(含讲解PPT)

    1.  Job机制

      一个job项代表ETL控制流中的一项逻辑任务。Job项将会顺序执行,每个job项会产生一个结果,能作为别的分支上job项的条件。

     

    图 1 job项示例

    1.1. Job类图简介

     

    图 2 Job entry类图结构

    1.1.1.    JobEntryInteface接口

      JobEntryInterface是Job Entry插件的主要实现接口。主要包含以下功能:

    1          保存Job Entry设置

      实现类使用私有变量保存设置的参数,通过get、set方法获取和设置。Dialog实现类会通过这些方法,保存或设置设置界面上的参数。同时,需要提供一个深度拷贝的方法,因为在一些保存参数且可能修改的地方会调用。

     

    图 3 JobEntryTrans配置界面

    2       序列化插件

      插件要实现对本插件的序列化,实现两种方式xml与数据库。

     

    图 4 转换插件xml序列化结果

    3          输出信息提供

      一个job entry支持三种类型的输出:true、flase和无条件。这三种情况不是所有的job entry插件都会同时支持的,例如dummy job entry仅支持true和false。所以,插件必须显现两类函数,来查看支持哪种结果。

      public boolean evaluates()//是否支持true、false

      public boolean isUnconditional()//是否支持无条件执行

    4          执行任务

      负责工作的执行。

      public Result execute()//执行具体的逻辑,需要结果和开始到该项的距离

      prev_result.setNrErrors()//设置执行过程中的异常数

      prev_result.setResult()//设置结果,如果不知道true/false,结果不设置

      最后返回prev_result。

    1.1.2.    JobEntryDialogInteface接口

      负责构建和打开参数设置对话框。Spoon通过调用open函数打开该对话框,spoon是使用swt框架的,所以对话框也应使用swt来实现。

    1.2. Job entry交互通信类

    1.2.1.    Result

      每一个jobEntryInterface的实现类在完成相应功能时,返回结果的类型。

    主要成员变量:

    1 private boolean result;执行是否出现异常
    2 
    3 private int exitStatus; 执行结果状态
    4 
    5 private List<RowMetaAndData> rows;一个jobEntry完成处理后的数据(若存在)
    6 
    7 private Map<String, ResultFile> resultFiles;

    1.3. Job配置及开启

     

    图 5 Job开启时序图

      Job的开启与Trans相类似,配置执行的参数,检查.kjb文件是否发生变化,实例化一个Job对象,开启该线程。

    1.4. Job执行

    1.4.1.    初始执行excute1()

      主要工作是从JobMeta的JobHopMeta找到job入口jobentry信息,根据开始条件调用真正执行jobentry的execute方法2,代码如下所示:

    代码 4 Job.excute()关键代码

     1 startpoint=jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START, 0,    false);// 找到Job开始组件
     2 JobEntrySpecial jes = (JobEntrySpecial) startpoint.getEntry();
     3 // JobEntrySpecial是启动job的job项目
     4 Result res = null;
     5 while ( (jes.isRepeat() || isFirst) && !isStopped()){
     6 //符合开始条件时,调用execute方法2
     7 isFirst = false;
     8 res = execute(0, null, startpoint, null,
     9 Messages.getString("Job.Reason.Started"));
    10 }

    1.4.2.    实际执行execute2()

      execute()方法包含,的参数有执行次数(START不算,从0开始,顺序执行)、接一个Entry执行结果、当前Entry的拷贝、前一个Entry拷贝和原因。

      主要功能是根据参数startpoint,提取对应的jobentry,执行对应的jobentry操作,再根据JobMeta的hop信息依次得到下一个jobentry,递归调用。具体的执行步骤如下所示:

     

    图 6 Job执行步骤

    1.5. JobEntry执行

    1.5.1.    JobEntry类

      具体每个组件的执行体对应org.pentaho.di.job.entries包内每个entry的具体实现。

      execute()方法2中调用jobEntry的execute()完成jobEntry的具体功能。

    1.5.2.    不同jobEntry的实现

      final Result result = cloneJei.execute(prevResult, nr, rep, this);

      不同的Job项目(JobEntry)实现差别很大。

    JobEntrySpecial

      功能是开启一个job,只是简单地对传递来的preResult设置它的的result属性值为true,(Job项目据此判断前一结果执行完毕)。返回该对象即可。

    JobEntryTableExit

      功能是判断一个table是否存在数据库中。JobEntryTableExit Job项目有属性tablename和DatabaseMeta(对数据库的元数据信息描述)根据DatabaseMeta得到一个Dabase对象db,建立连接db.connect(); 调用db.checkTableExists(tablename)根据此返回值设置preResult的result属性为否为true。返回preResult对象。

    JobEntryTrans

      JobEntryJob和JobEntryTrans是嵌套job或trans的Job项目(JobEntry)。它们是比较复杂的job项目。

      作用是执行一个trans。首先实例化一个TransMeta,之后实例化Trans。调用trans.start(),当执行完毕后调用函数trans.getResult(),并把结果加到preResult中,返回该对象即可。

    补充说明

      Result中也可以有处理数据,这些处理数据可以作为下一个Job项目(JobEntry)的输入。但是容量受内存容量限制。

    2.  数据库插件

      PDI使用数据库插件来进行数据库的正确连接、执行SQL,同时也考虑现有数据的各种特殊功能和不同限制。

      在PDI里面,已经集成了非常多的数据库插件,大部分的插件都会继承自BaseDatabaseMeta。下面所示的方法通常都需要被重写,基类里面并没有相关的实现。要实现的方法主要分成3大主题:连接信息、SQL方言和功能标记。

    1          连接详情

    当PDI建立数据库连接时将会调用这些函数,或者数据库设置对话框里显示与方言有关的内容时也会调用。

    • public String getDriverClass()
    • public int getDefaultDatabasePort()
    • public int[] getAccessTypeList()
    • public boolean supportsOptionsInURL()
    • public String getURL()

    2       SQL Generation

    构建有效的SQL数据库方言时会调用这些方法。

    • public String getFieldDefinition()
    • public String getAddColumnStatement()
    • public String getSQLColumnExists()
    • public String getSQLQueryFields() 

    3       功能标记

    查询使用的数据库是否支持该功能。

    • public boolean supportsTransactions()
    • public boolean releaseSavepoint()
    • public boolean supportsPreparedStatementMetadataRetrieval()
    • public boolean supportsResultSetMetadataRetrievalOnly()

    PS:Kettle源码分析算是全部讲完了,最后奉送自己做的PPT http://pan.baidu.com/share/link?shareid=3803402535&uk=3792525916。Kettle应该算是比较小众的软件,但是在业界还是非常有名气的。我看到过好几个中国的公司,包括上市公司,说自己最新的ETL工具或者数据共享交换工具都是Kettle的改版。

  • 相关阅读:
    设置sudo的过期时间
    linux下tar.xz结尾文件的解压方法
    linux版本查看命令
    linux删除目录下所有文件,但是保留文件夹
    linux下更改文件夹名
    vim中跳到第一行和最后一行
    linux下编译安装python
    linux下 python源码包解压报错
    [Swift A]
    【构建Android缓存模块】(一)吐槽与原理分析
  • 原文地址:https://www.cnblogs.com/wukenaihe/p/3219297.html
Copyright © 2011-2022 走看看