1、通过kettle连接neo4j

kettle和neo4j连接
参考网站:https://www.bsimard.com/2018/09/03/kettle-neo4j.html
kettle连接mysql参数设置


参考:https://blog.csdn.net/weixin_39588754/article/details/87982399
springboot整合kettle
1、添加依赖jar

2、添加坐标
<dependency>
<groupId>kettle</groupId>
<artifactId>kettle-core</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/kettle-core-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>kettle</groupId>
<artifactId>kettle-engine</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/kettle-engine-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>commons</groupId>
<artifactId>commons-vfs2</artifactId>
<scope>system</scope>
<version>2.2</version>
<systemPath>${project.basedir}/src/main/resources/libs/commons-vfs2-2.2.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-capability-manager</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/pentaho-capability-manager-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-capability-manager</artifactId>
<scope>system</scope>
<version>pentaho-vfs-browser</version>
<systemPath>${project.basedir}/src/main/resources/libs/pentaho-vfs-browser-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>metastore</groupId>
<artifactId>metastore</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/metastore-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-registry</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/pentaho-registry-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>system</scope>
<version>5.1.48</version>
<systemPath>${project.basedir}/src/main/resources/libs/mysql-connector-java-5.1.48.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho-connections</groupId>
<artifactId>pentaho-connections</artifactId>
<scope>system</scope>
<version>8.2.0.0-342</version>
<systemPath>${project.basedir}/src/main/resources/libs/pentaho-connections-8.2.0.0-342.jar</systemPath>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
注意:必须添加kettle文件夹中的lib下面的mysql的jar
3、代码
package com.hgchain.neo4j.data.service;
import com.hgchain.neo4j.data.configure.KettleMysqlRepProperties;
import com.hgchain.neo4j.data.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* @author liubh
* @date 2020/12/15 9:18
**/
@Service
@Slf4j
public class DispatchJobService {
private static final String KETTLE_MYSQL_USERNAME = "admin";
private static final String KETTLE_MYSQL_PASSWORD = "admin";
KettleDatabaseRepository rep = new KettleDatabaseRepository();
@Autowired
private KettleMysqlRepProperties kettleMysqlRepProperties;
/**
* 连接数据库资源库
*
* @return
* @throws KettleException
*/
public Boolean repositoryCon() {
try {
KettleEnvironment.init();
//EnvUtil.environmentInit();
DatabaseMeta dataMeta = new DatabaseMeta(kettleMysqlRepProperties.getName(), kettleMysqlRepProperties.getType(),
kettleMysqlRepProperties.getAccess(), kettleMysqlRepProperties.getHost(), kettleMysqlRepProperties.getDb(), kettleMysqlRepProperties.getPort(), kettleMysqlRepProperties.getUser(), kettleMysqlRepProperties.getPass());
//选择资源库
KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta();
repInfo.setConnection(dataMeta);
//数据库形式的资源库元对象
rep.init(repInfo);
rep.connect(KETTLE_MYSQL_USERNAME, KETTLE_MYSQL_PASSWORD);
} catch (KettleException e) {
throw new BusinessException("连接数据库资源库失败");
}
if (rep.isConnected()) {
log.info("连接成功");
return true;
} else {
log.info("连接失败");
return false;
}
}
/**
* 调度任务
* @param jobName
*/
// @Scheduled(cron = "0/5 * * * * *")
public void runJob(String jobName) {
//数据库资源库连接
Boolean aBoolean = this.repositoryCon();
if (!aBoolean) {
throw new BusinessException("连接数据库资源库失败");
}
try {
//根据指定的字符串路径 找到目录
RepositoryDirectoryInterface dir = rep.findDirectory("/");
//加载指定的job
JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null);
Job job = new Job(rep, jobMeta);
//设置参数
//jobMeta.setParameterValue("method", "update");
job.setLogLevel(LogLevel.ERROR);
//启动执行指定的job
job.run();
job.waitUntilFinished();//等待job执行完;
job.setFinished(true);
log.info(job.getResult().getLogText());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @param transName 要调用的trans名称
* 调用资源库中的trans
*/
// @Scheduled(cron = "0/5 * * * * *")
public void runTrans(String transName) {
//数据库资源库连接
Boolean aBoolean = this.repositoryCon();
if (!aBoolean) {
throw new BusinessException("连接数据库资源库失败");
}
try {
//根据指定的字符串路径 找到目录
RepositoryDirectoryInterface dir = rep.findDirectory("/");
TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null);
//设置参数
//tmeta.setParameterValue("", "");
Trans trans = new Trans(transMeta);
//执行trans
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
log.info("kettle转换失败");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
扩展知识-读取application.yml的属性值
package com.hgchain.neo4j.data.configure;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author bai
*/
@Component
@ConfigurationProperties(prefix = "kettle-repository")
@Data
public class KettleMysqlRepProperties {
private String name;
private String access;
private String type;
private String host;
private String db;
private String user;
private String pass;
private String port;
}

参考网站:https://blog.csdn.net/hubeilihao/article/details/28647721