一、Spark Streaming概述:
是基于Spark core的API,不需要单独安装,一盏式解决
可扩展、高吞吐量、容错性、能够运行在多节点、结合了批处理、机器学习、图计算等
将不同的数据源的数据经过Spark Streaming处理后输出到外部文件系统
1. 应用场景:
实时交易防欺诈检测、传感器异常实时反应
整理Spark发展史问题(缺少)
2. Spark Streaming工作原理:
粗粒度:
把实时数据流,以秒数拆分成批次的小数据块,通过Spark当成RDD来处理
细粒度:
3. 核心概念:
编程入口:StreamingContext
常用构造方法源码:
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
batchDuration 是必须填的,根据应用程序的延迟需求和资源可用情况来设置
定义好streamingContext后,再定义DStream、transformation等,通过start()开始,stop()结束
注意:
一个context启动后,不能再运行新的streaming(一个JVM只能有一个streamingContext)
一旦停止后,就没办法再重新开始
Stop方法默认把sparkContext和streamingContext同时关掉,要不想关掉sc,必须定义stopSparkContext参数为false
一个SparkContext能够创建多个StreamingContext
最基础的抽象:Discretized Stream (DStream)
一系列的RDD代表一个DStream,是不可变的、分布式的dataset
每一个RDD代表一个时间段(批次)的数据
对DStream进行操作算子(flatMap)时,在底层上看就是对每一个RDD做相同的操作,交由Spark core运行
数据输入:Input DStreams and Receivers
每一个Input DStream 关联着一个Receiver(但从文件系统接收不需要receiver),receiver 接收数据并存在内存中
receiver需要占用一个线程,所以不能定义local[1],线程的数量n必须大于receivers的数量
转换:Transformations on DStreams
与RDD类似:map、flatMap、filter、repartition、count...
数据输出:Output Operations and DStreams:
输出到数据库或者文件系统:
API:print、save、foreach
二、Spark Streaming实战部分:
- Spark Streaming处理socket数据:
接收到的数据进行WordCount操作:
在IDEA中:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /* * Spark Streaming 处理Socket数据 * */ object NetWorkWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount") //创建streamingContext的两个参数sparkConf和seconds val ssc = new StreamingContext(sparkConf, Seconds(5)) //生成Input DStream val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
在控制台中:
nc -lk 6789,创建一个Socket
在这上面输入数据,就可以在IDEA中count出来了
注意:
在执行过程中会报错,必须在Maven projects中找出报错提示中所缺少的包,并且在dependency上加入。
当projects中还没有的包,在http://mvnrepository.com 上搜索相应的dependency,然后让Maven帮我们自动下载。
- Spark Streaming处理HDFS中的数据:
ssc.textFileStream("file_path")
同样是像上面一样,只是改了stream的source
但是测试时,必须要是生成新的文件(官网称为moving进去的文件),才会被统计;而往旧的文件里再添加数据,也不会被统计了
- Spark Streaming进阶实战:
带状态的算子UpdateStateByKey、保存到MySQL、window函数
UpdateStateByKey实现实时更新:
允许把新旧状态结合,连续地更新
准备工作:
- 定义一个状态
- 定义状态更新的方法
注意:
- updateFunction需要隐式转换
- 报错:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
意思就是要进行checkpoint记录
实现代码:
把reduceByKey删除,并且把map之后的RDD定义为一个state,配合这个state写状态更新方法
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /* * Spark Streaming有状态的统计 * */ object StatefulWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //使用状态算子必须要设置checkpoint //一般要保存记录在HDFS中 ssc.checkpoint(".") val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_, 1)) //不能用reduceByKey //连续更新状态 val state = result.updateStateByKey(updateFunction _) //需要隐式转换 state.print() ssc.start() ssc.awaitTermination() } /* * 状态更新方法更新已有的数据,放在updateStateByKey中 * */ def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = { val curr = currData.sum //算出当前的总次数 val prev = prevData.getOrElse(0) //读取已有的 //返回已有和当前的和 Some(curr + prev) } }
- 统计结果写到MySQL中:
前提准备:
需要在IDEA中增加mysql的connector依赖
在mysql数据库中先创建一张表
写jdbc创建连接到Mysql
使用foreachRDD,有很多种错误的写法:(没有序列化,创建太多mysql连接等)
报错没有序列化:
dstream.foreachRDD {rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {record =>connection.send(record) // executed at the worker
}
}
花太多开销在连接和断开数据库上
dstream.foreachRDD {rdd =>
rdd.foreach {record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
官方正确写法:
使用foreachPartition进行优化连接:
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
val connection = createNewConnection() //创建mysql连接
partitionOfRecords.foreach(record =>
connection.send(record))
connection.close()
}
}
用连接池进行进一步优化:
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
在写入MySQL数据时,应该作一个是否存在的判断:
若存在则使用update语句,不存在则使用insert语句
- Window的使用:
两个参数:
window length:窗口长度
sliding interval:窗口间隔
也就是每隔sliding interval统计前window length的值
API:countByWindow、reduceByWindow…
- 实战:黑名单过滤
transform算子的使用+Spark Streaming整合RDD操作
元组默认从1开始数
假设输入数据为id, name 这种形式
实现过程:
- 建立黑名单元组 => (name, true)
- 把输入数据流编程元组 => (name, (id, name))
- transform,把每个DStream变成一个个RDD操作
- 数据流的RDD与黑名单RDD进行leftjoin,获得新的元组
- filter判断过滤
- 整合输出
实现代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /* * 黑名单过滤demo * */ object TransformApp { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp") val ssc = new StreamingContext(sparkConf, Seconds(5)) //构建黑名单列表, 实际应用中可在外面读取列表, 并转成RDD, 用true标记为是黑名单元组(name, true) val blacks = List("zs", "ls") val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true)) //获取每行 val lines = ssc.socketTextStream("localhost", 6789) //把id, name => 元组(name, (id, name)) //transform 的使用,对stream的每个RDD操作 val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => { //与黑名单进行leftjoin => (name, ((id, name), true)), 并过滤出是true的项 rdd.leftOuterJoin(blacksRDD) .filter(x => x._2._2.getOrElse(false) != true) //过滤出不等于true的 .map(x => x._2._1) }) filterResult.print() ssc.start() ssc.awaitTermination() } }
- Spark Streaming整合Spark SQL
整合完成词频统计操作
官网代码:
就是foreachRDD把streaming转成RDD,然后toDF就可以进行DataFrame或者是sql的操作了