zoukankan      html  css  js  c++  java
  • Spark SQL External Data Sources JDBC官方实现写测试

    通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。

    jdbc.scala重要API介绍:

    /**
     * Save this RDD to a JDBC database at `url` under the table name `table`.
     * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
     * If you pass `true` for `allowExisting`, it will drop any table with the
     * given name; if you pass `false`, it will throw if the table already
     * exists.
     */
    def createJDBCTable(url: String, table: String, allowExisting: Boolean) 
    
    
    /**
     * Save this RDD to a JDBC database at `url` under the table name `table`.
     * Assumes the table already exists and has a compatible schema.  If you
     * pass `true` for `overwrite`, it will `TRUNCATE` the table before
     * performing the `INSERT`s.
     *
     * The table must already exist on the database.  It must have a schema
     * that is compatible with the schema of this RDD; inserting the rows of
     * the RDD in order via the simple statement
     * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
     */
    def insertIntoJDBC(url: String, table: String, overwrite: Boolean) 
    import org.apache.spark.sql.SQLContext  
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    val sqlContext  = new SQLContext(sc)
    import sqlContext._
    
    // 数据准备
    val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"
    
    val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
    val arr1x2 = Array[Row](Row.apply("fred", 3))
    val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)
    
    val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
    val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil) 
    
    import org.apache.spark.sql.jdbc._
    
    ================================CREATE======================================
    val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
    
    srdd.createJDBCTable(url, "person", false)
    sqlContext.jdbcRDD(url, "person").collect.foreach(println)
    [dave,42]
    [mary,222]
    
    ==============================CREATE with overwrite========================================
    val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
    srdd.createJDBCTable(url, "person2", false)
    sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
    [mary,222,2]
    [dave,42,1]
    
    val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
    srdd2.createJDBCTable(url, "person2", true)
    sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
    [fred,3]
    
    ================================CREATE then INSERT to append======================================
    val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
    val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
    srdd.createJDBCTable(url, "person3", false)
    sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
    [mary,222]
    [dave,42]
    
    srdd2.insertIntoJDBC(url, "person3", false)
    sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
    [mary,222]
    [dave,42]
    [fred,3]
    
    ================================CREATE then INSERT to truncate======================================
    val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
    val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
    
    srdd.createJDBCTable(url, "person4", false)
    sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
    [dave,42]
    [mary,222]
    
    srdd2.insertIntoJDBC(url, "person4", true)
    sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
    [fred,3]
    
    ================================Incompatible INSERT to append======================================
    val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
    val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
    srdd.createJDBCTable(url, "person5", false)
    srdd2.insertIntoJDBC(url, "person5", true)
        java.sql.SQLException: Column count doesn't match value count at row 1
  • 相关阅读:
    51degress.mobi与wurfl项目的对比
    低版本的51degrees.mobi 1.2.0.5 用UserAgent获取手机硬件型号,并升级最新的WURFL硬件包
    RedGate系列工具,开发必备
    VS中代码显示虚竖线,代码格式标记 Indent Guides
    asp.net下CKFinder IE9以下浏览器中上传图片文件时提示“无效文件名或文件夹名称”的解决方法
    让MySoft.Data也能有Discuz!NT的数据库查询分析工具
    恶意访问网站的策略处理,IP访问限制
    【转载】Asp.Net 全生命周期
    如何在解决方案中新建子网站,Discuz项目子网站技术
    博客园电子期刊2009年8月刊发布啦
  • 原文地址:https://www.cnblogs.com/luogankun/p/4275213.html
Copyright © 2011-2022 走看看