zoukankan      html  css  js  c++  java
  • Kettle系列:使用Kudu API插入数据到Kudu中

    本文详细介绍了在Kettle中使用 Kudu API将数据写入Kudu中, 从本文可以学习到:
    1. 如何编写一个简单的 Kettle 的 User Defined Java Class.
    2. 如何读取Kettle 每个记录的字段. 需要注意的是 getInteger() 返回的是Long 对象; 而获取 Timestamp 字段的方法是getDate().
    3. 如何调用Kudu API.

    本Kettle示例非常简单, Data Grid 组件定义一些sample data(包含多种数据类型), Java class将这些sample data写入kudu. 

    Kudu表schema:

    -- Impala DDL for the Kudu table written to by the Java class below.
    -- The primary key and hash partitioning use only columns declared in this
    -- table; the Java class populates `id` as the sole key column.
    CREATE TABLE kudu_testdb.perf_test_t1
    ( 
        id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
        int_value int,
        bigint_value bigint, 
        timestamp_value timestamp, 
        bool_value int,
        PRIMARY KEY (id)  
    ) 
    PARTITION BY HASH (id) PARTITIONS 2 
    STORED AS KUDU
    TBLPROPERTIES (
      'kudu.table_name' = 'testdb.perf_test_t1',
      'kudu.master_addresses' = '10.205.6.1:7051,10.205.6.2:7051,10.205.7.3:7051'
    );

    重点看Java class 代码:

    import java.sql.Timestamp;
    import java.util.UUID;
    import static java.lang.Math.toIntExact;
    
    import org.apache.kudu.client.Insert;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.client.SessionConfiguration;
    
    // Connection settings: Kudu master addresses and the table to write to.
    private static final String KUDU_SERVERS = "10.205.6.1:7051,10.205.6.2:7051,10.205.7.3:7051";
    private static final String KUDU_TABLE = "testdb.perf_test_t1";

    // Size handed to KuduSession.setMutationBufferSpace(); processRow flushes
    // well before this many operations are buffered.
    private static final int OPERATION_BATCH = 50;

    // Kudu handles, assigned in init() and used by processRow()/dispose().
    KuduClient client;
    KuduSession session;
    KuduTable table;
    SessionConfiguration.FlushMode mode;

    // Not referenced by the methods shown in this snippet.
    Integer recordCount;

    // Row bookkeeping for the read loop in processRow().
    private Object[] currentRow;
    private Object[] previousRow;
    
    
    /**
     * Reads every incoming Kettle row and inserts it into the Kudu table.
     *
     * <p>All rows are consumed inside a single invocation, so the method always
     * returns {@code false} (no further calls needed). Null input fields are
     * left unset on the Kudu row.
     *
     * @param smi step metadata supplied by Kettle (unused here)
     * @param sdi step data supplied by Kettle (unused here)
     * @return always {@code false}, because the input is fully drained
     * @throws KettleException if reading a row fails, if the Kudu client reports
     *         an error, or if Kudu rejects any individual row
     */
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
      if (first) {
        first = false;
      }

      currentRow = getRow();
      if (currentRow == null) {
        setOutputDone();
        return false;
      }

      // The original code shifts timestamps by +8h (UTC+8) before storing them.
      // NOTE(review): this bakes a fixed zone offset into the stored value — confirm
      // that is really what downstream readers expect.
      final long utcPlus8Millis = 8L * 3600L * 1000L;
      // With manual flushing the buffer must never fill up, so flush at half capacity.
      final int flushThreshold = Math.max(1, OPERATION_BATCH / 2);

      try {
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);

        int pending = 0;
        while (currentRow != null) {
          Insert insert = table.newInsert();
          PartialRow kuduRow = insert.getRow();

          // kettle string -> kudu string
          String idValue = get(Fields.In, "id").getString(currentRow);
          if (idValue != null) {
            kuduRow.addString("id", idValue);
          }

          // kettle int -> kudu int (Kettle hands integers back as Long;
          // toIntExact fails loudly instead of truncating on overflow)
          Long intValue = get(Fields.In, "int_value").getInteger(currentRow);
          if (intValue != null) {
            kuduRow.addInt("int_value", toIntExact(intValue.longValue()));
          }

          // kettle bigint -> kudu bigint
          Long bigintValue = get(Fields.In, "bigint_value").getInteger(currentRow);
          if (bigintValue != null) {
            kuduRow.addLong("bigint_value", bigintValue.longValue());
          }

          // kettle date/timestamp -> kudu timestamp (milliseconds scaled to microseconds)
          java.util.Date timestampValue = get(Fields.In, "timestamp_value").getDate(currentRow);
          if (timestampValue != null) {
            long shiftedMillis = timestampValue.getTime() + utcPlus8Millis;
            kuduRow.addLong("timestamp_value", shiftedMillis * 1000L);
          }

          // kettle boolean -> kudu int. The incoming Kettle field is named
          // "boolean_value"; the Kudu column is "bool_value" per the table DDL.
          Boolean boolValue = get(Fields.In, "boolean_value").getBoolean(currentRow);
          if (boolValue != null) {
            kuduRow.addInt("bool_value", boolValue.booleanValue() ? 1 : 0);
          }

          // Count the operation only after it has been buffered, so that the
          // counter always equals the number of unflushed operations and the
          // final flush below can never skip a trailing row.
          session.apply(insert);
          pending++;
          if (pending >= flushThreshold) {
            flushPending();
            pending = 0;
          }

          previousRow = currentRow;
          currentRow = getRow();
        }

        // Push whatever is still buffered after the last input row.
        if (pending > 0) {
          flushPending();
        }
      } catch (KettleException e) {
        throw e;
      } catch (Exception e) {
        // Wrap Kudu/runtime failures so the declared exception contract holds
        // and the original cause is preserved.
        throw new KettleException("Failed to write rows to Kudu table " + KUDU_TABLE, e);
      }

      // Input is exhausted: signal completion, same as the empty-input path above.
      setOutputDone();
      return false;
    }

    /**
     * Flushes the session's buffered operations and fails if Kudu reports a
     * row-level error for any of them (in manual flush mode such errors are
     * returned in the responses rather than thrown).
     */
    private void flushPending() throws Exception {
      java.util.List responses = session.flush();
      for (int i = 0; i < responses.size(); i++) {
        org.apache.kudu.client.OperationResponse response =
            (org.apache.kudu.client.OperationResponse) responses.get(i);
        if (response.hasRowError()) {
          throw new KettleException("Kudu rejected a row: " + response.getRowError());
        }
      }
    }
    
    /**
     * Initialises the step and opens the Kudu client, table and session.
     *
     * <p>The parent initialisation runs first so that no Kudu resources are
     * created when the step itself cannot start. If connecting fails, any
     * client that was already created is closed again before the error is
     * reported.
     *
     * @param stepMetaInterface step metadata supplied by Kettle
     * @param stepDataInterface step data supplied by Kettle
     * @return {@code true} when both the parent step and the Kudu connection
     *         were initialised, {@code false} when the parent initialisation failed
     * @throws IllegalStateException if the Kudu client, table or session cannot be opened
     */
    public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
      if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {
        return false;
      }

      try {
        mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        client = new KuduClient.KuduClientBuilder(KUDU_SERVERS).build();
        table = client.openTable(KUDU_TABLE);
        session = client.newSession();
        return true;
      } catch (Exception e) {
        // Release the half-initialised client so a failed start does not leak it.
        KuduClient failedClient = client;
        client = null;
        session = null;
        table = null;
        if (failedClient != null) {
          try {
            failedClient.close();
          } catch (Exception closeError) {
            // The connection failure is the primary error; keep the close failure attached to it.
            e.addSuppressed(closeError);
          }
        }
        throw new IllegalStateException(
            "Unable to open Kudu table " + KUDU_TABLE + " on " + KUDU_SERVERS, e);
      }
    }
    
    /**
     * Releases the Kudu session and client, then lets the parent step dispose itself.
     *
     * <p>Safe to call when initialisation never completed (the Kudu handles may
     * be {@code null}). The parent dispose always runs, even if closing a Kudu
     * resource fails; the first such failure is reported afterwards.
     *
     * @param smi step metadata supplied by Kettle
     * @param sdi step data supplied by Kettle
     * @throws IllegalStateException if closing the Kudu session or client failed
     */
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
      Exception failure = null;

      try {
        if (session != null && !session.isClosed()) {
          session.close();
        }
      } catch (Exception e) {
        failure = e;
      } finally {
        session = null;
      }

      // The client owns the connections to the Kudu cluster, so it must be
      // closed as well, not only the session.
      try {
        if (client != null) {
          client.close();
        }
      } catch (Exception e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      } finally {
        client = null;
        table = null;
      }

      parent.disposeImpl(smi, sdi);

      if (failure != null) {
        throw new IllegalStateException("Failed to release Kudu resources", failure);
      }
    }
     
  • 相关阅读:
    Redis 数据类型
    python的图形化界面
    python文件操作
    持续集成(Continuous Integration)
    MySQL理解索引、添加索引的原则
    Perl中的字符串操作函数
    PHP常用函数大全
    Javascript 中 Array的 sort()和 compare()方法
    使用Composer管理PHP依赖关系
    一致性 Hash 算法
  • 原文地址:https://www.cnblogs.com/harrychinese/p/kettle_insert_data_into_kudu.html
Copyright © 2011-2022 走看看