zoukankan      html  css  js  c++  java
  • Spark学习笔记4:数据读取与保存

    Spark对很多种文件格式的读取和保存方式都很简单。Spark会根据文件扩展名选择对应的处理方式。

    Spark支持的一些常见文件格式如下:

    •  文本文件

       使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件。也可以指定minPartitions控制分区数。传递目录作为参数,会把目录中的各部分都读取到RDD中。例如:

    val input = sc.textFile("E:\share\new\chapter5")
    input.foreach(println)
    

     chapter目录有三个txt文件,内容如下:

     

    输出结果:

    用SparkContext.wholeTextFiles()也可以处理多个文件,该方法返回一个pair RDD,其中键是输入文件的文件名。

    例如:

        val input = sc.wholeTextFiles("E:\share\new\chapter5")
        input.foreach(println)
    

      输出结果:

    保存文本文件用saveAsTextFile(outputFile)

    •  JSON

    JSON是一种使用较广的半结构化数据格式,这里使用json4s来解析JSON文件。

    如下:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.json4s.ShortTypeHints
    import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.Serialization
    
    object TestJson {
    
      case class Person(name:String,age:Int)
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("JSON")
        val sc = new SparkContext(conf)
        implicit val formats = Serialization.formats(ShortTypeHints(List()))
        val input = sc.textFile("E:\share\new\test.json")
        input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.age)})
    
      }
    
    }
    

     json文件内容:

    输出结果:

    保存JSON文件用saveASTextFile(outputFile)即可

    如下:

    val datasave = input.map { myrecord =>
          implicit val formats = DefaultFormats
          val jsonObj = parse(myrecord)
          jsonObj.extract[Person]
        }
    datasave.saveAsTextFile("E:\share\spark\savejson")
    

    输出结果:

    • CSV文件

     读取CSV文件和读取JSON数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。

    如下:

    import org.apache.spark.{SparkConf, SparkContext}
    import java.io.StringReader
    
    import au.com.bytecode.opencsv.CSVReader
    
    object DataReadAndSave {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("CSV")
        val sc = new SparkContext(conf)
    
        val input = sc.textFile("E:\share\spark\test.csv")
        input.foreach(println)
        val result = input.map{
          line =>
            val reader = new CSVReader(new StringReader(line))
            reader.readNext()
        }
        for(res <- result){
          for(r <- res){
            println(r)
          }
        }
      }
    
    }
    

    test.csv内容:

    输出结果:

     保存csv

    如下:

    val inputRDD = sc.parallelize(List(Person("Mike", "yes")))
            inputRDD.map(person  => List(person.name,person.favoriteAnimal).toArray)
            .mapPartitions { people =>
              val stringWriter = new StringWriter()
              val csvWriter = new CSVWriter(stringWriter)
              csvWriter.writeAll(people.toList)
              Iterator(stringWriter.toString)
            }.saveAsTextFile("E:\share\spark\savecsv")
    
    • SequenceFile

    SequenceFile是由没有相对关系结构的键值对文件组成的常用Hadoop格式。是由实现Hadoop的Writable接口的元素组成,常见的数据类型以及它们对应的Writable类如下:

    读取SequenceFile

    调用sequenceFile(path , keyClass , valueClass , minPartitions)

    保存SequenceFile

    调用saveAsSequenceFile(outputFile)

    • 对象文件

    对象文件使用Java序列化写出,允许存储只包含值的RDD。对象文件通常用于Spark作业间的通信。

    保存对象文件调用 saveAsObjectFile    读取对象文件用SparkContext的objectFile()函数接受一个路径,返回对应的RDD

    • Hadoop输入输出格式

     Spark可以与任何Hadoop支持的格式交互。

    读取其他Hadoop输入格式,使用newAPIHadoopFile接收一个路径以及三个类,第一个类是格式类,代表输入格式,第二个类是键的类,最后一个类是值的类。

    hadoopFile()函数用于使用旧的API实现的Hadoop输入格式。

    KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取键值对数据。每一行都会被独立处理,键和值之间用制表符隔开。

     例子:

    import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    
    object HadoopFile {
    
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("hadoopfile").setMaster("local")
        val sc = new SparkContext(conf)
    
       
        val job = new Job()
        val data = sc.newAPIHadoopFile("E:\share\spark\test.json" ,
          classOf[KeyValueTextInputFormat],
          classOf[Text],
          classOf[Text],
          job.getConfiguration)
        data.foreach(println)
    
        data.saveAsNewAPIHadoopFile(
          "E:\share\spark\savehadoop",
          classOf[Text],
          classOf[Text],
          classOf[TextOutputFormat[Text,Text]],
          job.getConfiguration)
    
      }
    }
    

      输出结果:

    读取

    保存

    若使用旧API如下:

    val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("E:\share\spark\test.json
    ").map { case (x, y) => (x.toString, y.toString) } input.foreach(println)

      

    • 文件压缩

    对数据进行压缩可以节省存储空间和网络传输开销,Spark原生的输入方式(textFile和sequenFile)可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。

    • 文件系统

    Spark支持读写很多种文件系统,可以使用任何我们想要的文件格式。包括:

      1、本地文件系统 

    要求文件在集群中所有节点的相同路径下都可以找到。 本地文件系统路径使用 例如:val rdd = sc.textFile("file:///home/holden/happypandas.gz")。

      2、Amazon S3

    将一个以s3n://开头的路径以s3n://bucket/path-within-bucket的形式传给Spark的输入方法。

      3、HDFS

    在Spark中使用HDFS只需要将输入路径输出路径指定为hdfs://master:port/path就可以了

    • Apache Hive

    Apache Hive是Hadoop上一中常见的结构化数据源。Hive可以在HDFS内或者在其他存储系统上存储多种格式的表。SparkSQL可以读取Hive支持的任何表。

    将Spark SQL连接到已有的Hive上,创建出HiveContext对象也就是Spark SQL入口,然后就可以使用Hive查询语言来对你的表进行查询,并以由行组成的RDD形式返回数据。

    使用HiveContext.jsonFile方法可以从整个文件中获取Row对象组成的RDD。例子:

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Sparksql {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("SparkSQL")
            val sc = new SparkContext(conf)
            val sql = new HiveContext(sc)
            val input = sql.jsonFile("E:\share\spark\tweets.json")
            input.registerTempTable("tweets")
            val topTweets = sql.sql("select user.name,text from tweets")
            topTweets.foreach(println)
      }
    
    }
    

    使用数据:

    输出结果:

    • 数据库

     Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括MySQL,Postgre等系统。

    Spark连接JDBC,通过创建SQLContext对象进行连接,设置连接参数,然后就可以使用sql语句进行查询,结果返回一个jdbcRDD。如下:

    首先在MySQL里面建立名为info的数据库,建表及导入数据:

    sql查询数据:

    使用Spark连接JDBC查询,Scala代码如下:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    object JDBC {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("SparkSQL")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val mysql = sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/info").
          option("dbtable","student").option("driver","com.mysql.jdbc.Driver").
          option("user","root").option("password","********").load()
        mysql.registerTempTable("student")
        mysql.sqlContext.sql("select * from student where sage >= 20").collect().foreach(println)
      }
    
    }
    

      输出结果:

  • 相关阅读:
    一线架构师实践指南读后感
    可修改性战术
    软件架构师如何工作?
    寒假学习第十五天
    寒假学习第十四天
    寒假学习第十三天
    寒假学习第十二天
    寒假学习第十一天
    寒假学习第十天
    如何变得聪明
  • 原文地址:https://www.cnblogs.com/caiyisen/p/7527459.html
Copyright © 2011-2022 走看看