zoukankan      html  css  js  c++  java
  • 使用Spark读写CSV格式文件(转)

    原文链接:使用Spark读写CSV格式文件

    CSV格式的文件也称为逗号分隔值(Comma-Separated Values,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号。在本文中的CSV格式的数据就不是简单的逗号分割的),其文件以纯文本形式存表格数据(数字和文本)。CSV文件由任意数目的记录组成,记录间以某种换行符分隔;每条记录由字段组成,字段间的分隔符是其它字符或字符串,最常见的是逗号或制表符。通常,所有记录都有完全相同的字段序列。

      本篇文章将介绍如何使用Spark 1.3+的外部数据源接口来自定义CSV输入格式的文件解析器。这个外部数据源接口是由databricks公司开发并开源的(地址:https://github.com/databricks/spark-csv),通过这个类库我们可以在Spark SQL中解析并查询CSV中的数据。因为用到了Spark的外部数据源接口,所以我们需要在Spark 1.3+上面使用。在使用之前,我们需要引入以下的依赖:

    1 <dependency>
    2     <groupId>com.databricks</groupId>
    3     <artifactId>spark-csv_2.10</artifactId>
    4     <version>1.0.3</version>
    5 </dependency>

    目前spark-csv_2.10的最新版就是1.0.3。如果我们想在Spark shell里面使用,我们可以在--jars选项里面加入这个依赖,如下:

    1 [iteblog@spark $] bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
     

      和《Spark SQL整合PostgreSQL》文章中用到的load函数类似,在使用CSV类库的时候,我们需要在options中传入以下几个选项:

      1、path:看名字就知道,这个就是我们需要解析的CSV文件的路径,路径支持通配符;
      2、header:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true;
      3、delimiter:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。
      4、quote:默认情况下的引号是'"',我们可以通过设置这个选项来支持别的引号。
      5、mode:解析的模式。默认值是PERMISSIVE,支持的选项有
        (1)、PERMISSIVE:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.
        (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected
        (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line

    如何使用

    1、在Spark SQL中使用

      我们可以通过注册临时表,然后使用纯SQL方式去查询CSV文件:

    1 CREATE TABLE cars
    2 USING com.databricks.spark.csv
    3 OPTIONS (path "cars.csv", header "true")

    我们还可以在DDL中指定列的名字和类型,如下:

    1 CREATE TABLEcars (yearMade double, carMake string, carModel string, comments string, blank string)
    2 USING com.databricks.spark.csv
    3 OPTIONS (path "cars.csv", header "true")

    2、通过Scala方式

      推荐的方式是通过调用SQLContextload/save函数来加载CSV数据:

    1 import org.apache.spark.sql.SQLContext
    2  
    3 val sqlContext = new SQLContext(sc)
    4 val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv""header"-> "true"))
    5 df.select("year""model").save("newcars.csv""com.databricks.spark.csv")

    当然,我们还可以使用com.databricks.spark.csv._的隐式转换:

    1 import org.apache.spark.sql.SQLContext
    2 import com.databricks.spark.csv._
    3  
    4 val sqlContext = new SQLContext(sc)
    5  
    6 val cars = sqlContext.csvFile("cars.csv")
    7 cars.select("year""model").saveAsCsvFile("newcars.tsv")

    3、在Java中使用

    和在Scala中使用类似,我们也推荐调用SQLContext类中 load/save函数

    01 /**
    02  * User: 过往记忆
    03  * Date: 2015-06-01
    04  * Time: 下午23:26
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1380
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10  
    11 import org.apache.spark.sql.SQLContext
    12  
    13 SQLContext sqlContext = new SQLContext(sc);
    14  
    15 HashMap<String, String> options = new HashMap<String, String>();
    16 options.put("header""true");
    17 options.put("path""cars.csv");
    18  
    19 DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
    20 df.select("year""model").save("newcars.csv""com.databricks.spark.csv");

    在Java或者是Scala中,我们可以通过CsvParser里面的函数来读取CSV文件:

    1 import com.databricks.spark.csv.CsvParser;
    2 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    3  
    4 DataFrame cars = (new CsvParser()).withUseHeader(true).csvFile(sqlContext, "cars.csv");

    4、在Python中使用

    Python中,我们也可以使用SQLContext类中 load/save函数来读取和保存CSV文件:

    1 from pyspark.sql import SQLContext
    2 sqlContext = SQLContext(sc)
    3  
    4 df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
    5 df.select("year""model").save("newcars.csv""com.databricks.spark.csv")
  • 相关阅读:
    详细深入分析 Java ClassLoader 工作机制
    centos 文件系统权限
    leaflet 实现地图上标记的发散闪烁动画
    GEOJSON 的渲染实例
    【转】多用户同时登陆Windows远程桌面 | 最近升级了win10系统,以前一直用的RDPWrap-1.6版本无法使用,解决方案
    getopt 用法
    安装oracle报:oracle net configuration assistant失败
    【Delphi学习】ADOQuery 用法
    如何为DOS批处理%time%小时的值小于10的时候如何在这个值前加0?
    .bat 中显示出的时间格式问题,如2:36:00,如何让运行脚本显示为02:36:00
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4961464.html
Copyright © 2011-2022 走看看