zoukankan      html  css  js  c++  java
  • Spark通过修改DataFrame的schema给表字段添加注释(转载)

    转载自:https://www.jianshu.com/p/e4c90dc08935

    1、需求背景

    通过Spark将关系型数据库(以Oracle为例)的表同步的Hive表,要求用Spark建表,有字段注释的也要加上注释。Spark建表,有两种方法:

    • 用Spark Sql,在程序里组建表语句,然后用Spark.sql("建表语句")建表,这种方法麻烦的地方在于你要读取Oracle表的详细的表结构信息,且要进行Oracle和Hive的字段类型进行一一对应
    • 用DataFrame 的saveAsTable方法,这种方法如果对应的数据库里没有表,则Spark会根据DataFrame的schema自动建表,比较简单,不用考虑字段类型匹配转化问题,但是这种方法有一个问题,Spark读取Oracle的表为DataFrame时,并不能将表字段的注释读进来,所以就有了如标题所示的需求。(一开始以为DataFrame不能加注释,经过研究,发现是可以的!)

    2、如何查看DataFrame是否有注释

    前面讲到DataFrame里没有Oracle的注释信息,但是如果数据源为Hive的话,是可以将注释获取到的。

    2.1 新建Hive测试表(带注释)

    create table `test` (
    `id` string comment 'ID', 
    `Name` string comment '名字'
    )
    comment '测试';

    2.2 Spark读取hive表并打印注释(在spark-shell里执行)

    首先看一下df.printSchema里并没有注释信息

    sql("use test")
    val df = spark.table("test")
    df.printSchema
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)

    用下面这行代码便可以打印注释信息:

    df.schema.foreach(s=>println(s.name,s.metadata))
    (id,{"comment":"ID","HIVE_TYPE_STRING":"string"})
    (name,{"comment":"名字","HIVE_TYPE_STRING":"string"})

    3、读取Oracle表并打印DataFrmae的元数据信息

    3.1 新建Oracle测试表(带注释)

    CREATE TABLE ORA_TEST (
    ID VARCHAR2(100), 
    NAME VARCHAR2(100)
    );
    COMMENT ON COLUMN ORA_TEST.ID IS 'ID';
    COMMENT ON COLUMN ORA_TEST.NAME IS '名字';
    COMMENT ON TABLE ORA_TEST IS  '测试';
    • 注:上面的注释语句和建表语句需要分开执行,或者也可以在数据库工具执行脚本,比如我用的DBeaver用快捷键Alt+x即可。当然也可以在工具的界面直接建表均可。

    3.2 读取Oracle表,并打印元数据

    代码:

    package com.dkl.leanring.spark.sql.Oracle
    
    import org.apache.spark.sql.SparkSession
    
    object OracleSchemaDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("OracleSchemaDemo").master("local").getOrCreate()
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl")
          .option("dbtable", "ORA_TEST")
          .option("user", "bigdata")
          .option("password", "bigdata")
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .load()
        df.schema.foreach(s => println(s.name, s.metadata))
    
        spark.stop
    
      }
    }
    (ID,{"name":"ID","scale":0})
    (NAME,{"name":"NAME","scale":0})

    注:Spark2.3.0和Spark2.2.1的元数据不太一样,上面的结果是Spark2.2.1(也是我写博客测试用的),项目中用的Spark2.3.0,2.3.0的元数据是空的,如下

    (ID,{})
    (NAME,{})

    可见并没有注释信息

    3.3 给DataFrame添加注释

    import org.apache.spark.sql.types._
    val commentMap = Map("ID" -> "ID", "NAME" -> "名字")
    
    val schema = df.schema.map(s => {
      s.withComment(commentMap(s.name))
    })
    
    //根据添加了注释的schema,新建DataFrame
    val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160)
    
    new_df.schema.foreach(s => println(s.name, s.metadata))
    (ID,{"comment":"ID","name":"ID","scale":0})
    (NAME,{"comment":"名字","name":"NAME","scale":0})

    4、 测试写到Hive表有没有注释

    需将前面代码中的spark改为支持hive,即加上enableHiveSupport()

    spark.sql("use test")
    new_df.write.mode("overwrite").saveAsTable("ORA_TEST")

    然后在hive里看一下,是否有注释

    可以看到,成功的把注释也保存到里hive里

    5、附录

    附上在Eclipse运行的完整代码

    package com.dkl.leanring.spark.sql.Oracle
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object OracleSchemaDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("OracleSchemaDemo").master("local").enableHiveSupport().getOrCreate()
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl")
          .option("dbtable", "ORA_TEST")
          .option("user", "bigdata")
          .option("password", "bigdata")
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .load()
        df.schema.foreach(s => println(s.name, s.metadata))
    
        val commentMap = Map("ID" -> "ID", "NAME" -> "名字")
    
        val schema = df.schema.map(s => {
          s.withComment(commentMap(s.name))
        })
    
        //根据添加了注释的schema,新建DataFrame
        val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160)
    
        new_df.schema.foreach(s => println(s.name, s.metadata))
    
        spark.sql("use test")
        //保存到hive
        new_df.write.mode("overwrite").saveAsTable("ORA_TEST")
    
        spark.stop
    
      }
    }
  • 相关阅读:
    The test form is only available for requests from the local machine
    64位Win7下,先安装Visual Studio,后安装IIS的设置步骤
    [转] 如何在 64 位的 Windows 7 中安裝 PLSQL DEVELOPER 8 和 Oracle 11g x64 Client
    excel对csv的转义
    js中没有引用的匿名函数调用方法
    缓存实现条件
    js对象成员的删除特性 (delete)
    js语法作用域之间的相关性
    【转】UBOOT之四:uboot.lds分析
    linux C 中的volatile使用
  • 原文地址:https://www.cnblogs.com/itboys/p/10940004.html
Copyright © 2011-2022 走看看