Spark's core data sources:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
The Structure of the Data Sources API
Read API Structure
The core structure for reading data is as follows:
DataFrameReader.format(...).option("key", "value").schema(...).load()
format is optional because by default Spark will use the Parquet format. option allows you to set key-value configurations to parameterize how you will read data. Lastly, schema is optional if the data source provides a schema or if you intend to use schema inference. Naturally, there are some required options for each format, which we will discuss when we look at each format.
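Because Parquet is the default, the format call can be omitted entirely when the source is Parquet. As a minimal sketch (the path here is hypothetical), the following is equivalent to specifying format("parquet") explicitly:
// in Scala
val df = spark.read.load("path/to/some/parquet-files")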
Basics of Reading Data
The foundation for reading data in Spark is the DataFrameReader. We access this through the SparkSession via the read attribute:
spark.read
After we have a DataFrame reader, we specify several values:
- The format
- The schema
- The read mode
- A series of options
The format, options, and schema each return a DataFrameReader that can undergo further transformations and are all optional, except for one option. Each data source has a specific set of options that determine how the data is read into Spark (we cover these options shortly). At a minimum, you must supply the DataFrameReader a path from which to read.
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
There are a variety of ways in which you can set options; for example, you can build a map and pass in your configurations. For now, we’ll stick to the simple and explicit way that you just saw.
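For example, here is a sketch of the map-based approach (the specific options and path are just for illustration); the DataFrameReader's options method accepts a Map of string keys and values:
// in Scala
val csvOptions = Map("header" -> "true", "mode" -> "FAILFAST", "inferSchema" -> "true")
spark.read.format("csv").options(csvOptions).load("path/to/file(s)")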
Read modes
Read modes specify what will happen when Spark comes across malformed records.
Read mode | Description
---|---
permissive | Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
dropMalformed | Drops the row that contains malformed records
failFast | Fails immediately upon encountering malformed records
The default is permissive.
Write API Structure
The core structure for writing data is as follows:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
format is optional because, by default, Spark will use the Parquet format. option, again, allows us to configure how to write out our given data. partitionBy, bucketBy, and sortBy work only for file-based data sources; you can use them to control the specific layout of files at the destination.
Basics of Writing Data
The foundation for writing data is quite similar to that of reading data. Instead of the DataFrameReader, we have the DataFrameWriter. Because we always need to write out some given data source, we access the DataFrameWriter on a per-DataFrame basis via the write attribute:
dataFrame.write
After we have a DataFrameWriter, we specify three values: the format, a series of options, and the save mode. At a minimum, you must supply a path.
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
Save modes
Save modes specify what will happen if Spark finds data at the specified location (assuming all else equal).
Save mode | Description
---|---
append | Appends the output files to the list of files that already exist at that location
overwrite | Will completely overwrite any data that already exists there
errorIfExists | Throws an error and fails the write if data or files already exist at the specified location
ignore | If data or files exist at the location, do nothing with the current DataFrame
The default is errorIfExists. This means that if Spark finds data at the location to which you're writing, it will fail the write immediately.
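In Scala you can also set the save mode with the SaveMode enumeration rather than a string; a brief sketch (the path is hypothetical):
// in Scala
import org.apache.spark.sql.SaveMode
dataframe.write.format("csv")
.mode(SaveMode.Overwrite)
.save("path/to/file(s)")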
CSV Files
CSV Options
Reading CSV Files
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("data/flight-data/csv/2010-summary.csv")
.show(5)
Things get tricky when we don't expect our data to be in a certain format, but it comes in that way anyhow. For example, let's take our current schema and change all column types to LongType. This does not match the actual schema, but Spark has no problem with us doing this. The problem will manifest itself only when Spark actually reads the data: as soon as we execute a job, it fails because the data does not conform to the specified schema:
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", LongType, true),
new StructField("ORIGIN_COUNTRY_NAME", LongType, true),
new StructField("count", LongType, false) ))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("data/flight-data/csv/2010-summary.csv")
.take(5)
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
In general, Spark will fail only at job execution time rather than at DataFrame definition time, even if, for example, we point to a file that does not exist. This is due to lazy evaluation.
Writing CSV Files
val csvFile = spark.read.format("csv").option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
.load("data/flight-data/csv/2010-summary.csv")
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")
.save("/tmp/my-tsv-file.tsv")
When you list the destination directory, you can see that my-tsv-file is actually a folder with numerous files within it:
$ ls /tmp/my-tsv-file.tsv/
/tmp/my-tsv-file.tsv/part-00000-35cf9453-1943-4a8c-9c82-9f6ea9742b29.csv
This actually reflects the number of partitions in our DataFrame at the time we write it out. If we were to repartition our data before then, we would end up with a different number of files.
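For example, a sketch of repartitioning to five partitions before the write (the output path is hypothetical), which would produce five output files in the destination folder:
// in Scala
csvFile.repartition(5).write.format("csv")
.option("sep", "\t").mode("overwrite").save("/tmp/my-repartitioned-tsv-file.tsv")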
JSON Files
In Spark, when we refer to JSON files, we mean line-delimited JSON files. This contrasts with files that contain a single large JSON object or array per file. The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one JSON object and Spark will go through the work of parsing that into a DataFrame. Line-delimited JSON is actually a much more stable format because it allows you to append new records to a file (rather than having to read in the entire file and then write it out), which is why we recommend that you use it. Another key reason for the popularity of line-delimited JSON is that JSON objects have structure, and JavaScript (on which JSON is based) has at least basic types. This makes the format easier to work with because Spark can make more assumptions on our behalf about the data. You'll notice that there are significantly fewer options than for CSV, because of this structure.
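If you do need to read a file that contains a single large JSON object or array, here is a sketch of the multiLine approach (the path is hypothetical):
// in Scala
spark.read.format("json")
.option("multiLine", "true")
.option("mode", "FAILFAST")
.load("path/to/multiline-json-file.json")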
JSON Options
Reading JSON Files
spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
.load("data/flight-data/json/2010-summary.json").show(5)
Writing JSON Files
This, too, follows the rules that we specified before: one file per partition will be written out, and the entire DataFrame will be written out as a folder. It will also have one JSON object per line:
// in Scala
csvFile.write.format("json").mode("overwrite").save("my-json-file.json")
Parquet Files
Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads. It provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files. It is a file format that works exceptionally well with Apache Spark and is in fact the default file format. We recommend writing data out to Parquet for long-term storage because reading from a Parquet file will always be more efficient than JSON or CSV. Another advantage of Parquet is that it supports complex types. This means that if your column is an array (which would fail with a CSV file, for example), map, or struct, you'll still be able to read and write that file without issue. Here's how to specify Parquet as the read format:
spark.read.format("parquet")
Reading Parquet Files
We can set the schema if we have strict requirements for what our DataFrame should look like. Oftentimes this is not necessary because we can use schema on read, which is similar to the inferSchema with CSV files. However, with Parquet files, this method is more powerful because the schema is built into the file itself (so no inference needed).
// in Scala
spark.read.format("parquet")
.load("data/flight-data/parquet/2010-summary.parquet").show(5)
Parquet options
Warning: Even though there are only two options, you can still encounter problems if you're working with incompatible Parquet files. Be careful when you write out Parquet files with different versions of Spark (especially older ones) because this can cause significant headaches.
Writing Parquet Files
csvFile.write.format("parquet").mode("overwrite")
.save("my-parquet-file.parquet")
ORC Files
ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, but with integrated support for finding required rows quickly. ORC actually has no options for reading in data because Spark understands the file format quite well. An often-asked question is: What is the difference between ORC and Parquet? For the most part, they’re quite similar; the fundamental difference is that Parquet is further optimized for use with Spark, whereas ORC is further optimized for Hive.
Reading ORC Files
// in Scala
spark.read.format("orc").load("data/flight-data/orc/2010-summary.orc").show(5)
Writing ORC Files
// in Scala
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
SQL Databases
To read and write from these databases, you need to do two things: include the Java Database Connectivity (JDBC) driver for your particular database on the Spark classpath, and provide the proper JAR for the driver itself. For example, to be able to read and write from PostgreSQL, you might run something like this:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
JDBC Data Source Options
Reading from SQL Databases
// in Scala
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
After you have defined the connection properties, you can test your connection to the database itself to ensure that it is functional. This is an excellent troubleshooting technique to confirm that your database is available to (at the very least) the Spark driver.
import java.sql.DriverManager
val connection = DriverManager.getConnection(url)
connection.isClosed()
connection.close()
// in Scala
val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbtable", tablename).option("driver", driver).load()
Other Databases
// in Scala
val pgDF = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "schema.tablename")
.option("user", "username").option("password","my-secret-password").load()
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show(5)
Query Pushdown
First, Spark makes a best-effort attempt to filter data in the database itself before creating the DataFrame. For example, in the previous sample query, we can see from the query plan that it selects only the relevant column name from the table:
scala> dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#6], functions=[])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#6, 200)
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#6], functions=[])
+- *(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#6] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
Spark can actually do better than this on certain queries. For example, if we specify a filter on our DataFrame, Spark will push that filter down into the database. We can see this in the explain plan under PushedFilters.
scala> dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain
== Physical Plan ==
*(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#6,ORIGIN_COUNTRY_NAME#7,count#8] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>
Spark can't translate all of its own functions into the functions available in the SQL database with which you're working. Therefore, sometimes you'll want to pass an entire query into your SQL database and have it return the results as a DataFrame. Now, this might seem like it's a bit complicated, but it's actually quite straightforward: rather than specifying a table name, you just specify a SQL query. Of course, you do need to specify this in a special way; you must wrap the query in parentheses and give it an alias:
// in Scala
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbtable", pushdownQuery).option("driver", driver).load()
Now when you query this table, you’ll actually be querying the results of that query. We can see this in the explain plan. Spark doesn’t even know about the actual schema of the table, just the one that results from our previous query:
scala> dbDataFrame.explain()
== Physical Plan ==
*(1) Scan JDBCRelation((SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#20] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
Reading from databases in parallel
Spark has an underlying algorithm that can read multiple files into one partition, or conversely, read multiple partitions out of one file, depending on the file size and the "splitability" of the file type and compression. The same flexibility that exists with files also exists with SQL databases, except that you must configure it a bit more manually.
What you can configure, as seen in the previous options, is the ability to specify a maximum number of partitions to allow you to limit how much you are reading and writing in parallel:
val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbtable", tablename).option("driver", driver).option("numPartitions", 10).load()
In this case, the data will still end up in a single partition because there is not too much of it. However, this configuration can help you ensure that you do not overwhelm the database when reading and writing data:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show()
There are several other optimizations that unfortunately only seem to be available under another API set. You can explicitly push predicates down into SQL databases through the connection itself. This optimization allows you to control the physical location of certain data in certain partitions by specifying predicates. Let's look at a simple example. We need data from only two countries: Anguilla and Sweden. We could filter these down and have the filters pushed into the database, but we can also go further by having them arrive in their own partitions in Spark. We do that by specifying a list of predicates when we create the data source:
// in Scala
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array("DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'", "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
spark.read.jdbc(url, tablename, predicates, props).show()
spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions // 2
If you specify predicates that are not disjoint, you can end up with lots of duplicate rows. Here’s an example set of predicates that will result in duplicate rows:
// in Scala
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
spark.read.jdbc(url, tablename, predicates, props).count() // 510
Partitioning based on a sliding window
We can also partition a JDBC read based on a numeric column, in this case count. We specify a lower and an upper bound for the column along with the number of partitions we want; Spark then issues one query per partition, splitting the range between the bounds evenly. The bounds do not filter anything out: rows outside them simply end up in the first or last partition:
// in Scala
val colName = "count"
val lowerBound = 0L
val upperBound = 348113L // this is the max count in our database
val numPartitions = 10
spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props)
.count() // 255
Writing to SQL Databases
// in Scala
val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
csvFile.write.mode("append").jdbc(newPath, tablename, props)
Text Files
Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly.
Reading Text Files
Reading text files is straightforward: you simply specify the type to be textFile. With textFile, partitioned directory names are ignored. To read and write text files according to partitions, you should use text, which respects partitioning on reading and writing:
spark.read.textFile("data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
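The text method, by contrast, returns a DataFrame with a single value column and respects partition directories; a brief sketch against the same file:
// in Scala
spark.read.text("data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()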
Writing Text Files
When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail:
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")
If you perform some partitioning when performing your write (we'll discuss partitioning in the next couple of pages), you can write more columns. However, those columns will manifest as directories in the folder to which you're writing, instead of as columns in every single file:
// in Scala
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")
.write.partitionBy("count").text("/tmp/five-csv-files2.csv")
Advanced I/O Concepts
We can control the parallelism of the files that we write by controlling the partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning (discussed momentarily).
Splittable File Types and Compression
Certain file formats are fundamentally "splittable." This can improve speed because it makes it possible for Spark to avoid reading an entire file, and access only the parts of the file necessary to satisfy your query. Additionally, if you're using something like Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this is a need to manage compression. Not all compression schemes are splittable. How you store your data is of immense consequence when it comes to making your Spark jobs run smoothly. We recommend Parquet with gzip compression.
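For example, here is a sketch of writing Parquet with gzip compression via the writer's compression option (the output path is hypothetical):
// in Scala
csvFile.write.format("parquet")
.option("compression", "gzip")
.mode("overwrite")
.save("/tmp/my-gzip-parquet-file.parquet")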
Reading Data in Parallel
Multiple executors cannot necessarily read from the same file at the same time, but they can read different files at the same time. In general, this means that when you read from a folder with multiple files in it, each one of those files will become a partition in your DataFrame and be read in by available executors in parallel (with any remaining files queueing up behind the others).
Writing Data in Parallel
The number of files or amount of data written depends on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data. This means that although we specify a "file," it's actually a number of files within a folder, with the name of the specified file, and one file written per partition.
Partitioning
Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. This allows you to skip lots of data when you go to read it in later, reading only the data relevant to your problem instead of scanning the complete dataset. Partitioned directories are supported by all file-based data sources:
// in Scala
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")
.save("/tmp/partitioned-files.parquet")
Bucketing
Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.
Rather than partitioning on a specific column (which might write out a ton of directories), it’s probably worthwhile to explore bucketing the data instead. This will create a certain number of files and organize our data into those “buckets”:
val numberBuckets = 10
val columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
Bucketing is supported only for Spark-managed tables.
Writing Complex Types
Although Spark can work with all of these types, not every single type works well with every data file format. For instance, CSV files do not support complex types, whereas Parquet and ORC do.
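As a brief sketch, the following builds a struct column out of two existing columns and writes it to Parquet, something the CSV writer would reject (the route alias and output path are just for illustration):
// in Scala
import org.apache.spark.sql.functions.{col, struct}
csvFile.select(struct("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").alias("route"), col("count"))
.write.format("parquet").mode("overwrite")
.save("/tmp/complex-types.parquet")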
Managing File Size
When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files. Spark especially does not do well with small files, although many file systems (like HDFS) don’t handle lots of small files well, either. You might hear this referred to as the “small file problem.” The opposite is also true: you don’t want files that are too large either, because it becomes inefficient to have to read entire blocks of data when you need only a few rows.
Spark 2.2 introduced a new method for controlling file sizes in a more automatic way. We saw previously that the number of output files is a derivative of the number of partitions we had at write time (and the partitioning columns we selected). Now, you can take advantage of another tool in order to limit output file sizes so that you can target an optimum file size. You can use the maxRecordsPerFile option and specify a number of your choosing. This allows you to better control file sizes by controlling the number of records that are written to each file. For example, if you set an option for a writer as df.write.option("maxRecordsPerFile", 5000), Spark will ensure that files will contain at most 5,000 records.
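A brief sketch of this option applied to the DataFrame we have been working with (the output path is hypothetical):
// in Scala
csvFile.write.format("parquet").mode("overwrite")
.option("maxRecordsPerFile", 5000)
.save("/tmp/max-records-per-file.parquet")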
Conclusion
There are ways of implementing your own data source; however, we omitted instructions for how to do this because the API is currently evolving to better support Structured Streaming. If you're interested in seeing how to implement your own custom data sources, the Cassandra Connector is well organized and maintained and could serve as a reference for the adventurous.