编程模型 | 解释 |
---|---|
|
|
|
|
|
|
RDD
的优点
-
面向对象的操作方式
-
可以处理任何类型的数据
RDD
的缺点
-
运行速度比较慢, 执行过程没有优化
-
API
比较僵硬, 对结构化数据的访问和操作没有优化
DataFrame
的优点
-
针对结构化数据高度优化, 可以通过列名访问和转换数据
-
增加
Catalyst
优化器, 执行过程是优化的, 避免了因为开发者的原因影响效率
DataFrame
的缺点
-
只能操作结构化数据
-
只有无类型的
API
, 也就是只能针对列和SQL
操作数据,API
依然僵硬
Dataset
的优点
-
结合了
RDD
和DataFrame
的API
, 既可以操作结构化数据, 也可以操作非结构化数据 -
既有有类型的
API
也有无类型的API
, 灵活选择
整体结构
HDFS读数据:
目标
-
在数据处理的时候, 经常会遇到这样的场景
-
有时候也会遇到这样的场景
-
以上两种场景有两个共同的特点
-
会产生大量小文件在
HDFS
上 -
数据需要处理
-
-
通过本章节的学习, 便能够更深刻的理解这种结构, 具有使用
Structured Streaming
整合HDFS
, 从其中读取数据的能力
案例流程
难点和易错点
import os for index in range(100): content = """ {"name": "Michael"} {"name": "Andy", "age": 30} {"name": "Justin", "age": 19} """ file_name = "/export/dataset/text{0}.json".format(index) with open(file_name, "w") as file: file.write(content) os.system("/export/servers/hadoop/bin/hdfs dfs -mkdir -p /dataset/dataset/") os.system("/export/servers/hadoop/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))
执行python文件
删除hdfs目录
package cn.itcast.structured import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType object HDFSSource { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "C:\winutil") // 1. 创建 SparkSession val spark = SparkSession.builder() .appName("hdfs_source") .master("local[6]") .getOrCreate() // 2. 数据读取, 目录只能是文件夹, 不能是某一个文件 val schema = new StructType() .add("name", "string") .add("age", "integer") val source = spark.readStream .schema(schema) .json("hdfs://node01:8020/dataset/dataset") // 3. 输出结果 source.writeStream .outputMode(OutputMode.Append()) .format("console") .start() .awaitTermination() } }
运行 Python 程序
kafka:
Kafka 的特点
Topic 和 Partitions
总结
创建 Topic 并输入数据到 Topic
使用 Spark 读取 Kafka 的 Topic
sink
从 Kafka
中获取数据, 简单处理, 再次放入 Kafka
package cn.itcast.structured import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object HDFSSink { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "C:\winutil") // 1. 创建 SparkSession val spark = SparkSession.builder() .appName("hdfs_sink") .master("local[6]") .getOrCreate() import spark.implicits._ // 2. 读取 Kafka 数据 val source: Dataset[String] = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092") .option("subscribe", "streaming_test_2") .option("startingOffsets", "earliest") .load() .selectExpr("CAST(value AS STRING) as value") .as[String] // 1::Toy Story (1995)::Animation|Children's|Comedy // 3. 处理 CSV, Dataset(String), Dataset(id, name, category) val result = source.map(item => { val arr = item.split("::") (arr(0).toInt, arr(1).toString, arr(2).toString) }).as[(Int, String, String)].toDF("id", "name", "category") // 4. 落地到 HDFS 中 result.writeStream .format("parquet") .option("path", "dataset/streaming/moives/") .option("checkpointLocation", "checkpoint") .start() .awaitTermination() } }
掌握 Foreach
模式理解如何扩展 Structured Streaming
的 Sink
, 同时能够将数据落地到 MySQL
package cn.itcast.structured import java.sql.{Connection, DriverManager, Statement} import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession} object ForeachSink { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "C:\winutil") // 1. 创建 SparkSession val spark = SparkSession.builder() .appName("hdfs_sink") .master("local[6]") .getOrCreate() import spark.implicits._ // 2. 读取 Kafka 数据 val source: Dataset[String] = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092") .option("subscribe", "streaming_test_2") .option("startingOffsets", "earliest") .load() .selectExpr("CAST(value AS STRING) as value") .as[String] // 1::Toy Story (1995)::Animation|Children's|Comedy // 3. 处理 CSV, Dataset(String), Dataset(id, name, category) val result = source.map(item => { val arr = item.split("::") (arr(0).toInt, arr(1).toString, arr(2).toString) }).as[(Int, String, String)].toDF("id", "name", "category") // 4. 落地到 MySQL class MySQLWriter extends ForeachWriter[Row] { private val driver = "com.mysql.jdbc.Driver" private var connection: Connection = _ private val url = "jdbc::mysql://node01:3306/streaming-movies-result" private var statement: Statement = _ override def open(partitionId: Long, version: Long): Boolean = { Class.forName(driver) connection = DriverManager.getConnection(url) statement = connection.createStatement() true } override def process(value: Row): Unit = { statement.executeUpdate(s"insert into movies values(${value.get(0)}, ${value.get(1)}, ${value.get(2)})") } override def close(errorOrNull: Throwable): Unit = { connection.close() } } result.writeStream .foreach(new MySQLWriter) .start() .awaitTermination() } }
triggers
package cn.itcast.structured import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.{OutputMode, Trigger} object Triggers { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "C:\winutil") // 创建数据源 val spark = SparkSession.builder() .appName("triggers") .master("local[6]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") // timestamp, value val source = spark.readStream .format("rate") .load() // 简单处理 // val result = source // 落地 source.writeStream .format("console") .outputMode(OutputMode.Append()) .trigger(Trigger.Once()) .start() .awaitTermination() } }