zoukankan      html  css  js  c++  java
  • Kettle java调用

    package kettle;

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.log4j.Logger;
    import org.pentaho.di.core.KettleEnvironment;
    import org.pentaho.di.core.database.DatabaseMeta;
    import org.pentaho.di.core.exception.KettleDatabaseException;
    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.core.util.EnvUtil;
    import org.pentaho.di.job.Job;
    import org.pentaho.di.job.JobMeta;
    import org.pentaho.di.repository.LongObjectId;
    import org.pentaho.di.repository.ObjectId;
    import org.pentaho.di.repository.RepositoryDirectory;
    import org.pentaho.di.repository.RepositoryDirectoryInterface;
    import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
    import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransMeta;

    public class KettleExecutor {

        Logger log = Logger.getLogger(getClass());
        KettleDatabaseRepository rep;
        RepositoryDirectoryInterface dir;
        
        
        public String getDatabaseRepositoryXMl(){
            String xml="<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                    "<connection>" +
                          "<name>Kettle</name>" +
                          "<type>MSSQL</type>" +
                          "<server>127.0.0.1</server>" +
                        "<access>Native</access>" +
                        "<database>Kettle</database>" +
                        "<port>1433</port>" +
                        "<username>SA</username>" +
                        "<password>password</password>" +
                        "<servername/>" +
                        "<data_tablespace/>" +
                        "<index_tablespace/>" +
                    "</connection>" ;
            return xml;
        }
        
        public void connectRepository(String username,String password){
            try {
                EnvUtil.environmentInit();
                KettleEnvironment.init();
                DatabaseMeta dataMeta = new DatabaseMeta(getDatabaseRepositoryXMl());
                KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta();
                repInfo.setConnection(dataMeta);
                
                rep = new KettleDatabaseRepository();
                rep.init(repInfo);
                rep.connect(username, password);
                
                ObjectId rootId = Long.parseLong(rep.getRootDirectoryID().getId()) > 0 ? rep.getRootDirectoryID() : new LongObjectId(0);
                dir = new RepositoryDirectory();
                dir.setObjectId(rootId);
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        
        public List<ObjectId> getObjectIds(String sql) throws KettleDatabaseException{
            List<ObjectId> list = new ArrayList<ObjectId>();
            Statement stmt = null;
            ResultSet rs = null;
            try {
                stmt = rep.getDatabase().getConnection().createStatement();
                rs = stmt.executeQuery(sql);
                while(rs.next())
                    list.add(new LongObjectId(rs.getLong(1)));        
            } catch (SQLException e) {
                e.printStackTrace();
            }finally{            
                try {
                    if(rs != null)
                        rs.close();
                    if(stmt != null)
                        stmt.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }            
            }
            return list;
        }
        
        public void runAllJobs(){
            try {
                List<ObjectId> jobIds = getObjectIds("select id_job from r_job where job_status >= 0 order by id_job");
                for (final ObjectId oid : jobIds) {
    //                new JobsExecutor(rep, oid).run();
                    new Runnable() {
                        @Override
                        public void run(){
                            try {
                                JobMeta jobMeta = rep.loadJob(oid, null);
                                Job job = new Job(rep,jobMeta);
                                log.info("***********************************开始执行job:"+job.getJobname());
                                job.start();
                                job.waitUntilFinished();
                                if (job.getErrors() > 0) {
                                    log.info("***********************************执行job错误:"+job.getJobname());
                                }
                            } catch (KettleException e) {
                                e.printStackTrace();
                            }
                        }
                    }.run();                
                }
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        
        public void runJob(ObjectId jobId,String versionLabel){
            try {
                JobMeta jobMeta = rep.loadJob(jobId, versionLabel);
                Job job = new Job(rep,jobMeta);
                job.start();
                job.waitUntilFinished();
                if (job.getErrors() > 0) {
                    System.out.println("decompress fail!");
                }
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 文件方式调用Job
         * @param fileName Job脚本的路径及名称
         */
        public void runJob(String fileName) {
            try {
                KettleEnvironment.init();
                JobMeta jobMeta = new JobMeta(fileName, null);
                Job job = new Job(null, jobMeta);
                // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
                // job.setVariable(paraname, paravalue);
                job.start();
                job.waitUntilFinished();
                if (job.getErrors() > 0) {
                    System.out.println("decompress fail!");
                }
            } catch (KettleException e) {
                System.out.println(e);
            }
        }
        
        /**
         * 文件方式调用Transformation
         * @param filename  Transformation脚本的路径及名称
         */
        public void runTran(String filename) {
            try {
                KettleEnvironment.init();
                TransMeta transMeta = new TransMeta(filename);
                Trans trans = new Trans(transMeta);
                trans.prepareExecution(null);
                trans.startThreads();
                trans.waitUntilFinished();

                if (trans.getErrors() != 0) {
                    System.out.println("Error");
                }
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        
    }

  • 相关阅读:
    wpf-x-指令元素
    意法半导体STM32单片机特性
    非易失性存储器MRAM的两大优点
    静态SDRAM和动态SDRAM的区别
    使用SRAM如何节省芯片面积
    不同类别存储器基本原理
    串口SRAM和并口SRAM的引脚区别
    SRAM存储器芯片地址引脚线短路检测方法
    2020年国内MCU市场有望突破500亿元
    MRAM可以替代NOR或SRAM
  • 原文地址:https://www.cnblogs.com/Byrd/p/3115161.html
Copyright © 2011-2022 走看看