  • Kudu series: Java API usage and performance testing

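    Kudu+Impala is well suited to data analysis, but inserting data into a Kudu table directly with Impala INSERT ... VALUES statements is very slow: in my test, inserts ran at only about 80 rows/second. The reason is clear. Kudu itself writes efficiently, but Impala is not optimized for this path, and the overhead of executing each Impala statement is large, so frequent small-batch writes perform very poorly. Kudu officially recommends the Java API or the Python API for data ingestion. Below is a test case using the Java API, which also shows the general usage of the Kudu API.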
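    A rough cost model shows why per-statement overhead dominates. Assume each write call costs a fixed overhead plus a small per-row cost. Throughput is then batchSize / (overhead + batchSize x perRow). If each INSERT ... VALUES carries a single row, 80 rows/second implies roughly 1000 / 80 = 12.5 ms of overhead per statement. The sketch below is a simplified model, not a measurement. The class name `BatchCostModel` and any per-row cost you plug in are hypothetical.

```java
// Hypothetical back-of-the-envelope model: each write call pays a fixed
// overhead, and each row in the call pays an additional per-row cost.
final class BatchCostModel {
    private BatchCostModel() {}

    /**
     * Estimated rows/second when every call carries batchSize rows.
     *
     * @param batchSize  rows sent per call (1 for a single-row INSERT ... VALUES)
     * @param overheadMs fixed cost of one call, in milliseconds
     * @param perRowMs   marginal cost of one row, in milliseconds
     */
    static double rowsPerSecond(int batchSize, double overheadMs, double perRowMs) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        double msPerCall = overheadMs + batchSize * perRowMs;
        return batchSize * 1000.0 / msPerCall;
    }

    /** Per-call overhead (ms) implied by an observed single-row throughput. */
    static double impliedOverheadMs(double observedRowsPerSecond) {
        return 1000.0 / observedRowsPerSecond;
    }
}
```

    With the implied 12.5 ms overhead and zero per-row cost, a 500-row batch would amortize the overhead 500 times over. In practice the per-row cost caps the gain, which is why batched API writes are faster but not unboundedly so.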

    =========================
    Prepare the test table
    =========================

    -- kudu table
    CREATE TABLE kudu_testdb.tmp_test_perf
    (
    id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
    int_value int ,
    bigint_value bigint ,
    timestamp_value timestamp ,
    boolean_value int,
    PRIMARY KEY (id)
    )
    PARTITION BY HASH (id) PARTITIONS 6
    STORED AS KUDU
    TBLPROPERTIES (
    'kudu.table_name' = 'testdb.tmp_test_perf',
    'kudu.master_addresses' = '10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051',
    'kudu.num_tablet_replicas' = '1'
    )
    ;

    =========================
    Write the Java test program
    =========================
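    Notes on coding against the Kudu API:

    1. Although column names in the Impala DDL are case-insensitive, Kudu itself stores them in lowercase, so column names passed to the Kudu API must be lowercase.
    2. The table name in the Impala DDL keeps its original case (it is not converted to lowercase), and table names in the Kudu API are case-sensitive, so they must match the DDL exactly.
    3. The Kudu API's column setters do not accept null. Methods such as addLong() take primitive parameters, so passing a null wrapper object fails with a NullPointerException during auto-unboxing. It is best to check whether a value is null before assigning it. For example, the following two lines throw an error: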

    Long longTmp=null;
    row.addLong("bigint_value",longTmp);
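    In the Kudu Java client, `PartialRow.addLong(String, long)` takes a primitive `long`, so the failure above happens in Java before Kudu ever sees the value. `PartialRow.setNull(String)` is the way to store NULL, and only in a column that is nullable in the schema. The sketch below wraps the null check in a helper. To keep it runnable without the Kudu client on the classpath, it targets a hypothetical `LongColumnWriter` interface with the same two method shapes. In real code you would call the same methods on `PartialRow` directly. `NullSafeRows` and `RecordingWriter` are illustrative names, not Kudu classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two PartialRow methods used here, so the
// sketch compiles without the Kudu client on the classpath.
interface LongColumnWriter {
    void addLong(String columnName, long value);
    void setNull(String columnName);
}

final class NullSafeRows {
    private NullSafeRows() {}

    /** Writes value, or NULL when value is null, instead of unboxing null. */
    static void addLongOrNull(LongColumnWriter row, String columnName, Long value) {
        if (value == null) {
            row.setNull(columnName);          // the column must be nullable in the table schema
        } else {
            row.addLong(columnName, value);   // safe to unbox: value is non-null here
        }
    }
}

// Illustrative test double that records the calls it receives.
final class RecordingWriter implements LongColumnWriter {
    final List<String> calls = new ArrayList<>();

    @Override
    public void addLong(String columnName, long value) {
        calls.add("addLong " + columnName + "=" + value);
    }

    @Override
    public void setNull(String columnName) {
        calls.add("setNull " + columnName);
    }
}
```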

     
    package kudu_perf_test;
    
    import java.sql.Timestamp;
    import java.util.UUID;
    import org.apache.kudu.client.*;
    
    public class Test {
        private final static int OPERATION_BATCH = 500;
        
        // Test case that supports all three flush modes
        public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode,
                int recordCount) throws Exception {
            // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
            // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
            // SessionConfiguration.FlushMode.MANUAL_FLUSH
            session.setFlushMode(mode);
            if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
                session.setMutationBufferSpace(OPERATION_BATCH);
            }
            int uncommit = 0;
    
            for (int i = 0; i < recordCount; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                UUID uuid = UUID.randomUUID();
                row.addString("id", uuid.toString());
                row.addInt("int_value", 100);
                row.addLong("bigint_value", 10000L);
                
                
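                long gmtMillis;
                /* System.currentTimeMillis() returns milliseconds since 1970-01-01 00:00:00 GMT, whereas Kudu
                 timestamp columns store microseconds, so the value must be multiplied by 1000.
                 Also, since we are in UTC+8, 8 hours are added before storing; otherwise the value stored in
                 Kudu is GMT, which is 8 hours behind UTC+8 local time.
                 */

                // Method 1: current time as Unix milliseconds (GMT)
                gmtMillis = System.currentTimeMillis();

                // Method 2: convert a Timestamp to Unix milliseconds (GMT)
                Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
                gmtMillis = localTimestamp.getTime();

                // Shift GMT milliseconds to UTC+8 (Shanghai) milliseconds, then convert to microseconds
                long shanghaiTimezoneMillis = gmtMillis + 8 * 3600 * 1000;
                row.addLong("timestamp_value", shanghaiTimezoneMillis * 1000);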
                
                session.apply(insert);
    
                // In MANUAL_FLUSH mode, flush before the buffer fills up; here we flush once it is more than half full
                if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                    uncommit = uncommit + 1;
                    if (uncommit > OPERATION_BATCH / 2) {
                        session.flush();
                        uncommit = 0;
                    }
                }
            }
    
            // In MANUAL_FLUSH mode, make sure the final partial batch is flushed
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && uncommit > 0) {
                session.flush();
            }
    
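            // In AUTO_FLUSH_BACKGROUND mode, the final flush is still required, and pending errors must be checked so that failures surface as exceptions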
            if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
                session.flush();  
                RowErrorsAndOverflowStatus error = session.getPendingErrors();
                if (error.isOverflowed() || error.getRowErrors().length > 0) {
                    if (error.isOverflowed()) {
                        throw new Exception("Kudu overflow exception occurred.");
                    }
                    StringBuilder errorMessage = new StringBuilder();
                    if (error.getRowErrors().length > 0) {
                        for (RowError errorObj : error.getRowErrors()) {
                            errorMessage.append(errorObj.toString());
                            errorMessage.append(";");
                        }
                    }
                    throw new Exception(errorMessage.toString());
                }
            }
    
        }
    
        // Test case for MANUAL_FLUSH only
        public static void insertTestManual(KuduSession session, KuduTable table, int recordCount) throws Exception {
            // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
            // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
            // SessionConfiguration.FlushMode.MANUAL_FLUSH
            SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            session.setFlushMode(mode);
            session.setMutationBufferSpace(OPERATION_BATCH);
    
            int uncommit = 0;
            for (int i = 0; i < recordCount; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                UUID uuid = UUID.randomUUID();
                row.addString("id", uuid.toString());
                row.addInt("int_value", 100);
                row.addLong("bigint_value", 10000L);
                
                
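                long gmtMillis;
                /* System.currentTimeMillis() returns milliseconds since 1970-01-01 00:00:00 GMT, whereas Kudu
                 timestamp columns store microseconds, so the value must be multiplied by 1000.
                 Also, since we are in UTC+8, 8 hours are added before storing; otherwise the value stored in
                 Kudu is GMT, which is 8 hours behind UTC+8 local time.
                 */

                // Method 1: current time as Unix milliseconds (GMT)
                gmtMillis = System.currentTimeMillis();

                // Method 2: convert a Timestamp to Unix milliseconds (GMT)
                Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
                gmtMillis = localTimestamp.getTime();

                // Shift GMT milliseconds to UTC+8 (Shanghai) milliseconds, then convert to microseconds
                long shanghaiTimezoneMillis = gmtMillis + 8 * 3600 * 1000;
                row.addLong("timestamp_value", shanghaiTimezoneMillis * 1000);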
    
                session.apply(insert);
                
                // In MANUAL_FLUSH mode, flush before the buffer fills up; here we flush once it is more than half full
                uncommit = uncommit + 1;
                if (uncommit > OPERATION_BATCH / 2) {
                    session.flush();
                    uncommit = 0;
                }
            }
    
            // In MANUAL_FLUSH mode, make sure the final partial batch is flushed
            if (uncommit > 0) {
                session.flush();
            }
        }
       
        // Test case for AUTO_FLUSH_SYNC only
        public static void insertTestInAutoSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
            // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
            // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
            // SessionConfiguration.FlushMode.MANUAL_FLUSH
            SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            session.setFlushMode(mode);        
    
            for (int i = 0; i < recordCount; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                UUID uuid = UUID.randomUUID();
                row.addString("id", uuid.toString());
                row.addInt("int_value", 100);
                row.addLong("bigint_value", 10000L);
                
                
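                long gmtMillis;
                /* System.currentTimeMillis() returns milliseconds since 1970-01-01 00:00:00 GMT, whereas Kudu
                 timestamp columns store microseconds, so the value must be multiplied by 1000.
                 Also, since we are in UTC+8, 8 hours are added before storing; otherwise the value stored in
                 Kudu is GMT, which is 8 hours behind UTC+8 local time.
                 */

                // Method 1: current time as Unix milliseconds (GMT)
                gmtMillis = System.currentTimeMillis();

                // Method 2: convert a Timestamp to Unix milliseconds (GMT)
                Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
                gmtMillis = localTimestamp.getTime();

                // Shift GMT milliseconds to UTC+8 (Shanghai) milliseconds, then convert to microseconds
                long shanghaiTimezoneMillis = gmtMillis + 8 * 3600 * 1000;
                row.addLong("timestamp_value", shanghaiTimezoneMillis * 1000);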
                
                // In AUTO_FLUSH_SYNC mode, apply() returns only after the write to Kudu has completed
                session.apply(insert);
            }
        }
    
        public static void test() throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051")
                    .build();
            KuduSession session = client.newSession();
            KuduTable table = client.openTable("testdb.tmp_test_perf");
    
            SessionConfiguration.FlushMode mode;
            Timestamp d1 = null;
            Timestamp d2 = null;
            long millis;
            long seconds;
            int recordCount = 100000; // rows inserted per flush mode (example value; 0 would insert nothing)
    
            try {
                mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
                d1 = new Timestamp(System.currentTimeMillis());
                insertTestGeneric(session, table, mode, recordCount);
                d2 = new Timestamp(System.currentTimeMillis());
                millis = d2.getTime() - d1.getTime();
                seconds = millis / 1000; // total elapsed seconds
                System.out.println(mode.name() + " elapsed seconds: " + seconds);
    
                mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;            
                d1 = new Timestamp(System.currentTimeMillis());
                insertTestInAutoSync(session, table,  recordCount);
                d2 = new Timestamp(System.currentTimeMillis());
                millis = d2.getTime() - d1.getTime();
                seconds = millis / 1000; // total elapsed seconds
                System.out.println(mode.name() + " elapsed seconds: " + seconds);
                
                mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;                    
                d1 = new Timestamp(System.currentTimeMillis());
                insertTestManual(session, table,  recordCount);
                d2 = new Timestamp(System.currentTimeMillis());
                millis = d2.getTime() - d1.getTime();
                seconds = millis / 1000; // total elapsed seconds
                System.out.println(mode.name() + " elapsed seconds: " + seconds);
                
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (!session.isClosed()) {
                    session.close();
                }
                client.close(); // release the client's connections and background threads
            }
    
        }
    
        public static void main(String[] args) {
            try {
                test();
            } catch (KuduException e) {
                e.printStackTrace();
            }
            System.out.println("Done");
    
        }
    }
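    The timestamp handling in the test program converts milliseconds to the microseconds that Kudu timestamp columns store. It also adds a fixed 8 hours so that the stored value reads as UTC+8 wall-clock time rather than GMT. The sketch below does the same arithmetic with java.time, assuming the Asia/Shanghai zone. `KuduTimestamps` and its methods are hypothetical names. Looking the offset up from the zone, instead of hard-coding 8 hours, keeps the code correct if it is reused for a zone that observes daylight saving time.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the timestamp arithmetic in the test program.
final class KuduTimestamps {
    private KuduTimestamps() {}

    /** Unix epoch milliseconds (UTC) -> microseconds, with no zone shift. */
    static long utcMicros(long epochMillis) {
        return TimeUnit.MILLISECONDS.toMicros(epochMillis);
    }

    /**
     * Adds the zone's UTC offset in effect at that instant, then converts to
     * microseconds, so the stored value reads as local wall-clock time. For
     * Asia/Shanghai this matches the "+ 8 hours, * 1000" in the test program.
     */
    static long localWallClockMicros(long epochMillis, ZoneId zone) {
        ZoneOffset offset = zone.getRules().getOffset(Instant.ofEpochMilli(epochMillis));
        long shiftedMillis = epochMillis + offset.getTotalSeconds() * 1000L;
        return TimeUnit.MILLISECONDS.toMicros(shiftedMillis);
    }
}
```

    Usage inside the insert loop would be `row.addLong("timestamp_value", KuduTimestamps.localWallClockMicros(System.currentTimeMillis(), ZoneId.of("Asia/Shanghai")));`.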

    =========================
    Performance test results
    =========================
    MANUAL_FLUSH mode: 8,000 rows/second
    AUTO_FLUSH_BACKGROUND mode: 8,000 rows/second
    AUTO_FLUSH_SYNC mode: 1,000 rows/second
    Impala SQL INSERT statement: 80 rows/second
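    The figures above are rows per second. One way to derive such a figure is to divide the row count by the full elapsed time. System.nanoTime() is intended for measuring intervals, whereas wall-clock time can jump. The sketch below is a minimal version of that calculation. `Throughput` and its nested `Task` interface are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: turns a measured interval into a rows/second figure.
final class Throughput {
    private Throughput() {}

    /** A task that may throw, e.g. a call to one of the insertTest* methods. */
    @FunctionalInterface
    interface Task {
        void run() throws Exception;
    }

    /** Rows per second for rowCount rows written in elapsedNanos nanoseconds. */
    static double rowsPerSecond(long rowCount, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return rowCount * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }

    /** Runs task, timing it with the monotonic System.nanoTime() clock. */
    static double measure(long rowCount, Task task) throws Exception {
        long start = System.nanoTime();
        task.run();
        long elapsed = Math.max(System.nanoTime() - start, 1L); // avoid dividing by zero
        return rowsPerSecond(rowCount, elapsed);
    }
}
```

    Inside the try block of test(), this could be used as `Throughput.measure(recordCount, () -> insertTestManual(session, table, recordCount))`.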

    =========================
    Kudu API usage summary
    =========================
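    1. Prefer MANUAL_FLUSH where possible: it has the best performance and the control flow is explicit. flush() can throw a KuduException, but per-row write errors (for example, a duplicate primary key) do not raise an exception. They are reported in the List<OperationResponse> that flush() returns and must be checked with hasRowError().
    2. When performance requirements are modest, AUTO_FLUSH_SYNC is also a good choice.
    3. Use AUTO_FLUSH_BACKGROUND only in demos. If exception handling is ignored the code is very simple and performance is good, but it is not recommended in production: rows may be written out of order, and once error handling is added the code becomes cumbersome.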
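    The sketch below shows the MANUAL_FLUSH pattern recommended in point 1. It buffers operations, flushes every `flushEvery` operations and once more at the end, and inspects every response for per-row errors. In the Kudu Java client, flush() returns a List<OperationResponse>, and each response exposes hasRowError() and getRowError(). To keep the sketch self-contained, `BatchSession` and `WriteResult` are hypothetical interfaces standing in for KuduSession and OperationResponse. They are simplified: no checked KuduException, and the row error is a String. `InMemorySession` is a toy that reports duplicate keys as row errors, for trying the pattern without a cluster.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for OperationResponse and KuduSession.
interface WriteResult {
    boolean hasRowError();
    String rowError();
}

interface BatchSession<Op> {
    void apply(Op op);                       // buffer one operation
    List<? extends WriteResult> flush();     // send the buffer, one result per op
}

final class ManualFlushWriter {
    private ManualFlushWriter() {}

    /** Applies all ops, flushing every flushEvery ops and once at the end; returns row errors. */
    static <Op> List<String> writeAll(BatchSession<Op> session, List<Op> ops, int flushEvery) {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be positive");
        }
        List<String> rowErrors = new ArrayList<>();
        int pending = 0;
        for (Op op : ops) {
            session.apply(op);
            if (++pending >= flushEvery) {
                collectErrors(session.flush(), rowErrors);
                pending = 0;
            }
        }
        if (pending > 0) {
            collectErrors(session.flush(), rowErrors); // final partial batch
        }
        return rowErrors;
    }

    private static void collectErrors(List<? extends WriteResult> results, List<String> sink) {
        for (WriteResult r : results) {
            if (r.hasRowError()) {
                sink.add(r.rowError());
            }
        }
    }
}

// Toy session: ops are String keys; re-inserting a stored key yields a row error.
final class InMemorySession implements BatchSession<String> {
    private final Set<String> stored = new HashSet<>();
    private final List<String> buffer = new ArrayList<>();
    int flushCount = 0;

    @Override
    public void apply(String key) {
        buffer.add(key);
    }

    @Override
    public List<WriteResult> flush() {
        flushCount++;
        List<WriteResult> results = new ArrayList<>();
        for (String key : buffer) {
            final boolean inserted = stored.add(key);
            results.add(new WriteResult() {
                @Override public boolean hasRowError() { return !inserted; }
                @Override public String rowError() { return inserted ? null : "duplicate key: " + key; }
            });
        }
        buffer.clear();
        return results;
    }
}
```

    With the test program's mutation buffer of 500 operations, a `flushEvery` of 250 keeps each flush well inside the buffer, as the original "flush at half full" rule intends.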

  • Original article: https://www.cnblogs.com/harrychinese/p/kudu_java_api.html