实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaing就支持driver和worker节点的错误恢复。然后,在使用某些数据源的时候,错误恢复时输入数据可能会丢失。在spark 1.2中,加入write ahead logs(日志)这个初步方案用来改进恢复机制,保证数据的无丢失。
背景
spark和rdd的设计保证了集群中worker节点的容错性。spark streaming构建在spark之上,所以它的worker节点也是同样的容错机制,然后Spark Streaming应用的高可用性要求应用的driver进程也要有容错性,它是应用的主要进程用于协调所有的worker节点,因为用户应用的计算模式是可变的导致driver的容错性非常棘手,然后Spark Streaming会对实时流中的每一批数据进行运行同样的Spark计算,这样就可以定期的保存应用的状态到一个可靠的存储中,driver重启的时候恢复这些状态。
对于一些文件的数据源,driver的恢复机制可以保证数据无丢失,因为所有的数据都保存在HDFS或S3上面,对于一些像kafka,flume等数据源,接收的数据保存在内存中将有可能丢失,这是因为spark应用分布式运行的,如果driver进程挂了,所有的executor进程将团灭,保存在这些进程所持有内存中的数据将会丢失。为了避免这些数据的丢失,在spark streaming 1.2中 引入了一个write ahead logs。
Write Ahead Logs
WAL使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,向数据源确认接收到数据,然后把数据存储在executor的内存中,然后driver在exector上运行任务处理这些数据。
如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性,此外,如果只有在数据 写入到log中之后接收器才向数据源确认,这样drive重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。
配置
启动WAL需要做如下的配置
1:给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统,用来保存WAL和做Streaming的checkpoint
2:spark.streaming.receiver.writeAheadLog.enable 设置为true
当WAL被启动了以后,所有的接收器接收的数据可以很稳定的恢复,推荐的内存备份可以关闭了(给输入流设置合适的持久化级别),因为WAL保存在可容错的文件系统上,数据已经备份了。
此外,如果想要恢复缓冲的数据,必须使用支持应答的数据源(flume,kafka,kinses)。 当数据存储到日志以后那些支持应答接收器可以向数据源确认。内置的flume和kafka接收器已经实现了这些功能
最后,值得注意的是WAL开启了以后会减少Spark Streaming处理数据的吞吐,因为所有接收的数据会被写到到容错的文件系统上,这样文件系统的吞吐和网络带宽将成为瓶颈。可以通过添加更多接收器或使用更好的软件。
实现细节
下面讲解下WAL的工作原理。过一下Spark Streaming的架构
当一个Spark Streaming应用启动了(例如driver启动), 相应的StreamingContext使用SparkContet去启动receiver,receiver是一个长时间执行的作业,这些接收器接收并保存这些数据到Spark的executor进程的内存中,这些数据的生命周期如下图所示
1:蓝色的箭头表示接收的数据,接收器把数据流打包成块,存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文件系统的日志文件中
2:青色的箭头表示提醒driver, 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据块的引用ID和日志文件中数据块的偏移信息
3:红色箭头表示处理数据,每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs. SparkContext执行这些job用于处理executor内存中的数据块
4:黄色箭头表示checkpoint这些计算,以便于恢复。流式处理会周期的被checkpoint到文件中
当一个失败的driver重启以后,恢复流程如下
1:黄色的箭头用于恢复计算,checkpointed的信息是用于重启driver,重新构造上下文和重启所有的receiver
2: 青色箭头恢复块元数据信息,所有的块信息对已恢复计算很重要
3:重新生成未完成的job(红色箭头),会使用到2恢复的元数据信息
4:读取保存在日志中的块(蓝色箭头),当job重新执行的时候,块数据将会直接从日志中读取,
5:重发没有确认的数据(紫色的箭头)。缓冲的数据没有写到WAL中去将会被重新发送。