Kudu:
针对 Apache Hadoop 平台而开发的列式存储管理器。
使用场景:
适用于那些既有随机访问,也有批量数据扫描的复合场景。
高计算量的场景。
使用了高性能的存储设备,包括使用更多的内存。
支持数据更新,避免数据反复迁移。
支持跨地域的实时数据备份和查询。
kudu的关键机制:
1.模仿数据库,以二维表的形式组织数据,创建表的时候需要指定schema。所以只支持结构化数据。
2.每个表指定一个或多个主键。
3.支持insert/update/delete,这些修改操作全部要指定主键。
4.read操作,只支持scan原语。
5.一致性模型,默认支持snapshot ,这个可以保证scan和单个客户端 read-you-writes一致性保证。更强的一致性保证,提供manually propagate timestamps between clients或者commit-wait。
6.cluster类似hbase简单的M-S结构,master支持备份。
7.单个表支持水平分割,partitions叫tablets,单行一定在一个tablets里面,支持范围,以及list等更灵活的分区键。
8.使用Raft 协议,可以根据SLA指定备份块数量。
9.列式存储
10.delta flushes,数据先更新到内存中,最后在合并到最终存储中,有专门到后台进程负责。
11.Lazy Materialization ,对一些选择性谓词,可以帮助跳过很多不必要的数据。
12.支持和MR/SPARK/IMPALA等集成,支持Locality ,Columnar Projection ,Predicate pushdown 等。
注意:
1、建表的时候要求所有的tserver节点都活着
2、根据raft机制,允许(replication的副本数-)/ 2宕掉,集群还会正常运行,否则会报错找不到ip:7050(7050是rpc的通信端口号),需要注意一个问题,第一次运行的时候要保证集群处于正常状态下,也就是所有的服务都启动,如果运行过程中,允许(replication的副本数-)/ 2宕掉
3、读操作,只要有一台活着的情况下,就可以运行
maven 依赖:
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark2_2.11</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>1.7.0</version> </dependency>
Java 代码:
import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; import org.apache.kudu.spark.kudu.KuduContext; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * @Author:Xavier * @Data:2019-02-22 09:25 **/ public class KuduOption { // master地址 private static final String KUDU_MASTER = "nn02:7051"; private static String tableName = "KuduTest"; //创建表 @Test public void CreateTab() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { // 设置表的schema(模式) List<ColumnSchema> columns = new ArrayList(2); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build()); Schema schema = new Schema(columns); //创建表时提供的所有选项 CreateTableOptions options = new CreateTableOptions(); // 设置表的replica备份和分区规则 List<String> rangeKeys = new ArrayList<>(); rangeKeys.add("key"); // 一个replica options.setNumReplicas(1); // 用列rangeKeys做为分区的参照 options.setRangePartitionColumns(rangeKeys); client.createTable(tableName, schema, options); // 添加key的hash分区 //options.addHashPartitions(parcols, 3); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //向表内插入新数据 @Test public void InsertData() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { // 打开表 KuduTable table = client.openTable(tableName); // 创建写session,kudu必须通过session写入 KuduSession session = client.newSession(); // 采取Flush方式 手动刷新 session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); session.setMutationBufferSpace(3000); System.out.println("-------start--------" + System.currentTimeMillis()); for (int i = 1; i < 6100; i++) { Insert insert = table.newInsert(); // 设置字段内容 PartialRow row = insert.getRow(); row.addString("key", i+""); row.addString(1, "value"+i); session.flush(); session.apply(insert); } System.out.println("-------end--------" + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //更新数据 @Test public void kuduUpdateTest() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table = client.openTable(tableName); KuduSession session = client.newSession(); Update update = table.newUpdate(); PartialRow row = update.getRow(); // row.addString("key", 998 + ""); row.addString("value", "updata Data " + 10); session.flush(); session.apply(update); // System.out.print(operationResponse.getRowError()); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //根据主键删除数据 @Test public void deleteData(){ KuduClient client=new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table=client.openTable(tableName); KuduSession session=client.newSession(); Delete delete=table.newDelete(); PartialRow row=delete.getRow(); row.addString("key","992"); session.apply(delete); } catch (KuduException e) { e.printStackTrace(); } } //扫描数据 @Test public void SearchData() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table = client.openTable(tableName); List<String> projectColumns = new ArrayList<>(1); projectColumns.add("value"); KuduScanner scanner = client.newScannerBuilder(table) .setProjectedColumnNames(projectColumns) .build(); while (scanner.hasMoreRows()) { RowResultIterator results = scanner.nextRows(); while (results.hasNext()) { RowResult result = results.next(); System.out.println(result.getString(0)); } } } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //条件扫描数据 @Test public void searchDataByCondition(){ KuduClient client =new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table=client.openTable(tableName); KuduScanner.KuduScannerBuilder scannerBuilder=client.newScannerBuilder(table); //设置搜索的条件 KuduPredicate predicate=KuduPredicate. newComparisonPredicate( table.getSchema().getColumn("key"),//设置要值的谓词(字段) KuduPredicate.ComparisonOp.EQUAL,//设置搜索逻辑 "991");//设置搜索条件值 scannerBuilder.addPredicate(predicate); // 开始扫描 KuduScanner scanner=scannerBuilder.build(); while(scanner.hasMoreRows()){ RowResultIterator iterator=scanner.nextRows(); while(iterator.hasNext()){ RowResult result=iterator.next(); System.out.println("输出: "+result.getString(0)+"--"+result.getString("value")); } } } catch (KuduException e) { e.printStackTrace(); } } //删除表 @Test public void DelTab() { KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { client.deleteTable(tableName); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } // @Test public void searchBysparkSql() { SparkSession sparkSession = getSparkSession(); List<StructField> fields = Arrays.asList( DataTypes.createStructField("key", DataTypes.StringType, true), DataTypes.createStructField("value", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu"). schema(schema).option("kudu.master", "nn02:7051").option("kudu.table", "KuduTest").load(); ds.registerTempTable("abc"); sparkSession.sql("select * from abc").show(); } @Test public void checkTableExistByKuduContext() { SparkSession sparkSession = getSparkSession(); KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext()); System.out.println(tableName + " is exist = " + context.tableExists(tableName)); } public SparkSession getSparkSession() { SparkConf conf = new SparkConf().setAppName("test") .setMaster("local[*]") .set("spark.driver.userClassPathFirst", "true"); conf.set("spark.sql.crossJoin.enabled", "true"); SparkContext sparkContext = new SparkContext(conf); SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate(); return sparkSession; } }