批量计算

流计算

Netcat 的使用

项目实例
目标:使用 Spark Streaming 程序和 Socket server 进行交互, 从 Server 处获取实时传输过来的字符串, 拆开单词并统计单词数量, 最后打印出来每一个小批次的单词数量

步骤:
package cn.itcast.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]): Unit = {
//1.初始化
val sparkConf=new SparkConf().setAppName("streaming").setMaster("local[2]")
val ssc=new StreamingContext(sparkConf,Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
hostname = "192.168.31.101",
port = 9999,
storageLevel = StorageLevel.MEMORY_AND_DISK_SER
)
//2.数据处理
//2.1把句子拆单词
val words: DStream[String] =lines.flatMap(_.split(" "))
val tuples: DStream[(String, Int)] =words.map((_,1))
val counts: DStream[(String, Int)] =tuples.reduceByKey(_+_)
//3.展示
counts.print()
ssc.start()
ssc.awaitTermination()
}
}
开始进行交互:


注意:
Spark Streaming 并不是真正的来一条数据处理一条
Spark Streaming 的处理机制叫做小批量, 英文叫做 mini-batch, 是收集了一定时间的数据后生成 RDD, 后针对 RDD 进行各种转换操作, 这个原理提现在如下两个地方
- 控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
- 多长时间生成一个
RDD 去统计呢? 由 new StreamingContext(sparkConf, Seconds(1)) 这段代码中的第二个参数指定批次生成的时间
Spark Streaming 中至少要有两个线程
在使用 spark-submit 启动程序的时候, 不能指定一个线程

| 算子 | 释义 |
|
flatMap
|
将一个数据一对多的转换为另外的形式, 规则通过传入函数指定
|
|
map
|
|
|
reduceByKey
|
这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据
|
| 编程模型 | 解释 |
|
RDD
|
-
针对自定义数据对象进行处理, 可以处理任意类型的对象, 比较符合面向对象
-
RDD 无法感知到数据的结构, 无法针对数据结构进行编程
|
|
DataFrame
|
-
DataFrame 保留有数据的元信息, API 针对数据的结构进行处理, 例如说可以根据数据的某一列进行排序或者分组
-
DataFrame 在执行的时候会经过 Catalyst 进行优化, 并且序列化更加高效, 性能会更好
-
DataFrame 只能处理结构化的数据, 无法处理非结构化的数据, 因为 DataFrame 的内部使用 Row 对象保存数据
-
Spark 为 DataFrame 设计了新的数据读写框架, 更加强大, 支持的数据源众多
|
|
Dataset
|
-
Dataset 结合了 RDD 和 DataFrame 的特点, 从 API 上即可以处理结构化数据, 也可以处理非结构化数据
-
Dataset 和 DataFrame 其实是一个东西, 所以 DataFrame 的性能优势, 在 Dataset 上也有
|
Spark Streaming 时代
Structured Streaming 时代