参考官网:http://kudu.apache.org/docs/kudu_impala_integration.html
参考:https://my.oschina.net/weiqingbin/blog/189413#OSC_h2_8
参考:https://www.cloudera.com/documentation/enterprise/5-12-x/topics/impala_create_table.html
1、创建表
-- Kudu-backed table managed by Impala. PARTITION BY HASH with no column list hashes on the
-- primary key (id) into 3 partitions. The Java client below opens it as
-- "impala::default.my_first" (presumably created in the default database).
CREATE TABLE my_first (
  id           STRING,
  int_value    INT,
  bigint_value BIGINT,
  PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 3
STORED AS KUDU;
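需要注意 PARTITION BY HASH PARTITIONS 3 中的数字 3:实际测试中发现它似乎与物理机器数有关,不能大于实际的物理机器数。这一点很令人费解,作者还没有找到原因。一个可能的解释是:Kudu 要求同一个 tablet 的各个副本分布在不同的 tablet server 上,因此存活的 tablet server 数量必须不少于表的副本数(replication factor,默认为 3,可在建表时通过 TBLPROPERTIES('kudu.num_tablet_replicas'='1') 调整);而哈希分区数本身并不受机器数量限制。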
Java 测试代码
package main.java; import java.sql.Timestamp; import java.util.List; import java.util.UUID; import org.apache.kudu.client.*; /** * @Author:sks * @Description: * @Date:Created in 14:59 2018/3/29 * @Modified by: **/ import java.util.Date; public class test { private final static int mutationBufferSpace = 2000; public static void main(String[] args) { try { testInsert(); } catch (KuduException e) { e.printStackTrace(); } } public static void testInsert() throws KuduException { String master = "192.163.1.175:7051"; KuduSession session =null; // String master = "101.201.197.71:7051"; try { KuduClient client = new KuduClient.KuduClientBuilder(master) .build(); session = client.newSession(); String tableName = "impala::default.my_first"; KuduTable table = client.openTable(tableName); SessionConfiguration.FlushMode mode; Timestamp d1 = null; Timestamp d2 = null; long millis; long seconds; int recordCount = 1000; mode = SessionConfiguration.FlushMode.MANUAL_FLUSH; d1 = new Timestamp(System.currentTimeMillis()); insertManual(session, table, recordCount); d2 = new Timestamp(System.currentTimeMillis()); millis = d2.getTime() - d1.getTime(); seconds = millis / 1000 % 60; System.out.println(mode.name() + "耗时秒数:" + seconds); } catch (KuduException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { if (!session.isClosed()) { session.close(); } } } //仅支持手动flush的测试用例 public static void insertManual(KuduSession session, KuduTable table, int recordCount) throws Exception { SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH; session.setFlushMode(mode); //如果不设置,默认是1000 session.setMutationBufferSpace(mutationBufferSpace); int uncommit = 0; for (int i = 0; i < recordCount; i++) { System.out.println(i); Insert insert = table.newInsert(); PartialRow row = insert.getRow(); UUID uuid = UUID.randomUUID(); row.addString("id", uuid.toString()); row.addInt("int_value", i); row.addLong("bigint_value", 10000L); session.apply(insert); // 
对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交 // uncommit = uncommit + 1; // if (uncommit > OPERATION_BATCH / 2) { // session.flush(); // uncommit = 0; // } } // 对于手工提交, 保证完成最后的提交 if (uncommit > 0) { List<OperationResponse> lst = session.flush(); for(OperationResponse oor :lst){ System.out.println(oor.getRowError()); } } } //仅支持自动flush的测试用例 public static void insertInAutoSync(KuduSession session, KuduTable table, int recordCount) throws Exception { // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC // SessionConfiguration.FlushMode.MANUAL_FLUSH SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC; //如果不设置,默认是1000 session.setFlushMode(mode); for (int i = 0; i < recordCount; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); UUID uuid = UUID.randomUUID(); row.addString("id", uuid.toString()); row.addInt("int_value", 100); row.addLong("bigint_value", 10000L); session.apply(insert); } } }
Maven 配置
<dependencies>
    <!-- CSV / Excel handling (javacsv, Apache POI): not imported by the Kudu example above. -->
    <dependency>
        <groupId>net.sourceforge.javacsv</groupId>
        <artifactId>javacsv</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi</artifactId>
        <version>3.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi-ooxml</artifactId>
        <version>3.17</version>
    </dependency>
    <!-- Logging: SLF4J API bound to log4j 1.2; presumably provides a logging backend
         for kudu-client (the example code itself does not log). -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- Kudu Java client: the dependency the example imports directly (org.apache.kudu.client.*). -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.7.0</version>
    </dependency>
    <!-- JSON and Jersey REST artifacts: not imported by the Kudu example above.
         NOTE(review): confirm they are needed elsewhere before trimming. -->
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish.jersey.containers</groupId>
        <artifactId>jersey-container-servlet</artifactId>
        <version>2.9</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish.jersey.core</groupId>
        <artifactId>jersey-client</artifactId>
        <version>2.9</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish.jersey.media</groupId>
        <artifactId>jersey-media-json-jackson</artifactId>
        <version>2.9</version>
    </dependency>
</dependencies>