zoukankan      html  css  js  c++  java
  • KETTLE封装

    一、背景  

      每天好像都很忙碌,已经不记得上次写博客是什么时候,这两天公司项目一个特殊的组网环境,需要重新搭建了KETTLE的封装,实现java调用ETL完成数据抽取转换加载的一些功能,搞了这么这么久的ETL,没想到今天掉坑里了,浪费了小半天的时间,以前觉得写博客要记录一些重大突破的或者高频的技术点,经历了这次,决定把所有遇到的问题都记录一下,顺道分享一下技术实现。

    二、问题

          1、kettle的java开发包里没有添加pom依赖,需要手动添加和组合,这就回到了最原始的项目开发模式,那问题也很突出了,jar包冲突或者jar包引入不合适

          2、kettle的java代码组件的适用

    三、处理过程

          要搭建kettle的java运行环境首先要引入三个包:kettle-core、kettle-engine、kettle-dbdialog,maven引入方式如下: 

     1         <dependency>
     2             <groupId>pentaho-kettle</groupId>
     3             <artifactId>kettle-core</artifactId>
     4             <version>${kettle.version}</version>
     5         </dependency>
     6 
     7         <dependency>
     8             <groupId>pentaho-kettle</groupId>
     9             <artifactId>kettle-engine</artifactId>
    10             <version>${kettle.version}</version>
    11         </dependency>
    12 
    13         <dependency>
    14             <groupId>pentaho-kettle</groupId>
    15             <artifactId>kettle-dbdialog</artifactId>
    16             <version>${kettle.version}</version>
    17         </dependency>
    View Code

         然后封装ETL的执行代码:

     1 public class KettleUtil {
     2     static {
     3         try {
     4             // 初始化
     5             KettleEnvironment.init();
     6         } catch (Exception e) {
     7             e.printStackTrace();
     8             throw new BusinessBaseException(new BusinessExceptionDesc("etl:100001",BusinessExceptionDesc.SHOW_TYPE.ERROR,"ETL执行异常"));
     9         }
    10     }
    11     public static String runTransfer(Map<String,String> map, String ktrPath) {
    12         Trans trans = null;
    13         String json = null;
    14         try {
    15             // 转换元对象
    16             TransMeta transMeta = new TransMeta(ktrPath);
    17 
    18             // 转换
    19             trans = new Trans(transMeta);
    20 
    21             if(null != map){
    22                 for (Map.Entry<String,String> entry : map.entrySet()){
    23                     trans.setVariable(entry.getKey(),entry.getValue());
    24                 }
    25             }
    26 
    27             // 执行转换
    28             trans.execute(null);
    29 
    30             // 记录最后一个步骤的数据
    31             final List<RowMetaAndData> rows = new ArrayList<RowMetaAndData>();
    32             RowListener rowListner = new RowListener() {
    33                 public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row)
    34                         throws KettleStepException {
    35                     rows.add(new RowMetaAndData(rowMeta, row));
    36                 }
    37 
    38                 public void rowReadEvent(RowMetaInterface arg0, Object[] arg1)
    39                         throws KettleStepException {
    40                 }
    41 
    42                 public void errorRowWrittenEvent(RowMetaInterface arg0,
    43                                                  Object[] arg1) throws KettleStepException {
    44                 }
    45             };
    46 
    47             //获取ETL步数
    48             List<StepMetaDataCombi> steps = trans.getSteps();
    49 
    50             //最后一个步骤名称
    51             String stepname = steps.get(steps.size() - 1).stepname;
    52 
    53             //添加行监听
    54             StepInterface stepInterface = trans.findRunThread(stepname);
    55             stepInterface.addRowListener(rowListner);
    56 
    57             // 等待执行完毕
    58             trans.waitUntilFinished();
    59 
    60             if(!rows.isEmpty()){
    61                 if(rows.size() > 1){
    62                     StringBuffer resultBuf = new StringBuffer();
    63                     for(RowMetaAndData rmad : rows){
    64                         String rowData = rmad.getString("resultData",null);
    65                         if(null != rowData){
    66                             resultBuf.append(rowData).append(",");
    67                         }
    68                     }
    69                     if(resultBuf.length() > 0){
    70                         resultBuf.deleteCharAt(resultBuf.length() - 1);
    71                         json = "["+ resultBuf +"]";
    72                     }
    73                 }else if(rows.size() == 1){
    74                     RowMetaAndData rmad = rows.get(0);
    75                     json = rmad.getString("resultData", null);
    76                 }
    77             }
    78 
    79             // 抛出异常
    80             if (trans.getErrors() > 0) {
    81                 String message = KettleLogStore.getAppender().getBuffer(trans.getLogChannelId(), false).toString();
    82                 throw new BusinessBaseException(new BusinessExceptionDesc("etl:100003",BusinessExceptionDesc.SHOW_TYPE.ERROR,message));
    83             }
    84             return json;
    85         } catch (KettleException e) {
    86             e.printStackTrace();
    87             throw new BusinessBaseException(new BusinessExceptionDesc("etl:100002",BusinessExceptionDesc.SHOW_TYPE.ERROR,"ETL执行异常"));
    88         }
    89     }
    90 }
    View Code

            这个时候启动项目是没有什么问题,当我们用一个单元测试的方式测试时,会发现有好多的类都找不到,很明显的错误就是class not found,按照缺失的类名可以去https://mvnrepository.com/搜索或者去KETTLE的安装目录下的lib面搜索都可以解决问题:

    1 nested exception is java.lang.NoClassDefFoundError: com/google/common/util/concurrent/SettableFuture
    2 java.lang.NoClassDefFoundError: org/mozilla/javascript/JavaScriptException
    View Code
     1 <dependency>
     2             <groupId>com.google.guava</groupId>
     3             <artifactId>guava</artifactId>
     4             <version>18.0</version>
     5         </dependency>
     6 
     7         <dependency>
     8             <groupId>org.apache.commons</groupId>
     9             <artifactId>commons-vfs2</artifactId>
    10             <version>2.1</version>
    11         </dependency>
    12 
    13         <dependency>
    14             <groupId>commons-lang</groupId>
    15             <artifactId>commons-lang</artifactId>
    16             <version>2.6</version>
    17         </dependency>
    18 
    19         <dependency>
    20             <groupId>com.sun.mail</groupId>
    21             <artifactId>javax.mail</artifactId>
    22             <version>1.6.2</version>
    23         </dependency>
    24 
    25         <dependency>
    26             <groupId>com.cite</groupId>
    27             <artifactId>jsonpath</artifactId>
    28             <version>1.0</version>
    29         </dependency>
    30 
    31         <dependency>
    32             <groupId>com.googlecode.json-simple</groupId>
    33             <artifactId>json-simple</artifactId>
    34             <version>1.1</version>
    35         </dependency>
    View Code 

           其中有一个特别的类处理时有个陷阱,JavaScript组件调用时需要,会有一个异常java.lang.NoClassDefFoundError: org/mozilla/javascript/JavaScriptException,从错误很明显可以知道是缺少jar包,两种解决途径确定是那个包,在kettle的安装路径搜索javascript,会发现存在的包并没有包含JavaScriptException类,然后可以去maven仓库搜索,找到的结果是,下面这个包:

    1 dependency>
    2      <groupId>org.mozilla</groupId>
    3       <artifactId>javascript</artifactId>
    4       <version>1.7.2</version>
    5 </dependency>
    View Code

          重新构建项目,然后测试,可以发现ETL是可以执行了的,这里就是坑所在了,看似好了,其实并没有真的好了,在JavaScript组件中如果有JSON和String的转换操作时,代码就会报错:

     1 Caused by: org.mozilla.javascript.EcmaError: ReferenceError: "JSON" is not defined. (script#4)
     2     at org.mozilla.javascript.ScriptRuntime.constructError(ScriptRuntime.java:3654)
     3     at org.mozilla.javascript.ScriptRuntime.constructError(ScriptRuntime.java:3632)
     4     at org.mozilla.javascript.ScriptRuntime.notFoundError(ScriptRuntime.java:3717)
     5     at org.mozilla.javascript.ScriptRuntime.name(ScriptRuntime.java:1692)
     6     at org.mozilla.javascript.gen.c1._c0(script:4)
     7     at org.mozilla.javascript.gen.c1.call(script)
     8     at org.mozilla.javascript.ContextFactory.doTopCall(ContextFactory.java:398)
     9     at org.mozilla.javascript.ScriptRuntime.doTopCall(ScriptRuntime.java:3065)
    10     at org.mozilla.javascript.gen.c1.call(script)
    11     at org.mozilla.javascript.gen.c1.exec(script)
    12     at org.pentaho.di.trans.steps.scriptvalues_mod.ScriptValuesMod.addValues(ScriptValuesMod.java:387)
    View Code

            是不是很无解,ETL在kettle工具中是可以执行的,代码中调用就会报错,其实问题还是出自JavaScript包引入有问题,再次排查kettle安装路径会发现,lib下面有一个js-1.7R3.jar的,命名真的很出奇啊,然后再maven中央仓库中搜索发现并没有这个版本的,本地长传引入重新执行OK了!

    四、java代码组件使用分享

         组件的使用就贴一段数据解密的代码吧,大家自己体会,但有一点要注意哈,组件中使用的类全部需要import,缺失任何一个都会有下面类似的错误:

           java代码组件如下:

     1 import java.security.Key;
     2 import java.security.KeyFactory;
     3 import java.security.PrivateKey;
     4 import java.security.interfaces.RSAKey;
     5 import java.security.interfaces.RSAPrivateKey;
     6 import java.security.interfaces.RSAPublicKey;
     7 import java.security.spec.PKCS8EncodedKeySpec;
     8 import java.security.spec.X509EncodedKeySpec;
     9 import java.util.HashMap;
    10 import java.util.Map;
    11 import java.util.Base64;
    12 import java.io.ByteArrayOutputStream;
    13 import sun.misc.BASE64Decoder;
    14 import javax.crypto.Cipher;
    15 String resultData = null;
    16 public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    17 {
    18     if (first){
    19       first = false;
    20     }
    21     Object[] r = getRow();
    22     if (r == null) {
    23       setOutputDone();
    24       return false;
    25     }
    26 
    27 
    28     r = createOutputRow(r, data.outputRowMeta.size());
    29     String encryptionData = get(Fields.In, "encryptionData").getString(r);
    30     String key = get(Fields.In, "keyPrivate").getString(r);
    31     try {
    32          byte[] keyBytes = (new BASE64Decoder()).decodeBuffer(key);
    33         PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(keyBytes);
    34         KeyFactory keyFactory = KeyFactory.getInstance("RSA");
    35         PrivateKey privateKey = keyFactory.generatePrivate(keySpec);
    36 
    37         resultData = decryptByPrivateKey(encryptionData,privateKey);
    38     } catch (Exception e) {
    39         throw new KettleException(e);
    40     }
    41 
    42 
    43     get(Fields.Out, "resultData").setValue(r, resultData);
    44     putRow(data.outputRowMeta, r);
    45     return true;
    46 }
    47 public static String decryptByPrivateKey(String data, Key privateKey) {
    48     try {
    49          data = data.replaceAll("%2B", "+");
    50          data = data.replaceAll("%2F", "/");
    51          data = data.replaceAll("%25", "%");
    52          data = data.replaceAll("%3D", "=");
    53          byte[] bytes = Base64.getUrlDecoder().decode(data);
    54          return new String(decryptByPrivateKey(bytes, privateKey));
    55      } catch (Exception e) {
    56          throw new KettleException(e);
    57      }
    58 }
    59 
    60 public static byte[] decryptByPrivateKey(byte[] data, Key privateKey) throws Exception {
    61      try {
    62           Cipher cipher_d = Cipher.getInstance("RSA");
    63           cipher_d.init(Cipher.DECRYPT_MODE, privateKey);
    64 
    65           //模长
    66           int key_len = ((RSAKey) privateKey).getModulus().bitLength() / 8;
    67 
    68           ByteArrayOutputStream bout = new ByteArrayOutputStream(cipher_d.getOutputSize(data.length)); //输出缓冲区
    69 
    70           //分块加密
    71           for (int i = 0; i < (data.length + key_len - 1) / key_len; i++) {
    72               bout.write(cipher_d.doFinal(data, i * key_len,
    73               Math.min(key_len,data.length - (i * key_len))));
    74           }
    75           return bout.toByteArray();
    76       } catch (Exception e) {
    77           throw new Exception("解密失败", e);
    78       }
    79 }
    View Code
  • 相关阅读:
    8 Range 对象
    7 Worksheet 对象
    6 Workbook 对象
    5 Application 对象
    Windows路径
    windows 下操作目录(使用DOS命令)
    Windows 批处理
    6 WPF控件
    Lexer and parser generators (ocamllex, ocamlyacc)
    4.9 Parser Generators
  • 原文地址:https://www.cnblogs.com/cymiao/p/12559397.html
Copyright © 2011-2022 走看看