Spark Streaming
1、介绍
Spark Streaming是Spark core API的扩展,针对实时数据流计算,具有可伸缩性、高吞吐量、自动容错机制的特点。数据源可以来自于多种方式,例如kafka、flume等等。使用类似于RDD的高级算子进行复杂计算,像map、reduce、join和window等等。最后,处理的数据推送到数据库、文件系统或者仪表盘等。也可以对流计算应用机器学习和图计算。
在内部,spark streaming接收实时数据流,然后切割成一个个批次,然后通过spark引擎生成result的数据流。
Spark Streaming提供了称为离散流(DStream-discretized stream)的高级抽象,代表了连续的数据流。离散流通过kafka、flume等源创建,也可以通过高级操作像map、filter等变换得到,类似于RDD的行为。内部,离散流表现为连续的RDD。
2、体验Spark Streaming编程
-
创建模块,添加spark-streaming的maven依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.0</version> </dependency>
-
编写word count的scala程序
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Spark Streaming程序 */ object WordCountStreamingScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("streaming") conf.setMaster("local[*]") //创建SparkStreamingContext val sc = new StreamingContext(conf , Seconds(1)) ; //行流,对接套接字文本流 val lines = sc.socketTextStream("s101" , 8888) //单词流 val words = lines.flatMap(_.split(" ")) //对流 val pairs = words.map((_, 1)) //计算结果 val result = pairs.reduceByKey(_+_) //打印结果 result.print() //启动上下文 sc.start() //等待停止 sc.awaitTermination() } }
-
导入log4j属性配置文件,修改日志级别,否则输出过多信息,不利用观察
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Set everything to be logged to the console # 修改这里的INFO为ERROR # log4j.rootCategory=INFO, console log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to WARN. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=WARN # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=WARN log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
-
启动nc服务器
$>nc -lk 8888
-
启动streaming程序
-
在nc服务器程序命令输入单词
-
在Streaming控制台查看结果输出
-
编写java版流应用程序
package com.oldboy.bigdata.spark.java; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; /** * spark streaming java版 */ public class WordCountStreamingJava { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf() ; conf.setAppName("streaming java") ; conf.setMaster("local[*]") ; //创建上下文 JavaStreamingContext sc = new JavaStreamingContext(conf , Durations.seconds(1)) ; //行 JavaDStream<String> lines = sc.socketTextStream("s101", 8888); //单词 JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { String[] arr = s.split(" ") ; return Arrays.asList(arr).iterator(); } }) ; //对流 JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }) ; //结果 JavaPairDStream<String, Integer> res= pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2 ; } }) ; //输出结果 res.print(); //启动 sc.start(); sc.awaitTermination(); } }
3、基本概念
3.1 StreamingContext初始化
appName是应用程序名称,master是Spark,、Mesos或YARN,也可以是local,local是本地模式运行。实际应用不需要指定master值,通过spark-submit提交命令中获取该参数,定义完上下文后,必须要完成如下工作:
- 通过创建离散流定义数据源
- 为流定义变换等计算工作
- streamingContext.start()开始接受数据并进行处理
- 使用streamingContext.awaitTermination()函数停止应用
- 手动调用streamingContext.stop()方法停止应用
切记:
- 上下文启动后,不能设置新的计算方法
- 上下文停止后,不能重启
- 流上下文停止后,还会停止SparkContext,如果不希望停止SparkContext,可以通过stop(false)。
- SparkContext可以重用来创建多个流上下文,新的流上下文创建前需要停止上一个流上下文。
3.2 离散流(DStream)
离散流是Spark流应用的抽象,表示的是连续的数据流,数据流要么从数据源而来,或者通过变换生成。在内部,离散流表示连续的RDD。
对离散流的任何应用,都会转换为操纵底层的RDD:
底层的RDD变换工作,Spark引擎进行计算。
3.3 Input DStream和Receiver
InputStream Dstream也是一种DStream,从数据源接受的数据流,每个Input DStream都和一个Receiver关联,Receiver是接受数据并存储在Spark内存中。
Spark内部提供了两种类型的源:
-
基本源
Spark API直接能够使用,比如FileSystem或Socket连接。
-
高级源
像kafka、flume等源需要借助于第三方工具类进行连接。
Spark可以在一个流上下文中创建多个InputStream,就可以进行并行计算,这些创建的多个Input DStream具有相同时间切片,不可能给不同的Input DStream分别设置时间切片,因为时间切片设置在StreamContext中完成,同时也会创建多个Receiver。接受器单独占用一个CPU内核,即在一个单独的线程中死循环方式读取数据,需要分配足够的cpu内核来处理数据。保证CPU内核数据大于Receiver个数。
注意事项:
- 本地模式下,不能使用local或者local[1],Receiver占据唯一的线程,没有线程执行计算工作。
- 扩展到集群,内核数大于Receiver个数。
4、Receiver
4.1 内部结构
Receiver内部维护了队列,放置的是Block对象,Block包含blockId的ArrayBuffer两个属性。每个Block对应一个分区,默认每200ms(可通过spark.streaming.blockInterval修改)生成一个Block对象并推送到队列中,在StreamingContext中指定的时间片就是一个RDD的时长,因此每个RDD含有多少分区,只要计算一下是200ms的多少倍,然后就可以确定RDD内含有多少个分区了,但如果没有产生数据,就就不会生成分区,因此分区数不会超过这个倍数。内部结果如图所示:
4.2 分区数控制
修改块生成间隔即可改变分区数,代码如下:
//块间隔设置,org.apache.spark.streaming.receiver.BlockGenerator#103
conf.set("spark.streaming.blockInterval", "500ms")
4.3 限速处理
-
每秒接收记录数
Spark Streaming可以控制每个Receiver每秒接收消息条数的上限,默认没有设置,就没有上限。该种方式缺点可能对集群处理能力估计不足,导致计算资源浪费。
//每秒最多接受20条记录 conf.set("spark.streaming.receiver.maxRate" , "20")
-
压后(backpress)处理
可以上spark Streaming基于当前batch的调度延迟与处理时间来控制接收速率,以备让系统只接受系统能够处理的速率。可以通过spark.streaming.backpressure.enabled属性开启,默认该属性是禁用的。对于Receiver的第一个批次的速率限制通过spark.streaming.backpressure.initialRate进行设置。启用压后处理属性后,在Spark Streaming内部,会动态设置Receiver接收速率的最大值。如果设置了spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerP-artition属性,则速率不会超过这一设置。具体配置方式如下:
//启用压后控制 conf.set("spark.streaming.backpressure.enabled" , "true") //设置第一个batch的接收速率 conf.set("spark.streaming.backpressure.initialRate" , "10000")
5、window操作
5.1 介绍
窗口是若干RDD的集合,窗口的长度必须是批次的整倍数,窗口的滑动间隔也必须是批次整倍数。比如每分钟查询最近一小时内的百度热词,一分钟就是窗口的滑动间隔,一小时就是窗口长度。
5.2 编程实现
val result = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a + b, Seconds(10) , Seconds(3))
6、updateStateByKey
按key更新状态是spark streaming对k-v类型的DStream提供的操作,是对每个K关联一个状态对象,可以是任何对象,该状态对象会随着DStream的流动,从上一个的RDD流向到下一个RDD,工作流程如下图所示:
该函数需要传递一个高阶函数,方法签名如下:
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
函数有两个参数,Seq[V]是本次RDD中K下的V值列表,Option[S]就跟K关联的状态对象,该函数返回状态对象Option[S],使用Option作为状态类型,意味着状态对象可能不存在。
例如,从应用启动开始,每个单词出现的次数,则使用如下代码实现:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
*/
object WordCountStreamingUpdateByKeyScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming")
conf.setMaster("local[8]")
//创建SparkStreamingContext
val sc = new StreamingContext(conf , Seconds(2)) ;
/******************************************************
* *
* 注意:此处需要设置检查点目录,存放rdd数据 *
* *
******************************************************/
sc.checkpoint("file:///H:\spark\streaming")
//行流
val lines = sc.socketTextStream("s101" , 8888)
//单词流
val words = lines.flatMap(_.split(" "))
//对流
val pairs = words.map((_, 1))
//窗口化操作.
val result = pairs.reduceByKey((a:Int,b:Int)=>a + b)
//
def updateFunc(a:Seq[Int] , b:Option[Int]) = {
if(b.isEmpty){
if(a.isEmpty){
Some(0)
}
else{
Some(a.sum)
}
}
else{
val old = b.get
if(a.isEmpty){
Some(old)
}
else{
Some(old + a.sum)
}
}
}
val ds = result.updateStateByKey(updateFunc)
ds.print()
sc.start()
sc.awaitTermination()
}
}
该带码执行的结果如下:
7、避免大量小文件
spark Streaming提供的saveAsTextFile方法是将每个RDD的每个分区输出到一个文件中,由于时间片通常是几秒,因此导致产生大量的小文件,进而影响Namenode的资源以及计算时导致大量的task出现。解决办法就是使用DStream的foreachRDD手动遍历每个分区,按照自定义法则将多个分区数据写入一个文件中,以下就是将多个RDD中相同分区索引的数据写入一个文件中,文件以主机名-精确化分的时间串-分区索引格式进行命令,代码实现如下:
result.foreachRDD(rdd=>{
rdd.mapPartitionsWithIndex((idx,it)=>{
//当前时间
val now = new java.util.Date
//格式化
val sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm")
//提取时间串
val minStr = sdf.format(now)
//主机名
val host = InetAddress.getLocalHost.getHostName
//
val file = new File("H:\spark\streaming",
host + "-" + minStr + "-" + idx+ ".txt")
//文件输出流
val out = new FileOutputStream(file, true)
//写入整个分区数据
for (e <- it) {
out.write(e.toString().getBytes)
out.flush()
}
out.close()
it
}).count()
})
8、Spark Streaming同Spark SQL集成
spark streaming中使用SQL,只需要创建Spark Session对象,将RDD转换成DataFrame即可,然后注册DataFrame成为临时视图后,就可以使用Spark SQL了,具体代码如下:
import java.io.{File, FileOutputStream}
import java.net.InetAddress
import java.text.SimpleDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
*/
object WordCountStreamingSQLScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming")
conf.setMaster("local[8]")
//创建SparkStreamingContext
val sc = new StreamingContext(conf , Seconds(2)) ;
sc.checkpoint("file:///H:\spark\streaming")
//行流
val lines = sc.socketTextStream("127.0.0.1" , 8888)
lines.foreachRDD(rdd=>{
val rdd1 = rdd.flatMap(_.split(" "))
val rdd2 = rdd1.map((_,1))
//创建SparkSession对象,使用RDD的配置即可
val sess = SparkSession.builder()
.config(rdd.sparkContext.getConf)
.getOrCreate()
//导入Spark Session隐式转换
import sess.implicits._
//转换RDD成为DataFrame,并注册成临时表
rdd2.toDF("word" ,"cnt").createOrReplaceTempView("_wc")
//执行sql操作
sess.sql("select word , count(cnt) from _wc group by word").show(1000,false)
})
sc.start()
sc.awaitTermination()
}
}
注意:
sc.socketTextStream("127.0.0.1" , 8888)方法如果是本地的socket,需要使用127.0.0.1地址,使用localhost不好使,bug!!!!!
9、程序的部署
运行spark Streaming应用不能在spark-shell下执行,需要使用spark-submit命令提交执行。需要如下内容:
-
cluster manager
通过--master指定master URL地址。
$>spark-submit --master spark://s101:7077 ...
-
导出jar包
如果应用包含第三方组件,比如kafka,需要将所有的第三方类库导出到jar包中,spark自身的和Spark Streaming的包则不必。
-
给executor配置足够的内存
由于接收到的数据必须存在内存中,executor必须提供足够的内存来存储他们。如果又启用了window操作,最少要配置window长度容纳的数量。
$>spark-submit --executor-memory 2g ...
-
配置检查点
如果应用中用到了检查点,必须使用hadoop兼容的具有容错存储能力的检查点目录, 比如hdfs或S3。流计算应用会在里面写入检查点信息供故障恢复。
val sc = new StreamingContext(conf , Seconds(2)) ; //配置检查点目录 sc.checkpoint("file:///H:\spark\streaming")
-
配置Driver的自动重启
若需要Driver故障时自动恢复,那么用来运行流计算应用的部署命令必须能够监控driver进程并在他故障时重启。不同的集群管理有着不同的工具可以实现这一点:
-
spark standalone
standalone cluster模式支持应用程序非零退出后的自动重启,若要使用这一特性,可以在spark-submit命中增加--supervise参数来获得。如下所示:
$>spark-submit --supervise --master spark://s101:7077 --class MyApp myapp.jar
如果要杀死落入重复失败状况下的应用,可以执行以下命令:
$>spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
-
YARN
-
-
配置写前日志
从spark1.2开始,引入了写前日志来获得强容错性保证。如果启用的话,接收到的所有数据都会写入到检查点设置的写前日志中。这可以防止driver恢复时数据丢失,由此可以确保流数据零流失。可用通过spark.streaming.receiver.writeAhea-dLog.enable设置为true来启用写前日志,但会导致每个Receiver接受吞吐量的降低,可以通过增加Receiver进行补偿。
此外,如果开启了写前日志,可以禁用spark对接受的数据副本化存储,因为写前日志已经存储在了副本模式的文件系统中了。可以通过设置InputStream的持久化模式为StorageLevel.MEMORY_AND_DISK_SER来完成。代码如下:
//启用写前日志 conf.set("spark.streaming.receiver.writeAheadLog.enable" , "true") ... val lines = sc.socketTextStream("s101" , 8888) //只有一个副本 lines.persist(StorageLevel.MEMORY_AND_DISK) ;
-
设置最大接受速率
资源不足时可以进行限速处理,Receiver类型可以通过spark.streaming.receiver.maxRate设置,kafka方式可以通过 spark.streaming.kafka.maxRatePerPartition设置。Spark1.5之后引入了压后机制,不再需要限速设置,Spark Steaming自动找出速率限制并进行动态调增。压后控制可以通过spark.streaming.backpressure.enabled=true开启。
val conf = new SparkConf conf.set("spark.streaming.backpressure.enabled" , "true")
10、RDD的缓存管理
rdd执行结果可以进行缓存起来,以备后面使用rdd时不需要重复计算,直接提取计算结果即可。设置rdd缓存之后,必须unpersist之后才能继续再重新设置缓存级别。rdd可以缓存结果到内存中或磁盘中,如果是磁盘级别,保存数据到临时目录下,临时目录可以通过spark.local.dir进行修改。
// 缓存RDD
rdd.cache()
rdd.persist()
// 内存中缓存
rdd.persist(StorageLevel.MEMORY_ONLY)
RDD的缓存级别:
// 存储级别
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
11、RDD的检查点
调用rdd.checkpoint()方法时,会将rdd结果保存到检查点目录,检查点目录通过sc.setCheckPointDir()设置。
//
sc.setCheckpointDir("file:///H:/chk")
12、修改spark的本地临时目录
conf.set("spark.local.dir" ,"file:///H:/tmp" );