什么是Spark Streaming
Spark Streaming类似于Apache Storm,用于流式数据的处理
Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等
数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等
Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,
另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口
目前流行的三种实时框架对比
Apache
|
Flink
|
SparkSteaming
|
Storm
|
架构
|
架构介于spark和storm之间,主从结构与sparkStreaming相似,DataFlow Grpah与storm相似,数据流可以被表示为一个有向图,每个顶点是一个定义的运算,每向边表示数据的流动
Native
|
架构依赖Spark,主从模式,每个batch批次处理都依赖driver主,可以理解为时间维度上的spark DAG
Micro-Batch
|
主从模式,且依赖ZK,处理过程中对主的依赖不大
Native
|
容错
|
基于Ghandy-Lamport distributed snapshots checkpoint机制
Medium
|
WAL及RDD血统机制
High(高)
|
Records Ack
Medium(一般)
|
处理模型与延时
|
单条时间处理
亚秒级低延时
|
一个事件窗口内的所有事件
秒级低延时
|
每次传入的一个事件
亚秒级低延时
|
吞吐量
|
High
|
High
|
Low(低)
|
数据处理保证
|
Exactly once
High
|
Exactly once(实现架用Chandy-Lamport算法,即marker-checkpoint)
High
|
At least once(实现架用record-level acknowledgments),Trident可以支持storm提供exactly once语义
Medium
|
高级API
|
Flink,栈中提供了很多高级API和满足不同场景的类库:机器学习、图分析、关系式数据处理
High
|
能够很容易的对接Spark生态圈里面的组件,同时额能够对接主流的消息传输组件及存储系统
High
|
应用需要按照特定的storm定义的规模编写
Low
|
易用性
|
支持SQL Streaming,Batch和Streaming采用统一编程框架
High
|
支持SQL Streaming,Batch和Streaming采用统一编程框架
High
|
不支持SQL Streaming
Medium
|
成熟性
|
新兴项目,处于发展阶段
Low
|
已经发展一段时间
Medium
|
相对较早的流系统,比较稳定
High
|
部署性
|
部署相对简单,只依赖JRE环境
Low
|
部署相对简单,只依赖JRE环境
Low
|
依赖JRE环境和ZK
High
|
|
|
|
|
Spark Streaming架构
Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据
StreamingContext 会周期性地运行 Spark 作业来处理这些数据,把数据与之前时间区间中的 RDD 进行整合
什么是Dstream
就是将流式计算分解成为一系列确定并且较小的批处理作业
可以将失败或者执行较慢的任务在其他节点上并行执行
有较强的的容错能力,基于lineage
Dstream内含high-level operations进行处理
Dstream内部实现为一个RDD序列
基本数据源:socket、file,akka actoer。Steaming中自带了该数据源的读取API
高级数据源:kafka,flume,kinesis,Twitter等其他的数据。必须单独导入集成的JAR包
Receiver方式:接收器模式是使用Kafka高级Consumer API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark Executor的内存中,然后由Spark Streaming启动的job来处理数据。
Direct:直连模式,在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,
其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,并且相应的定义要在每个batch中处理偏移范围,
当启动处理数据的作业时,kafka的简单的消费者api用于从kafka读取定义的偏移范围
简单Spark Streaming实现
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("master01", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Kafka对接Stream实现
object KafkaDirectorDemo {
def main(args: Array[String]): Unit = {
//构建conf ssc 对象 初始化Streamingcontext
val conf = new SparkConf().setAppName("Kafka_director").setMaster("local")
val ssc = new StreamingContext(conf,Seconds(5))
//设置数据检查点进行累计计算 没有的话抛无方法异常
ssc.checkpoint("hdfs://192.168.25.101:9000/checkpoint")
//设置kfaka相关信息
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "CentOS1:9092,CentOS2:9092,CentOS3:9092",//用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],//key序列化
"value.deserializer" -> classOf[StringDeserializer],//value序列化
"group.id" -> "group1",//用于标识这个消费者属于哪个消费团体
"auto.offset.reset" -> "latest",//偏移量 latest自动重置偏移量为最新的偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)//如果是true,则这个消费者的偏移量会在后台自动提交
)
//kafka 设置kafka读取topic
val topics = Array("first", "second")
// 获得DStream
val dStreaming = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,Subscribe[String, String](topics, kafkaParams))
val rdd = dStreaming.map(record => (record.key, record.value))
rdd.print()
rdd.count().print()
rdd.countByValue().print()
dStreaming.foreachRDD(rdd=>rdd.foreach(println(_)))
ssc.start()
ssc.awaitTermination()
}
}