  • Kudu Basic Operations and Concepts

    Kudu:
        A columnar storage manager developed for the Apache Hadoop platform.

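    Use cases:
        Hybrid workloads that need both random access and large batch scans.
        Compute-intensive workloads.
        Environments with high-performance storage devices, including larger amounts of memory.
        Workloads that update data in place, which avoids repeatedly migrating data.
        Real-time data replication and querying across geographic regions.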
        
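    Key mechanisms of Kudu:
    1. Modeled on a database: data is organized as two-dimensional tables, and a schema must be specified when a table is created. As a result, only structured data is supported.

    2. Each table has a primary key made up of one or more columns.

    3. INSERT/UPDATE/DELETE are supported, and every one of these mutations must specify the primary key.

    4. Reads are supported only through the scan primitive.

    5. Consistency model: by default Kudu provides snapshot consistency, which guarantees read-your-writes consistency for scans issued by a single client. Stronger guarantees are available by manually propagating timestamps between clients or by using commit-wait.

    6. The cluster has a simple master/slave structure similar to HBase (masters plus tablet servers), and the master can be replicated.

    7. Tables are partitioned horizontally, and the partitions are called tablets. Every row belongs to exactly one tablet. Range partitioning is supported, as are more flexible schemes such as hash partitioning and combinations of hash and range.

    8. Tablets are replicated with the Raft consensus protocol, and the number of replicas can be chosen to meet the SLA.

    9. Columnar storage.

    10. Delta flushes: updates are first applied in memory and later merged into the final on-disk storage by dedicated background tasks.

    11. Lazy materialization: for selective predicates, it lets Kudu skip reading a large amount of unnecessary data.

    12. Integration with MapReduce, Spark, Impala and others, with support for locality, columnar projection and predicate pushdown.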
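    Items 2, 7 and 8 can be made concrete with a small model of how a primary key lands in exactly one tablet when hash and range partitioning are combined. This is a toy sketch, not Kudu's implementation: the class `TabletRouter` is hypothetical, and `String.hashCode()` stands in for the hash function Kudu really applies to the encoded key. Range bounds are compared as strings, as they would be for a STRING key column such as the `key` column used in the Java code of this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Toy model (NOT Kudu's real routing code): a key is assigned to one hash bucket
// and one range, and that (bucket, range) pair identifies exactly one tablet.
class TabletRouter {
    private final int hashBuckets;
    // Sorted split points, e.g. ["100", "500"] -> ranges (-inf,"100"), ["100","500"), ["500",+inf)
    private final List<String> rangeSplits;

    TabletRouter(int hashBuckets, List<String> rangeSplits) {
        this.hashBuckets = hashBuckets;
        this.rangeSplits = new ArrayList<>(rangeSplits);
    }

    // Assumption: String.hashCode() replaces Kudu's real hash function
    int bucketOf(String key) {
        return Math.floorMod(Objects.hashCode(key), hashBuckets);
    }

    // Index of the range containing the key, using lexicographic (string) order
    int rangeOf(String key) {
        int idx = 0;
        while (idx < rangeSplits.size() && key.compareTo(rangeSplits.get(idx)) >= 0) {
            idx++;
        }
        return idx;
    }

    // Flatten (bucket, range) into a single tablet number
    int tabletOf(String key) {
        return bucketOf(key) * (rangeSplits.size() + 1) + rangeOf(key);
    }

    int tabletCount() {
        return hashBuckets * (rangeSplits.size() + 1);
    }

    public static void main(String[] args) {
        TabletRouter router = new TabletRouter(3, Arrays.asList("100", "500"));
        for (String key : Arrays.asList("1", "250", "998", "1000")) {
            System.out.println(key + " -> range " + router.rangeOf(key) + ", tablet " + router.tabletOf(key));
        }
    }
}
```

    Because each key maps to one (bucket, range) pair, this model has `hashBuckets * (splits + 1)` tablets and every row lives in exactly one of them. Note that string keys sort lexicographically, so "998" falls after "500" while "1000" falls before it.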


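    Notes:
    1. All tablet server (tserver) nodes must be alive when a table is created.
    2. Under Raft, up to (number of replicas - 1) / 2 replicas may be down and the cluster keeps working normally; otherwise an error is reported saying that ip:7050 cannot be found (7050 is the tablet server RPC port). Note that the first run must happen while the cluster is fully healthy, i.e. with all services started; only after that can up to (number of replicas - 1) / 2 replicas go down during operation.
    3. Reads can still run as long as at least one node is alive.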
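    Note 2 is the usual Raft majority rule: a tablet stays writable while a strict majority of its replicas is alive. The sketch below only does that arithmetic in plain Java; the class `RaftQuorum` is hypothetical and does not talk to Kudu.

```java
// Majority (quorum) arithmetic behind Raft replication; a standalone illustration.
class RaftQuorum {
    // Replicas that must be alive to form a strict majority
    static int majority(int replicas) {
        return replicas / 2 + 1;
    }

    // Replicas that may fail while a majority is still alive: (n - 1) / 2
    static int toleratedFailures(int replicas) {
        return (replicas - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " replicas: majority=" + majority(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

    With the single replica used in `CreateTab`, no failure can be tolerated, and going from 3 to 4 replicas still tolerates only one failure, which is why an odd replication factor is the usual choice.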



    Maven dependencies:

            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-spark2_2.11</artifactId>
                <version>1.7.0</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client</artifactId>
                <version>1.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client-tools</artifactId>
                <version>1.7.0</version>
            </dependency>





    Java code:

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.*;
    import org.apache.kudu.spark.kudu.KuduContext;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @Author:Xavier
     * @Date:2019-02-22 09:25
     **/
    
    
    public class KuduOption {
        // Kudu master address
        private static final String KUDU_MASTER = "nn02:7051";
    
        private static String tableName = "KuduTest";
    
        // Create the table
        @Test
        public void CreateTab() {
            // Create a connection to Kudu
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
    
            try {
                // Define the table schema; "key" is the primary key column
                List<ColumnSchema> columns = new ArrayList<>(2);
                columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
                columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
                Schema schema = new Schema(columns);
    
                // Holds all options supplied when creating the table
                CreateTableOptions options = new CreateTableOptions();
    
                // Replication and partitioning rules for the table
                List<String> rangeKeys = new ArrayList<>();
                rangeKeys.add("key");
    
                // One replica (enough for testing, but no failure is tolerated)
                options.setNumReplicas(1);
                // Range-partition on the rangeKeys columns
                options.setRangePartitionColumns(rangeKeys);
                // To also hash-partition "key" into 3 buckets, add this before createTable():
                // options.addHashPartitions(rangeKeys, 3);
                client.createTable(tableName, schema, options);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
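        // Insert new rows into the table
        @Test
        public void InsertData() {
            // Create a connection to Kudu
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
            try {
                // Open the table
                KuduTable table = client.openTable(tableName);
                // Create a write session; all writes in Kudu go through a session
                KuduSession session = client.newSession();
    
                // MANUAL_FLUSH: operations are buffered until flush() is called
                session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
                // At most 3000 operations may be buffered between flushes
                session.setMutationBufferSpace(3000);
    
                System.out.println("-------start--------" + System.currentTimeMillis());
    
                for (int i = 1; i < 6100; i++) {
                    Insert insert = table.newInsert();
                    // Set the column values
                    PartialRow row = insert.getRow();
                    row.addString("key", i + "");
                    row.addString(1, "value" + i);
                    session.apply(insert);
                    // Flush every 1000 rows so the buffer never exceeds its 3000-operation limit
                    if (i % 1000 == 0) {
                        session.flush();
                    }
                }
                // Flush the remaining rows, then close the session
                session.flush();
                session.close();
                System.out.println("-------end--------" + System.currentTimeMillis());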
            } catch (Exception e) {
                e.printStackTrace();
    
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
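        // Update an existing row, located by its primary key
        @Test
        public void kuduUpdateTest() {
            // Create a connection to Kudu
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
            try {
                KuduTable table = client.openTable(tableName);
                // Default flush mode is AUTO_FLUSH_SYNC: apply() sends the operation immediately
                KuduSession session = client.newSession();
    
                Update update = table.newUpdate();
                PartialRow row = update.getRow();
    
                // The primary key identifies the row; the other columns carry the new values
                row.addString("key", 998 + "");
                row.addString("value", "update Data " + 10);
                OperationResponse response = session.apply(update);
                if (response.hasRowError()) {
                    System.out.println(response.getRowError());
                }
                session.close();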
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
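        // Delete a row by its primary key
        @Test
        public void deleteData() {
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
            try {
                KuduTable table = client.openTable(tableName);
                KuduSession session = client.newSession();
    
                Delete delete = table.newDelete();
                PartialRow row = delete.getRow();
                // Only the primary key column is set for a delete
                row.addString("key", "992");
    
                session.apply(delete);
                session.close();
            } catch (KuduException e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }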
    
        // Scan the table
        @Test
        public void SearchData() {
            // Create a connection to Kudu
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
    
            try {
                KuduTable table = client.openTable(tableName);
    
                // Column projection: read only the "value" column
                List<String> projectColumns = new ArrayList<>(1);
                projectColumns.add("value");
                KuduScanner scanner = client.newScannerBuilder(table)
                        .setProjectedColumnNames(projectColumns)
                        .build();
                while (scanner.hasMoreRows()) {
                    RowResultIterator results = scanner.nextRows();
                    while (results.hasNext()) {
                        RowResult result = results.next();
                        // Index 0 is the first projected column, i.e. "value"
                        System.out.println(result.getString(0));
                    }
                }
                scanner.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        // Scan with a predicate
        @Test
        public void searchDataByCondition() {
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
    
            try {
                KuduTable table = client.openTable(tableName);
    
                KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(table);
    
                // Set the search condition
                KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
                        table.getSchema().getColumn("key"), // column the predicate applies to
                        KuduPredicate.ComparisonOp.EQUAL,    // comparison operator
                        "991");                              // value to compare against
                scannerBuilder.addPredicate(predicate);
    
                // Start scanning
                KuduScanner scanner = scannerBuilder.build();
                while (scanner.hasMoreRows()) {
                    RowResultIterator iterator = scanner.nextRows();
                    while (iterator.hasNext()) {
                        RowResult result = iterator.next();
                        // No projection is set, so all columns are returned and index 0 is "key"
                        System.out.println("Output: " + result.getString(0) + "--" + result.getString("value"));
                    }
                }
                scanner.close();
            } catch (KuduException e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        // Drop the table
        @Test
        public void DelTab() {
            KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
            try {
                client.deleteTable(tableName);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        // Read the Kudu table through Spark SQL
        @Test
        public void searchBysparkSql() {
            SparkSession sparkSession = getSparkSession();
            List<StructField> fields = Arrays.asList(
                    DataTypes.createStructField("key", DataTypes.StringType, true),
                    DataTypes.createStructField("value", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            Dataset<?> ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                    .schema(schema)
                    .option("kudu.master", KUDU_MASTER)
                    .option("kudu.table", tableName)
                    .load();
            // registerTempTable() is deprecated since Spark 2.0; createOrReplaceTempView() replaces it
            ds.createOrReplaceTempView("abc");
            sparkSession.sql("select * from abc").show();
        }
    
        @Test
        public void checkTableExistByKuduContext() {
            SparkSession sparkSession = getSparkSession();
            KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext());
            System.out.println(tableName + " is exist = " + context.tableExists(tableName));
        }
    
        public SparkSession getSparkSession() {
            SparkConf conf = new SparkConf().setAppName("test")
                    .setMaster("local[*]")
                    .set("spark.driver.userClassPathFirst", "true");
    
            conf.set("spark.sql.crossJoin.enabled", "true");
            SparkContext sparkContext = new SparkContext(conf);
            SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate();
            return sparkSession;
        }
    }
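    The write path in `InsertData` relies on MANUAL_FLUSH semantics: `apply()` only buffers an operation, `flush()` sends the buffer, and `setMutationBufferSpace()` caps how many operations may be pending. The following is a plain-Java simulation of those rules for a batched loop over the same 6,099 keys; it is not the Kudu client, and `ManualFlushModel` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Kudu client) of a session in MANUAL_FLUSH mode.
class ManualFlushModel {
    private final int bufferSpace;               // like setMutationBufferSpace(n)
    private final List<String> buffer = new ArrayList<>();
    private int sent = 0;                        // operations delivered by flush()

    ManualFlushModel(int bufferSpace) {
        this.bufferSpace = bufferSpace;
    }

    // Buffer one operation; fail if the buffer is already full
    void apply(String row) {
        if (buffer.size() >= bufferSpace) {
            throw new IllegalStateException("mutation buffer is full, flush first");
        }
        buffer.add(row);
    }

    // "Send" everything buffered so far
    void flush() {
        sent += buffer.size();
        buffer.clear();
    }

    int sent() { return sent; }
    int pending() { return buffer.size(); }

    // Apply rows 1..rows, flushing every batchSize rows and optionally once at the end
    static ManualFlushModel insertRows(int rows, int bufferSpace, int batchSize, boolean finalFlush) {
        ManualFlushModel session = new ManualFlushModel(bufferSpace);
        for (int i = 1; i <= rows; i++) {
            session.apply("key" + i);
            if (i % batchSize == 0) {
                session.flush();
            }
        }
        if (finalFlush) {
            session.flush();
        }
        return session;
    }

    public static void main(String[] args) {
        ManualFlushModel withFinal = insertRows(6099, 3000, 1000, true);
        ManualFlushModel withoutFinal = insertRows(6099, 3000, 1000, false);
        System.out.println("with final flush: sent=" + withFinal.sent() + ", pending=" + withFinal.pending());
        System.out.println("without final flush: sent=" + withoutFinal.sent() + ", pending=" + withoutFinal.pending());
    }
}
```

    In this model the last 99 rows are delivered only by the final flush, and a batch size larger than the buffer space fails on the first operation that overflows it. The real client also rejects `apply()` in MANUAL_FLUSH mode once the buffer is full, although its exception type differs from this sketch.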





  • Original article: https://www.cnblogs.com/xavier-xd/p/10417805.html