1、通过kettle连接neo4j
kettle和neo4j连接
参考网站:https://www.bsimard.com/2018/09/03/kettle-neo4j.html
kettle连接mysql参数设置
参考:https://blog.csdn.net/weixin_39588754/article/details/87982399
springboot整合kettle
1、添加依赖jar
2、添加坐标
<dependency> <groupId>kettle</groupId> <artifactId>kettle-core</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/kettle-core-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>kettle</groupId> <artifactId>kettle-engine</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/kettle-engine-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>commons</groupId> <artifactId>commons-vfs2</artifactId> <scope>system</scope> <version>2.2</version> <systemPath>${project.basedir}/src/main/resources/libs/commons-vfs2-2.2.jar</systemPath> </dependency> <dependency> <groupId>pentaho</groupId> <artifactId>pentaho-capability-manager</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/pentaho-capability-manager-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>pentaho</groupId> <artifactId>pentaho-capability-manager</artifactId> <scope>system</scope> <version>pentaho-vfs-browser</version> <systemPath>${project.basedir}/src/main/resources/libs/pentaho-vfs-browser-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>metastore</groupId> <artifactId>metastore</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/metastore-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>pentaho</groupId> <artifactId>pentaho-registry</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/pentaho-registry-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>system</scope> <version>5.1.48</version> <systemPath>${project.basedir}/src/main/resources/libs/mysql-connector-java-5.1.48.jar</systemPath> </dependency> <dependency> <groupId>pentaho-connections</groupId> <artifactId>pentaho-connections</artifactId> <scope>system</scope> <version>8.2.0.0-342</version> <systemPath>${project.basedir}/src/main/resources/libs/pentaho-connections-8.2.0.0-342.jar</systemPath> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency>
注意:必须添加kettle文件夹中的lib下面的mysql的jar
3、代码
package com.hgchain.neo4j.data.service; import com.hgchain.neo4j.data.configure.KettleMysqlRepProperties; import com.hgchain.neo4j.data.exception.BusinessException; import lombok.extern.slf4j.Slf4j; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LogLevel; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; /** * @author liubh * @date 2020/12/15 9:18 **/ @Service @Slf4j public class DispatchJobService { private static final String KETTLE_MYSQL_USERNAME = "admin"; private static final String KETTLE_MYSQL_PASSWORD = "admin"; KettleDatabaseRepository rep = new KettleDatabaseRepository(); @Autowired private KettleMysqlRepProperties kettleMysqlRepProperties; /** * 连接数据库资源库 * * @return * @throws KettleException */ public Boolean repositoryCon() { try { KettleEnvironment.init(); //EnvUtil.environmentInit(); DatabaseMeta dataMeta = new DatabaseMeta(kettleMysqlRepProperties.getName(), kettleMysqlRepProperties.getType(), kettleMysqlRepProperties.getAccess(), kettleMysqlRepProperties.getHost(), kettleMysqlRepProperties.getDb(), kettleMysqlRepProperties.getPort(), kettleMysqlRepProperties.getUser(), kettleMysqlRepProperties.getPass()); //选择资源库 KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta(); repInfo.setConnection(dataMeta); //数据库形式的资源库元对象 rep.init(repInfo); rep.connect(KETTLE_MYSQL_USERNAME, KETTLE_MYSQL_PASSWORD); } catch (KettleException e) { throw new BusinessException("连接数据库资源库失败"); } if (rep.isConnected()) { log.info("连接成功"); return true; } else { log.info("连接失败"); return false; } } /** * 调度任务 * @param jobName */ // @Scheduled(cron = "0/5 * * * * *") public void runJob(String jobName) { //数据库资源库连接 Boolean aBoolean = this.repositoryCon(); if (!aBoolean) { throw new BusinessException("连接数据库资源库失败"); } try { //根据指定的字符串路径 找到目录 RepositoryDirectoryInterface dir = rep.findDirectory("/"); //加载指定的job JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null); Job job = new Job(rep, jobMeta); //设置参数 //jobMeta.setParameterValue("method", "update"); job.setLogLevel(LogLevel.ERROR); //启动执行指定的job job.run(); job.waitUntilFinished();//等待job执行完; job.setFinished(true); log.info(job.getResult().getLogText()); } catch (Exception e) { e.printStackTrace(); } } /** * @param transName 要调用的trans名称 * 调用资源库中的trans */ // @Scheduled(cron = "0/5 * * * * *") public void runTrans(String transName) { //数据库资源库连接 Boolean aBoolean = this.repositoryCon(); if (!aBoolean) { throw new BusinessException("连接数据库资源库失败"); } try { //根据指定的字符串路径 找到目录 RepositoryDirectoryInterface dir = rep.findDirectory("/"); TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null); //设置参数 //tmeta.setParameterValue("", ""); Trans trans = new Trans(transMeta); //执行trans trans.execute(null); trans.waitUntilFinished(); if (trans.getErrors() > 0) { log.info("kettle转换失败"); } } catch (Exception e) { e.printStackTrace(); } } }
扩展知识-读取application.yml的属性值
package com.hgchain.neo4j.data.configure; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @author bai */ @Component @ConfigurationProperties(prefix = "kettle-repository") @Data public class KettleMysqlRepProperties { private String name; private String access; private String type; private String host; private String db; private String user; private String pass; private String port; }
参考网站:https://blog.csdn.net/hubeilihao/article/details/28647721