SparkStreaming
介绍
流式计算框架
批计算
数据已经存在, 一次性读取所有的数据进行批量处理
流计算
数据源源不断的进来, 经过处理后落地
特点
-
-
Spark Streaming
并不是实时流, 而是按照时间切分小批量, 一个一个的小批量处理 -
Spark Streaming
Socket
-
-
Socket
分为Socket server
和Socket client
-
Socket server
监听某个端口, 接收
Socket client
发过来的连接请求建立连接, 连接建立后可以向Socket client
发送TCP packet
交互 (被动) -
Socket client
向某个端口发起连接, 并在连接建立后, 向
Socket server
发送TCP packet
实现交互 (主动)
-
-
TCP
三次握手建立连接-
Step 1
Client
向Server
发送SYN(j)
, 进入SYN_SEND
状态等待Server
响应 -
Step 2
Server
收到Client
的SYN(j)
并发送确认包ACK(j + 1)
, 同时自己也发送一个请求连接的SYN(k)
给Client
, 进入SYN_RECV
状态等待Client
确认 -
Step 3
Client
收到Server
的ACK + SYN
, 向Server
发送连接确认ACK(k + 1)
, 此时,Client
和Server
都进入ESTABLISHED
-
源代码
package sparkStreaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author mooojl * @Date 2021-01-17-11:55 */ object StreamingWordCount { def main(args: Array[String]): Unit = { // 1. 初始化环境 // 在 SparkCore 中的内存, 创建 SparkContext 的时候使用 // 在创建 Streaming Context 的时候也要用到 conf, 说明 Spark Streaming 是基于 Spark Core 的 // 在执行 master 的时候, 不能指定一个线程 // 因为在 Streaming 运行的时候, 需要开一个新的线程来去一直监听数据的获取 val sparkConf = new SparkConf().setAppName("streaming word count").setMaster("local[6]") // StreamingContext 其实就是 Spark Streaming 的入口 // 相当于 SparkContext 是 Spark Core 的入口一样 // 它们也都叫做 XXContext val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.sparkContext.setLogLevel("WARN") // socketTextStream 这个方法用于创建一个 DStream, 监听 Socket 输入, 当做文本来处理 // sparkContext.textFile() 创建一个 rdd, 他们俩类似, 都是创建对应的数据集 // RDD -> Spark Core DStream -> Spark Streaming // DStream 可以理解为是一个流式的 RDD val lines: ReceiverInputDStream[String] = ssc.socketTextStream( hostname = "192.168.47.100", port = 9999, storageLevel = StorageLevel.MEMORY_AND_DISK_SER ) // 2. 数据的处理 // 1. 把句子拆为单词 val words: DStream[String] = lines.flatMap(_.split(" ")) // 2. 转换单词 val tuples: DStream[(String, Int)] = words.map((_, 1)) // 3. 词频reduce val counts: DStream[(String, Int)] = tuples.reduceByKey(_ + _) // 3. 展示和启动 counts.print() ssc.start() // main 方法执行完毕后整个程序就会退出, 所以需要阻塞主线程 ssc.awaitTermination() } }
注意点
-
Spark Streaming
的处理机制叫做小批量, 英文叫做mini-batch
, 是收集了一定时间的数据后生成RDD
, 后针对RDD
进行各种转换操作, 这个原理提现在如下两个地方-
控制台中打印的结果是一个批次一个批次的, 统计单词数量也是按照一个批次一个批次的统计
-
多长时间生成一个
RDD
去统计呢? 由new StreamingContext(sparkConf, Seconds(1))
这段代码中的第二个参数指定批次生成的时间
-
-
Spark Streaming
中至少要有两个线程在使用
spark-submit
启动程序的时候, 不能指定一个线程-
主线程被阻塞了, 等待程序运行
-
-