zoukankan      html  css  js  c++  java
  • spark操作kudu之DML操作

    Kudu支持许多DML类型的操作,其中一些操作已包含在Spark on Kudu集成中。

    包括:

    • INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着已插入的行可能会被再次插入。由于INSERT不允许插入已存在的行,这样做会导致失败。相反,我们鼓励使用下面描述的INSERT-IGNORE。

    • INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果行已存在,则忽略该行的插入动作。

    • DELETE - 从Kudu表中删除DataFrame中的行

    • UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。

    • UPDATE - 用DataFrame中的行更新Kudu表中已存在的行。

    Insert操作

    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.apache.kudu.spark.kudu._
    /**
      * Created by angel;
      *
      * Example: inserts the rows of a DataFrame into a Kudu table with
      * `KuduContext.insertRows`, then reads the table back and prints it.
      *
      * NOTE(review): `Customer` is not defined in this snippet; it is presumably a
      * case class such as `Customer(name: String, age: Int, city: String)` declared
      * elsewhere in the project -- confirm.
      */
    object Insert {
      /**
        * Entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          // Master URL and Spark tuning parameters.
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          // Was "spark.speculationfalse", which is not a Spark property, so the
          // setting never took effect.
          .set("spark.speculation", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        // A single SparkSession owns the SparkContext; no separate context is needed.
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        try {
          val sparkContext = spark.sparkContext

          // Connect to the Kudu cluster through its master addresses.
          val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
          val kuduContext = new KuduContext(kuduMasters, sparkContext)

          // Step 1: name of the target Kudu table.
          val kuduTableName = "spark_kudu_tbl"

          // Step 2: options used when reading the table back through Spark SQL.
          val kuduOptions: Map[String, String] = Map(
            "kudu.table"  -> kuduTableName,
            "kudu.master" -> kuduMasters)
          import spark.implicits._

          // Step 3: rows to insert.
          val customers = Array(
            Customer("jane", 30, "new york"),
            Customer("jordan", 18, "toronto"))

          // Steps 4-5: build an RDD from the rows and convert it to a DataFrame.
          val customersDF = sparkContext.parallelize(customers).toDF()

          // Step 6: insert the rows into the Kudu table.
          kuduContext.insertRows(customersDF, kuduTableName)

          // Step 7: read the table back and print its contents.
          spark.read.options(kuduOptions).kudu.show()
        } finally {
          // Release Spark resources even when a Kudu operation fails.
          spark.stop()
        }
      }
    }

    Delete操作

    import org.apache.kudu.spark.kudu._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    
    /**
      * Created by angel;
      *
      * Example: selects the keys of the rows to remove with a Spark SQL query and
      * deletes them from a Kudu table with `KuduContext.deleteRows`, then reads the
      * table back and prints it.
      *
      * NOTE(review): `Customer` is not defined in this snippet; it is presumably a
      * case class such as `Customer(name: String, age: Int, city: String)` declared
      * elsewhere in the project -- confirm.
      */
    object Delete {
      /**
        * Entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          // Master URL and Spark tuning parameters.
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          // Was "spark.speculationfalse", which is not a Spark property, so the
          // setting never took effect.
          .set("spark.speculation", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        // A single SparkSession owns the SparkContext; no separate context is needed.
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        try {
          val sparkContext = spark.sparkContext

          // Connect to the Kudu cluster through its master addresses.
          val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
          val kuduContext = new KuduContext(kuduMasters, sparkContext)

          // Step 1: name of the target Kudu table.
          val kuduTableName = "spark_kudu_tbl"

          // Step 2: options used when reading the table back through Spark SQL.
          val kuduOptions: Map[String, String] = Map(
            "kudu.table"  -> kuduTableName,
            "kudu.master" -> kuduMasters)
          import spark.implicits._

          // Step 3: candidate rows.
          val customers = Array(
            Customer("jane", 30, "new york"),
            Customer("jordan", 18, "toronto"))

          // Steps 4-5: build an RDD from the rows and convert it to a DataFrame.
          val customersDF = sparkContext.parallelize(customers).toDF()

          // Step 6: expose the DataFrame to Spark SQL. createOrReplaceTempView
          // replaces registerTempTable, which is deprecated since Spark 2.0.
          customersDF.createOrReplaceTempView("customers")

          // Step 7: keep only the column(s) identifying the rows to delete.
          // NOTE(review): assumes `name` is the Kudu table's primary key -- confirm.
          val deleteKeysDF = spark.sql("select name from customers where age > 20")

          // Step 8: delete the selected rows from the Kudu table.
          kuduContext.deleteRows(deleteKeysDF, kuduTableName)

          // Step 9: read the table back and print its contents.
          spark.read.options(kuduOptions).kudu.show()
        } finally {
          // Release Spark resources even when a Kudu operation fails.
          spark.stop()
        }
      }
    }

    Upsert操作

    如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。

    import org.apache.kudu.spark.kudu._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    
    /**
      * Created by angel;
      *
      * Example: upserts the rows of a DataFrame into a Kudu table with
      * `KuduContext.upsertRows`, then reads the table back and prints it.
      *
      * NOTE(review): `Customer` is not defined in this snippet; it is presumably a
      * case class such as `Customer(name: String, age: Int, city: String)` declared
      * elsewhere in the project -- confirm.
      */
    object Upsert {
      /**
        * Entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          // Master URL and Spark tuning parameters.
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          // Was "spark.speculationfalse", which is not a Spark property, so the
          // setting never took effect.
          .set("spark.speculation", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        // A single SparkSession owns the SparkContext; no separate context is needed.
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        try {
          val sparkContext = spark.sparkContext

          // Connect to the Kudu cluster through its master addresses.
          val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
          val kuduContext = new KuduContext(kuduMasters, sparkContext)

          // Step 1: name of the target Kudu table.
          val kuduTableName = "spark_kudu_tbl"

          // Step 2: options used when reading the table back through Spark SQL.
          val kuduOptions: Map[String, String] = Map(
            "kudu.table"  -> kuduTableName,
            "kudu.master" -> kuduMasters)
          import spark.implicits._

          // Step 3: a mix of new rows and rows with changed values.
          val newAndChangedCustomers = Array(
            Customer("michael", 25, "chicago"),
            Customer("denise" , 43, "winnipeg"),
            Customer("jordan" , 19, "toronto"))

          // Step 4: build an RDD from the rows and convert it to a DataFrame.
          val newAndChangedDF = sparkContext.parallelize(newAndChangedCustomers).toDF()

          // Step 5: upsert the rows into the Kudu table.
          kuduContext.upsertRows(newAndChangedDF, kuduTableName)

          // Step 6: read the table back and print its contents.
          spark.read.options(kuduOptions).kudu.show()
        } finally {
          // Release Spark resources even when a Kudu operation fails.
          spark.stop()
        }
      }
    }

    Update操作

    更新kudu行数据

    import org.apache.kudu.spark.kudu._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    
    /**
      * Created by angel;
      *
      * Example: updates rows of a Kudu table from a DataFrame with
      * `KuduContext.updateRows`, then reads the table back and prints it.
      *
      * NOTE(review): `Customer` is not defined in this snippet; it is presumably a
      * case class such as `Customer(name: String, age: Int, city: String)` declared
      * elsewhere in the project -- confirm.
      */
    object Update {
      /**
        * Entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("AcctfileProcess")
          // Master URL and Spark tuning parameters.
          .setMaster("local")
          .set("spark.worker.timeout", "500")
          .set("spark.cores.max", "10")
          .set("spark.rpc.askTimeout", "600s")
          .set("spark.network.timeout", "600s")
          .set("spark.task.maxFailures", "1")
          // Was "spark.speculationfalse", which is not a Spark property, so the
          // setting never took effect.
          .set("spark.speculation", "false")
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        // A single SparkSession owns the SparkContext; no separate context is needed.
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        try {
          val sparkContext = spark.sparkContext

          // Connect to the Kudu cluster through its master addresses.
          val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
          val kuduContext = new KuduContext(kuduMasters, sparkContext)

          // Step 1: name of the target Kudu table.
          val kuduTableName = "spark_kudu_tbl"

          // Step 2: options used when reading the table back through Spark SQL.
          val kuduOptions: Map[String, String] = Map(
            "kudu.table"  -> kuduTableName,
            "kudu.master" -> kuduMasters)

          // Step 3: rows carrying the modified values.
          val modifiedCustomers = Array(Customer("michael", 25, "toronto"))
          val modifiedCustomersRDD = sparkContext.parallelize(modifiedCustomers)

          // Step 4: convert the RDD to a DataFrame.
          import spark.implicits._
          val modifiedCustomersDF = modifiedCustomersRDD.toDF()

          // Step 5: apply the update to the Kudu table.
          kuduContext.updateRows(modifiedCustomersDF, kuduTableName)

          // Step 6: read the table back and print its contents.
          spark.read.options(kuduOptions).kudu.show()
        } finally {
          // Release Spark resources even when a Kudu operation fails.
          spark.stop()
        }
      }
    }
  • 相关阅读:
    BeautifulSoup中的select方法
    BeautifulSoup中的find,find_all
    python中sys.stdout、sys.stdin
    python sort、sorted
    Numpy常用操作
    pandas (loc、iloc、ix)的区别
    小波变化库——Pywalvets学习笔记
    python filter()函数
    Redis数据库总结
    高逼格企业级MySQL数据库备份方案,原来是这样....
  • 原文地址:https://www.cnblogs.com/niutao/p/10555302.html
Copyright © 2011-2022 走看看