zoukankan      html  css  js  c++  java
  • Learning Spark中文版--第五章--加载保存数据(1)

      开发工程师和数据科学家都会受益于本章的部分内容。工程师可能希望探索更多的输出格式,看看有没有一些适合他们下游用户的格式。数据科学家可能会更关注他们已经使用的数据格式。

    Motivation

      我们已经介绍了大量分布式程序使用的Spark操作。目前为止,我们的例子都是从一个本地集合和规整文件中加载数据,但是有可能你的数据不是规整的或者不在一台机器上,那么就跟着我一起探索加载和保存数据的操作用法。

      Spark支持广泛的输出输入源,部分原因是因为Spark构建在Haddoop生态环境之上。Spark可以通过Hadoop中MapReduce的InputFormatOutPutForamt接口来访问数据,这些接口可用于很多常用的文件格式和存储系统(如,S3,HDFS,Cassadra,HBase等等)。第84页上的“Hadoop输入和输出格式”展示了如何直接使用这些格式。

      更常见的情况是,你会想使用对原始接口封装的更高级API。Spark和它的生态系统提供了很多使用方式,这一章,我们会详细介绍三个常用数据源集合:

    • 文件格式和文件系统

      对于数据存储在本地或分布式的文件系统,如NFS,HDFS,或亚马逊的S3,Spark可以访问各种文件格式,包括文本,JSON,SequnceFile和
      Protocol buffer。我们会介绍如何使用这几种常用文件格式,以及Spark如何与不同的文件系统对接和配置压缩。

    • 通过Spark SQL构建结构化数据源

      第九章介绍的Spark SQL模块,提供了非常高效的结构化数据源API,包括JSON和Apache的Hive。我们会简要介绍一些Spark SQL,但是大部分内容留在第九章

    • 数据库和键值对存储

      我们会简要了解如何连接Cassandra,HBase,Elasticsearch和JDBC数据库的内置或第三方库。

      Spark支持的语言可以使用我们选择的大多数方法,但有些库只有Java和Scala可以使用。我们遇到这样的例子会进行提示。

    文件格式

      对于大多数文件格式,Spark加载和保存的方法都非常简单。从非结构化的格式,如text,到半结构化数据,如JSON,再到结构化数据,如SequenceFile(见表5-1)。The input formats that Spark wraps all transparently handle compressed formats based on the file extension.

    (我翻不出来:(,对于输入格式,Spark会对其包装,包装了的输入格式会基于文件扩展名透明地处理压缩格式)。

    格式名 是否结构化 评价
    文本文件 普通文本文件。假定每行一条记录
    JSON 半结构 常见的基于文本格式,半结构;大多数库需要每行一条记录
    CSV 非常常用的基于文本的格式,通常用于电子表格应用
    SequnceFiles Hadoop中一种常用的键值对数据格式
    Protocol buffer 一种快速的,空间利用率高的多语言格式
    Object file Spark job用来共享代码的保存数据格式,非常有用

      除了Spark直接支持的输出机制之外,我们可以将Hadoop的新老文件API用于键值对数据。可以在键值对上使用这些API,因为Hadoop的接口要求键值对数据,即使是忽略键的数据。在忽略键的情况下,通常使用虚拟键(如null)。

    Text Files(文本文件)

      Spark中加载保存文本文件非常简单。当我们加载一个简单的文本文件作为RDD时,每一行输入变成了RDD的一个基本元素。我们可以把多个文本文件同时加载到一个键值对RDD中,键作为文件名,值作为文件内容。

    loading text files(加载文本文件)

      加载文本文件就和在SparkContext调用textFile(文件路径)一样简单,Example5-1到5-3展示了例子。如果我们想控制分区的数量我们也可以明确设定minPartitions

    Example 5-1. Loading a text file in Python
    
    input = sc.textFile("file:///home/holden/repos/spark/README.md")
    
    Example 5-2. Loading a text file in Scala
    
    val input = sc.textFile("file:///home/holden/repos/spark/README.md")
    
    Example 5-3. Loading a text file in Java
    
    JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
    

      参数是目录形式的多文件输入可以使用两种方式处理。我们可以直接使用textFile方法并把目录地址作为参数传入,这就会把目录中所有文件传进我们的RDD。有时候需要知道某个文件的某一部分的来源(例如时间和文件的键结合作为区分标志)或者我们需要一次处理整个文件。如果我们的文件足够小,我们可以使用SparkContext.wholeTextFiles()方法,这会返回一个以输入文件的名称作为键的键值对RDD。

       当每个文件能表示一个确定的时间数据,wholeTextFiles()就会变得很有用。如果我们有能够表示不同时间段销售数据的文件,我们可以很轻易地计算出每个时间段的平均值。示例:

    Example 5-4. Average value per file in Scala
    
    val input = sc.wholeTextFiles("file://home/holden/salesFiles")
    val result = input.mapValues{y =>
    val nums = y.split(" ").map(x => x.toDouble)
    nums.sum / nums.size.toDouble
    }
    
    

    Spark支持读取指定目录的所有文件,并且也支持通配符输入(如,part-*.txt)。这个用处非常大,因为较大数据集通常分布在多个文件中,特别是如果其他文件(如成功标志)和要处理的文件在同一目录中。

    Saving text files(保存文本文件)

       输出文本文件也很简单。saveAsTextFile()方法,如Example5-5所示,以一个输出的路径作为参数,并将RDD内容输入到该路径之中。这个路径会作为一个目录并且Spark会把多个文件输入到该目录之下。Spark可以把输出结果写入多个节点。这个方法不能控制输出数据分段的开始和结束位置。但有其他的方法可以控制。

    Example 5-5. Saving as a text file in Python
    
    result.saveAsTextFile(outputFile)
    

    JSON

      JSON是一个很流行的半结构化数据格式。加载JSON数据最简单的方式就是把JSON当做文本文件然后用JSON解析器解析映射值。同样的,我们可以使用偏好的JSON序列化库把值写入字符串,然后再给写出。在Java和Scala中,我们可以使用定制的Hadoop格式来处理JSON。172也也介绍了Spark SQL怎么加载JSON数据。

    loading JSON(加载JSON)

      像文本文件一样加载然后转换JSON数据是Spark所有支持的语言都可以使用的一种方法。这是假定你的JSON数据每条记录都在一行之中,如果你的JSON数据是多行的,你可能必须加载整个文件然后转换每个文件。如果你使用的语言构建JSON解析器的代价比较大,你可以使用mapPartitions()来重用解析器;107页"Working on a Per-Partition Basis"有这方面的细节。
      我们常用的这三种语言有大量的JSON库可以使用,但为了简单起见,每种语言只介绍一种库。在Python中我们使用内置库(Example5-6),Java和Scala中我们使用Jackson(Examples5-7和5-8)。选择这几个库是因为他们运行效果很好并且使用相对简单。如果你在解析过程中花了大量时间,那可以看一下Java和Scala其他的JSON库。

    Example 5-6. Loading unstructured JSON in Python
    
    import json
    data = input.map(lambda x: json.loads(x))
    

    在Scala和Java中,通常把JSON数据转化为表示JSON格式的类。解析时,我们可能希望跳过无效的记录。下面展示了一个把JSON数据转换成Person实例的一个例子。

    Example 5-7. Loading JSON in Scala
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.DeserializationFeature
    ...
    case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class
    ...
    // Parse it into a specific case class. We use flatMap to handle errors
    // by returning an empty list (None) if we encounter an issue and a
    // list with one element if everything is ok (Some(_)).
    //把JSON转化为样例类。使用flatMap转化,如果遇到错误就返回list(None),否则就返回一条记录的Some(_)
    val result = input.flatMap(record => {
        try {
            Some(mapper.readValue(record, classOf[Person]))
        } catch {
            case e: Exception => None
        }})
        
    Example 5-8. Loading JSON in Java
    
    class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
        public Iterable<Person> call(Iterator<String> lines) throws Exception {
            ArrayList<Person> people = new ArrayList<Person>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    people.add(mapper.readValue(line, Person.class));
                } catch (Exception e) {
                // skip records on failure
                }
            }
            return people;
        }
    }
    JavaRDD<String> input = sc.textFile("file.json");
    JavaRDD<Person> result = input.mapPartitions(new ParseJson());
    

    处理格式不正确的记录可能很麻烦,特别是半结构化的数据,如JSON。对于小数据集因格式不正确的输入导致程序崩溃还可以接受,但是,处理大数据集时遇到格式畸形输入也是家常便饭。如果选择跳过错误格式的数据,你可能会想使用累加器来记录并追踪错误的数量。

    Saving JSON(保存JSON)

      输出JSON比加载JSON简单很多,因为不用担心错误格式的数据,并且知道我们输出数据的类型。我们可以直接使用把字符串RDD转换成JSON的相同的库,把对象转换成JSON后再使用Spark文本文件API将其输出。

      假如我们正在举行一个对喜爱熊猫的人的促销。使用第一步中的输入,过滤出喜欢熊猫的人,示例如下:

    Example 5-9. Saving JSON in Python
    
    (data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
        .saveAsTextFile(outputFile))
    Example 5-10. Saving JSON in Scala
    
    result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
        .saveAsTextFile(outputFile)
    Example 5-11. Saving JSON in Java
    class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
        public Iterable<String> call(Iterator<Person> people) throws Exception {
            ArrayList<String> text = new ArrayList<String>();
            ObjectMapper mapper = new ObjectMapper();
            while (people.hasNext()) {
                Person person = people.next();
                text.add(mapper.writeValueAsString(person));
            }
            return text;
        }
    }
    JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
        new LikesPandas());
    JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
    formatted.saveAsTextFile(outfile)
    

      使用已有的文本文件的机制再添加上需要的JSON库,在Spark中加载和保存JSON变得多么简单。

    Comma-Separated Values and Tab-Separated Values(逗号分割和Tab分割)

      逗号分割值文件要求每行字段数量固定,并且这些字段被逗号分割(TSV文件是根据tab键进行分割)。通常是每行储存一条记录,但是实际上并非总是如此,因为记录经常超过一行。CSV和TSV文件有时可能不一致,经常出现在处理换行符,转义,显示非ASCII字符,或非整数数字时。CSV天生不能处理嵌套类型字段,所以我们必须手动处理。

      不像JSON的字段,每条记录都没有字段名字与其关联,只取行号就行了。在CSV文件中有个不成文的规定,第一行的每一列的值就是每个字段的名称。

    Loading CSV(加载CSV)

      加载CSV/TSV数据和加载JSON很像,按照文本文件那样加载然后再进行处理。如果格式缺少标准化,会导致相同库的不同版本之间处理的方式不同。

      和JSON一样,有很多CSV的库可以使用,不过我们每个语言只介绍一种。在Python中我们使用csv库。在Scala和Java中我们使用opencsv。

    当然也有Hadoop的输入格式,CSVInputFormat,可以用来在Scala或Java中加载CSV数据,它也不支持换行符的记录。

      如果你的CSV数据的任何字段都不包含换行符,你可以直接使用textFile()加载你的数据并进行转换。示例如下:

    Example 5-12. Loading CSV with textFile() in Python
    
    import csv
    import StringIO
    ...
    def loadRecord(line):
        """Parse a CSV line"""
        input = StringIO.StringIO(line)
        reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
        return reader.next()
    input = sc.textFile(inputFile).map(loadRecord)
    
    Example 5-13. Loading CSV with textFile() in Scala
    
    import Java.io.StringReader
    import au.com.bytecode.opencsv.CSVReader
    ...
    val input = sc.textFile(inputFile)
    val result = input.map{ line =>
        val reader = new CSVReader(new StringReader(line));
        reader.readNext();
    }
    
    
    Example 5-14. Loading CSV with textFile() in Java
    import au.com.bytecode.opencsv.CSVReader;
    import Java.io.StringReader;
    ...
    public static class ParseLine implements Function<String, String[]> {
        public String[] call(String line) throws Exception {
            CSVReader reader = new CSVReader(new StringReader(line));
            return reader.readNext();
        }
    }
    JavaRDD<String> csvFile1 = sc.textFile(inputFile);
    JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());
    

      如果字段中内嵌了换行符,我们需要加载整个文件并且解析整段,就如Examples5-15到5-17中所展示。很不幸的是如果每个文件都非常大,那加载解析过程可能出现性能瓶颈。文本文件不同的加载方法在73页"Loading text files"有描述。

    Example 5-15. Loading CSV in full in Python
    
    def loadRecords(fileNameContents):
        """Load all the records in a given file"""
        input = StringIO.StringIO(fileNameContents[1])
        reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
        return reader
    fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
    
    Example 5-16. Loading CSV in full in Scala
    
    case class Person(name: String, favoriteAnimal: String)
    
    val input = sc.wholeTextFiles(inputFile)
    val result = input.flatMap{ case (_, txt) =>
        val reader = new CSVReader(new StringReader(txt));
        reader.readAll().map(x => Person(x(0), x(1)))
    }
    
    Example 5-17. Loading CSV in full in Java
    
    public static class ParseLine
        implements FlatMapFunction<Tuple2<String, String>, String[]> {
        public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
            CSVReader reader = new CSVReader(new StringReader(file._2()));
            return reader.readAll();
        }
    }
    JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
    JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
    

    如果只有少量的输入文件,并且你需要使用wholeFile()方法,则可能需要重新分区输入使得Spark高效并行化执行你的后续操作。

    Saving CSV(保存CSV)

      就像JSON一样,输出CSV/TSV数据也很简单并且重用输出的编码对象也有很多优点。因为在CSV中我们不用输出每个记录的字段名,我们需要创建一个映射来确保输出的一致性。一个简单的方法就是写一个函数,把字段转换到数组的指定位置。在Python中,如果我们想输出字典,CSV writer按照我们提供的字段名顺序构建writer,然后将字典输出。

      我们使用的CSV库输出到文件或writer,所以我们可以使用StringWriter/StringIO来把结果输入到RDD之中,示例如下:

    Example 5-18. Writing CSV in Python
    
    def writeRecords(records):
        """Write out CSV lines"""
        output = StringIO.StringIO()
        writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
        for record in records:
            writer.writerow(record)
        return [output.getvalue()]
        
    pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
    
    Example 5-19. Writing CSV in Scala
    
    pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
    .mapPartitions{people =>
        val stringWriter = new StringWriter();
        val csvWriter = new CSVWriter(stringWriter);
        csvWriter.writeAll(people.toList)
        Iterator(stringWriter.toString)
    }.saveAsTextFile(outFile)
    

      你可能注意到了,例子中我们知道要输出内容的所有的字段。如果有些输入的字段是在运行时确定的,那我们只能换个方式了。最简单的办法就是先遍历所有的数据来提取不同的字段,字段确定之后就可以输出数据了。

  • 相关阅读:
    图片合成
    ASP.net常用对象之一(Request对象)
    vs2010新增功能
    ASP.NET MVC 入门5、View与ViewData【转】
    ASP.NET MVC 入门3、Routing【转】
    ASP.NET MVC 入门2、项目的目录结构与核心的DLL[转]
    ASP.NET MVC 入门4、Controller与Action【转】
    jquery相关文摘
    application技术整理
    vb datagrid中的欄目順序要與recordset的順序一致
  • 原文地址:https://www.cnblogs.com/krcys/p/8521048.html
Copyright © 2011-2022 走看看