zoukankan      html  css  js  c++  java
  • Spark:DataFrame 写入文本文件

    将DataFrame写成文件方法有很多
    最简单的将DataFrame转换成RDD,通过saveASTextFile进行保存但是这个方法存在一些局限性:
    1.将DataFrame转换成RDD或导致数据结构的改变
    2.RDD的saveASTextFile如果文件存在则无法写入,也就意味着数据只能覆盖无法追加,对于有数据追加需求的人很不友好
    3.如果数据需要二次处理,RDD指定分隔符比较繁琐

    基于以上原因,在研读了Spark的官方文档后,决定采取DataFrame的自带方法 write 来实现。
    此处采用mysql的数据作为数据源,读取mysql的方法在 Spark:读取mysql数据作为DataFrame 有详细介绍。

    1.mysql的信息

    mysql的信息我保存在了外部的配置文件,这样方便后续的配置添加。

    1 //配置文件示例:
    2 [hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
    3 #mysql数据库配置
    4 mysql.driver=com.mysql.jdbc.Driver
    5 mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
    6 mysql.username=user
    7 mysql.password=123456

     2.需要的jar依赖

    sbt版本,maven的对应修改即可

     1 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
     2 libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
     3 libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
     4 libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
     5 libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
     6 libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
     7 libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
     8 libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
     9 libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
    10 libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

     3.完整实现代码

     1 import java.io.FileInputStream
     2 import java.util.Properties
     3 
     4 import org.apache.spark.sql.hive.HiveContext
     5 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
     6 import org.apache.spark.{SparkConf, SparkContext}
     7 
     8 /**
     9   * @author Administrator
    10   *         2018/10/16-14:35
    11   *
    12   */
    13 object TestSaveFile {
    14   var hdfsPath: String = ""
    15   var proPath: String = ""
    16   var DATE: String = ""
    17 
    18   val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
    19   val sc: SparkContext = new SparkContext(sparkConf)
    20   val sqlContext: SQLContext = new HiveContext(sc)
    21 
    22   def main(args: Array[String]): Unit = {
    23     hdfsPath = args(0)
    24     proPath = args(1)
    25     //不过滤读取
    26     val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    27     saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
    28   }
    29 
    30   /**
    31     * 获取 Mysql 表的数据
    32     *
    33     * @param sqlContext
    34     * @param tableName 读取Mysql表的名字
    35     * @param proPath   配置文件的路径
    36     * @return 返回 Mysql 表的 DataFrame
    37     */
    38   def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
    39     val properties: Properties = getProPerties(proPath)
    40     sqlContext
    41       .read
    42       .format("jdbc")
    43       .option("url", properties.getProperty("mysql.url"))
    44       .option("driver", properties.getProperty("mysql.driver"))
    45       .option("user", properties.getProperty("mysql.username"))
    46       .option("password", properties.getProperty("mysql.password"))
    47       .option("dbtable", tableName)
    48       .load()
    49   }
    50 
    51   /**
    52     * 将 DataFrame 保存为 hdfs 文件 同时指定保存绝对路径 与 分隔符
    53     *
    54     * @param dataFrame  需要保存的 DataFrame
    55     * @param absSaveDir 保存保存的路径 (据对路径)
    56     * @param splitRex   指定分割分隔符
    57     * @param saveMode   保存的模式:Append、Overwrite、ErrorIfExists、Ignore
    58     */
    59   def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
    60     dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.output.compress", "false")
    61     //为了方便观看结果去掉压缩格式
    62     val allClumnName: String = dataFrame.columns.mkString(",")
    63     val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
    64     result.write.mode(saveMode).text(absSaveDir)
    65   }
    66 
    67   /**
    68     * 获取配置文件
    69     *
    70     * @param proPath
    71     * @return
    72     */
    73   def getProPerties(proPath: String): Properties = {
    74     val properties: Properties = new Properties()
    75     properties.load(new FileInputStream(proPath))
    76     properties
    77   }
    78 }

     4.测试

    1 def main(args: Array[String]): Unit = {
    2     hdfsPath = args(0)
    3     proPath = args(1)
    4     //不过滤读取
    5     val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    6     saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
    7   }

    5.执行命令

     1 nohup spark-submit --master yarn \
     2 --driver-memory 4G \
     3 --num-executors 2 \
     4 --executor-cores 4 \
     5 --executor-memory 8G \
     6 --class com.iptv.job.basedata.TestSaveFile \
     7 --jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
     8 test.jar \
     9 hdfs://ns1/user/hive/../ \
    10 /var/.../job.properties > ./TestSaveFile.log 2>&1 &

     6.运行结果

     1 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TestSaveFile
     2 0      0      hdfs://ns1/user/hive/warehouse/TestSaveFile/_SUCCESS
     3 4.1 K  4.1 K  hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
     4 
     5 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -cat hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
     6 1234|12349|张三|韩服_G|11234|张三艾欧尼亚|韩服-G|1234D5A3434|3|张三天庭
     7 12343|1234|1234|韩服_M|31234|李四艾欧尼亚|韩服-M|5F4EE4345|8|1234天庭
     8 1234|12340|石中剑山|韩服_s8|11234|张三艾欧尼亚|韩服-s8|59B403434|5|石中剑山天庭
     9 12344|12344|灵山|韩服_J|31234|李四艾欧尼亚|韩服-J|CF19F434B|40|灵山天庭
    10 1234|1234|他家|韩服_H|11234|张三艾欧尼亚|韩服-Z|51234EB1434|9|他家天庭
    11 12345|12340|云浮|韩服_F|31234|李四艾欧尼亚|韩服-Y|9C9C04344|41|浮天庭
    12 1234|12348|潮边疆|韩服_Z|41234|佛山艾欧尼亚|韩服-Z|5B034340F|15|边疆天庭
    13 12340|12344|河姆渡人源|韩服_HY|41234|深圳艾欧尼亚|韩服-HY434123490808|18|河姆渡人源天庭
    14 1234|1234|佛山|韩服_S|41234|佛山艾欧尼亚|韩服-FS|EEA981434|4|佛祖天庭
    15 12340|12343|揭阳|韩服_J|41234|深圳艾欧尼亚|韩服-JY|9FF084349|10|天庭
    16 1234|1234|石中剑边疆|韩服_|41234|佛山艾欧尼亚|韩服-HZ|440A434FC|0|石中剑边疆天庭
    17 12348|1234|梅边疆|韩服_Z|41234|深圳艾欧尼亚|韩服-MZ|E9B434F09|14|梅边疆天庭
    18 1234|12348|石中剑名|韩服_M|41234|佛山艾欧尼亚|韩服-MM|5D0A94434|14|石中剑名天庭
    19 12349|1234|日本|韩服_|41234|深圳艾欧尼亚|韩服-SG|BD0F34349|19|日本天庭
    20 1234|1234|石中剑石中剑|韩服_ST|41234|佛山艾欧尼亚|韩服-ST|18D0D0434|0|石中剑石中剑天庭
    21 12340|1234|深圳|韩服_Z|41234|深圳艾欧尼亚|韩服-Z|31E4C4344|4|深天庭
    22 12340|12340|石中剑尾|韩服_SW|41234|佛山艾欧尼亚|韩服-SW|1BA1234434B|10|石中剑尾天庭
    23 12341|1234|美国|韩服_Z|41234|深圳艾欧尼亚|韩服-Q|3C09D434B|13|美国天庭
    24 12341|1234|湛江|韩服_Z|41234|佛山艾欧尼亚|韩服-Z|3A49A4340|11|我家天庭
    25 1234|12343|清诗和远方|韩服_Y|11234|张三艾欧尼亚|韩服-Y|4344E0F31|10|清诗和远方天庭
    26 1234|41234|李四|韩服_AZ|31234|李四艾欧尼亚|韩服-Z|13F1D4344|1|李四天庭

     7.总结

    在整个过程中有几个需要注意的点

    • 只能存一个列
     1 /**
     2    * Saves the content of the [[DataFrame]] in a text file at the specified path.
     3    * The DataFrame must have only one column that is of string type.
     4    * Each row becomes a new line in the output file. For example:
     5    * {{{
     6    *   // Scala:
     7    *   df.write.text("/path/to/output")
     8    *
     9    *   // Java:
    10    *   df.write().text("/path/to/output")
    11    * }}}
    12    *
    13    * @since 1.6.0
    14    */
    15   def text(path: String): Unit = format("text").save(path)

    这段代码已经说明了一切,是的,只能保存只有一列的DataFrame.

    但是比起RDD,DataFrame能够比较轻易的处理这种情况

    1 def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
    2     dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.output.compress", "false")
    3     //为了方便观看结果去掉压缩格式
    4     val allClumnName: String = dataFrame.columns.mkString(",")
    5     val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
    6     result.write.mode(saveMode).text(absSaveDir)
    7   }

    上述代码中 我们通过columns.mkString(",")获取 dataFrame 的所有列名并用","分隔,然后通过selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")将所有数据拼接当成一列,完美解决只能保存一列的问题

    • DataFrame 某个字段为空

    如果 DataFrame 中某个字段为null,那么在你最中生成的文件中不会有该字段,所以,如果对结果字段的个数有要求的,最好在数据处理的时候将有可能为null的数据赋值空串"",特别是还有将数据load进Hive需求的,否则数据会出现错位

    至此DataFrame 写文件功能实现

    此文为本人工作总结,转载请标明出处!!!!!!!

  • 相关阅读:
    pat甲级 1155 Heap Paths (30 分)
    pat甲级 1152 Google Recruitment (20 分)
    蓝桥杯 基础练习 特殊回文数
    蓝桥杯 基础练习 十进制转十六进制
    蓝桥杯 基础练习 十六进制转十进制
    蓝桥杯 基础练习 十六进制转八进制
    51nod 1347 旋转字符串
    蓝桥杯 入门训练 圆的面积
    蓝桥杯 入门训练 Fibonacci数列
    链表相关
  • 原文地址:https://www.cnblogs.com/lillcol/p/9798612.html
Copyright © 2011-2022 走看看