zoukankan      html  css  js  c++  java
  • 使用spark集成kudu做DDL

    spark对kudu表的创建

    定义kudu的表需要分成5个步骤:

    1:提供表名

    2:提供schema

    3:提供主键

    4:定义重要选项;例如:定义分区的schema

    5:调用create Table api

    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    import collection.JavaConverters._
    /**
      * Created by angel;
      */
    object CURD {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          //设置Master_IP并设置spark参数
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          .set("spark.speculationfalse", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkContext = SparkContext.getOrCreate(sparkConf)
        val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
        //使用spark创建kudu表
        val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)
        //TODO 1:定义表名
        val kuduTableName = "spark_kudu_tbl"
        //TODO 2:定义schema
        val schema = StructType(
            StructField("CompanyId", StringType, false) ::
            StructField("name", StringType, false) ::
            StructField("sex", StringType, true) ::
            StructField("age", IntegerType, true) :: Nil
        )
        ////TODO 3:定义表的主键
        val kuduTablePrimaryKey = Seq("CompanyId")
        //TODO 4:定义分区的schema
        val kuduTableOptions = new CreateTableOptions()
        kuduTableOptions.
          setRangePartitionColumns(List("name").asJava).
          setNumReplicas(3)
        //TODO 5:调用create Table api
        kuduContext.createTable(
          kuduTableName,schema,kuduTablePrimaryKey, kuduTableOptions)
      }
    }

    定义表时要注意的一个项目是Kudu表选项值。您会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对象;(要使“asJava”方法可用,请记住导入JavaConverters库。)

    创建表后,通过将浏览器指向http:// <master-hostname>:8051 / tables来查看Kudu主UI可以找到创建的表,通过单击表ID,能够看到表模式和分区信息。

    (点击Table id 可以观察到表的schema等信息)

    spark删除kudu表

    object DropTable {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          //设置Master_IP并设置spark参数
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          .set("spark.speculationfalse", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkContext = SparkContext.getOrCreate(sparkConf)
        val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
        //使用spark创建kudu表
        val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)
    
        // TODO 指定要删除的表名称
        var kuduTableName = "spark_kudu_tbl"
    
        // TODO 检查表如果存在,那么删除表
        if (kuduContext.tableExists(kuduTableName)) {
          kuduContext.deleteTable(kuduTableName)
        }
      }
    }
  • 相关阅读:
    【转】Fiddler 教程
    【转】java中三个类别加载器的关系以及各自加载的类的范围
    【转】HTTP协议详解
    【转】Google是如何做代码审查的?
    JSP页面的三种include方式
    forward和sendRedirect的差别
    cxf方式实现WebService的简单实例
    Pushlet简单入门实例
    Java学习(二十三):log4j日志打印
    PL/SQL相关问题解决办法汇总
  • 原文地址:https://www.cnblogs.com/niutao/p/10555247.html
Copyright © 2011-2022 走看看