zoukankan      html  css  js  c++  java
  • ETL-kettle通过java代码传递参数,调用job调用转换

    定义变量,以表输入为例,定义变量名称为${变量名},这里用test为变量名。并且勾选sql语句变量

    package demo;
    
    import org.pentaho.di.core.KettleEnvironment;
    import org.pentaho.di.core.database.DatabaseMeta;
    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.job.Job;
    import org.pentaho.di.job.JobMeta;
    import org.pentaho.di.repository.RepositoryDirectoryInterface;
    import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
    import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransMeta;
    
    public class KettleTest {
        public static void runWithDb() throws Exception {
            KettleEnvironment.init();
            //创建DB资源库
            KettleDatabaseRepository repository=new KettleDatabaseRepository();
            DatabaseMeta databaseMeta=new DatabaseMeta("kettle","mysql","jdbc","localhost","kettle_repository","3306","root","root");
            //选择资源库
            KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta=new KettleDatabaseRepositoryMeta("kettle","kettle","Transformation description",databaseMeta);
            repository.init(kettleDatabaseRepositoryMeta);
            //连接资源库
            repository.connect("admin","admin");
            RepositoryDirectoryInterface directoryInterface=repository.loadRepositoryDirectoryTree();
            //选择转换
    //        TransMeta transMeta=repository.loadTransformation("测试随机数",directoryInterface,null,true,null);
    //        Trans trans=new Trans(transMeta);
    //        trans.execute(null);
    //        trans.waitUntilFinished();//等待直到数据结束
    //        if(trans.getErrors()>0){
    //            System.out.println("transformation error");
    //        }else{
    //            System.out.println("transformation successfully");
    //        }
            // 选择job 这里采用job
            JobMeta jobMeta = repository.loadJob("random", directoryInterface, null, null);
            Job job = new Job(repository, jobMeta);
            // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
            // job.setVariable(paraname, paravalue);
            job.setVariable("test", "jing_vehicle_result_mapping");    //变量名称key-value
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception(
                        "There are errors during job exception!(执行job发生异常)");
            }
        }
    
        public static void main(String[] args) throws Exception {
            KettleTest.runWithDb();
        }
    
    }
    

    左上角编辑中,可以查看变量名称

    然后运行代码。可以看到响应的文件就已经写入的数据。

    代码中需要引用相应的jar包

    jar包在kettle工具文件下一个叫lib的文件夹中可以查找获得。

  • 相关阅读:
    [杂题笔记]2021.08.18-2021.09.03,CF#741 Div.2&CF#736 Div.2&CF Global Round15&CF#739 Div3
    第一次博客作业
    《博弈论》
    迭代法-二分迭代求解低阶线性方程
    迭代法-牛顿迭代法
    logback扩展日志输出功能
    log4j2扩展日志输出功能
    c# clr创建mssql的存储过程、函数
    驰骋BPM,工作流
    Docker部署RocketMQ踩坑记录
  • 原文地址:https://www.cnblogs.com/liclBlog/p/15349551.html
Copyright © 2011-2022 走看看