Spark Streaming
1). Overview: scalable, highly available, fault-tolerant
A one-stop solution: streaming and batch processing share the same engine and APIs
2). How it works
Coarse-grained: Spark Streaming receives the live data stream, chops the data into batches, and hands each batch to the Spark engine for processing; the stream becomes a sequence of RDDs, i.e. micro-batch processing (see the sketch below).
Fine-grained:
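A minimal sketch of the coarse-grained micro-batch model above (host, port, and the 2-second batch interval are arbitrary placeholders): every batch interval the received data surfaces as one RDD, which the Spark engine then processes.

package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object microBatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("micro batch sketch")
    // every 2 seconds the data received so far is cut into one batch
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("localhost", 9999)

    // each batch surfaces as an ordinary RDD that the Spark engine processes
    lines.foreachRDD { (rdd, time) =>
      println(s"batch at $time contains ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}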
3). Core concepts
StreamingContext:
After start(), no new streaming computations can be defined or added to the context
After stop(), the context cannot be restarted
Only one StreamingContext can be active per JVM at a time
stop(false) stops only the StreamingContext and leaves the SparkContext running; stop() stops both
val ssc = new StreamingContext(conf, Seconds(2))
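A small sketch of these lifecycle rules (app name, port, and the 60-second timeout are arbitrary): all computations are defined before start(), and stop(stopSparkContext = false) shuts down only the streaming part.

package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object streamingContextLifecycle {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ssc lifecycle")
    val ssc = new StreamingContext(conf, Seconds(2))

    // all DStreams and their operations must be defined before start()
    ssc.socketTextStream("localhost", 9999).print()

    ssc.start()                              // no new computation can be added after this point
    ssc.awaitTerminationOrTimeout(60 * 1000) // run for at most 60 seconds

    // stop only the StreamingContext; the SparkContext stays alive for batch jobs
    ssc.stop(stopSparkContext = false)
    // a stopped StreamingContext cannot be restarted
  }
}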
DStream:
Created from an input source or derived from another DStream
A continuous series of RDDs, one per batch interval
Operations on a DStream translate into operations on the underlying RDDs (see the sketch below)
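A sketch showing that DStream operators are just RDD operators applied batch by batch; the two word-count pipelines below are equivalent (the socket source and port are placeholders).

package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object dstreamVsRdd {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStream vs RDD ops")
    val ssc = new StreamingContext(conf, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)

    // DStream-level operators ...
    val counts1 = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // ... are applied batch by batch to the underlying RDDs, which transform makes explicit
    val counts2 = lines.transform { rdd =>
      rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    }

    counts1.print()
    counts2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}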
Input DStreams & Receivers:
Except for file-system sources, every input DStream is associated with a Receiver (ReceiverInputDStream)
A receiver-based DStream needs more threads than receivers: run locally with local[n] where n is greater than the number of receivers (see the sketch after this list)
transform and output operators (print, saveAsTextFiles, foreachRDD, ...)
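A sketch of the receiver/thread rule and of output operators, assuming two hypothetical socket sources on ports 9999 and 9998: two receivers occupy two threads, so local[3] leaves one thread for processing, and the print() output operator is what triggers execution.

package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object receiverThreads {
  def main(args: Array[String]): Unit = {
    // two receivers below, so at least 3 local threads: 2 for receiving + 1 for processing
    val conf = new SparkConf().setMaster("local[3]").setAppName("receiver threads")
    val ssc = new StreamingContext(conf, Seconds(2))

    // each socketTextStream is a receiver-based input DStream
    val s1 = ssc.socketTextStream("localhost", 9999)
    val s2 = ssc.socketTextStream("localhost", 9998)

    // an output operator (print, saveAsTextFiles, foreachRDD, ...) triggers the actual execution
    s1.union(s2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}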
4). Stateful operator: updateStateByKey
package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Spark Streaming stateful WordCount with updateStateByKey
 */
object statefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("updateStateByKey for wordCount")
    val ssc = new StreamingContext(conf, Seconds(4))

    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("/opt/datas/spark_data/updateStateByKey1")

    // test input: nc -lk 9999
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" ")).map((_, 1))
    val wordCount = words.updateStateByKey[Int](updateFunc _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * updateStateByKey update function
   * currentValues: counts of the key in the current batch
   * preValues: accumulated count from previous batches
   * returns the new accumulated count wrapped in Some()
   */
  def updateFunc(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val newCount = currentValues.sum
    val preCount = preValues.getOrElse(0)
    Some(newCount + preCount)
  }
}
5). Blacklist filtering
package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Blacklist filtering
 */
object filterBlackListSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("filter black list")
    val ssc = new StreamingContext(conf, Seconds(5))

    // turn the blacklist into an RDD of (name, true) pairs
    val blackList = List("ls", "zs")
    val rddBlackList = ssc.sparkContext.parallelize(blackList).map(x => (x, true))

    // input lines are comma-separated with the name in the second field;
    // key each log by the name, left-outer-join against the blacklist RDD
    // and keep only the logs whose name is not blacklisted
    val logs = ssc.socketTextStream("localhost", 9999)
      .map(x => (x.split(",")(1), x))
      .transform(rdd => {
        rdd.leftOuterJoin(rddBlackList)
          .filter(x => !x._2._2.getOrElse(false))
          .map(x => x._2._1)
      })
    logs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
6). Two ways to integrate Flume with Spark Streaming; the code is below:
package com.spark.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object flumeSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("flumeDstream")
    val ssc = new StreamingContext(conf, Seconds(5))

    /**
     * Approach 1: push-based
     * FlumeUtils.createStream starts an Avro receiver that Flume pushes events to.
     * Start the Spark Streaming app first, then start Flume; feeding data
     * (e.g. via telnet) starts the word count.
     */
    val flumeDstream = FlumeUtils.createStream(ssc, "bigdata", 3333)
    val wordCnt = flumeDstream.map(x => new String(x.event.getBody.array()).trim())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCnt.print()

    /**
     * Approach 2: pull-based (createPollingStream)
     * The Flume sink must be changed to org.apache.spark.streaming.flume.sink.SparkSink
     * (the other settings stay the same). Start Flume first; it buffers the data
     * and Spark Streaming pulls it.
     */
    // val XXX = FlumeUtils.createPollingStream(ssc, "bigdata", 3333)
    //   .map(x => new String(x.event.getBody.array()).trim)
    //   .flatMap(_.split(" "))
    //   .map((_, 1))
    //   .reduceByKey(_ + _)

    ssc.start()
    ssc.awaitTermination()
  }
}
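For the pull-based approach, the Flume agent's sink has to be switched to SparkSink, as the comment above notes. Below is a sketch of the corresponding sink section of a Flume agent config, reusing the bigdata:3333 host/port from the code; the agent and channel names are placeholders, and per the Spark–Flume integration guide the spark-streaming-flume-sink jar (plus scala-library and commons-lang3) must also be on Flume's classpath.

# pull-based sink: Spark Streaming connects to bigdata:3333 and pulls the buffered events
agent.sinks = sparkSink
agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.sparkSink.hostname = bigdata
agent.sinks.sparkSink.port = 3333
agent.sinks.sparkSink.channel = memoryChannel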