1. mapWithState: a small example
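Spark Streaming: batch-first; it processes streaming data as micro-batches.
Flink: true stream processing; stream-first, it processes batch data as streams.
Spark's Structured Streaming, however, is positioned as true stream processing and is the future direction of streaming in Spark; new streaming features are being added there.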
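1) mapWithState can analyze data across batches just as updateStateByKey does, but it is marked experimental, so use it with caution: it may change or disappear in a later release.
2) With mapWithState, a key's accumulated result is emitted only in batches where that key appears in the current batch.
3) The best approach is still to write the results to a DB.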
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
object MapWithStateTest {
  def main(args: Array[String]): Unit = {
    // Arg 1: the key; arg 2: this batch's value for the key; arg 3: the state kept from earlier batches
    val mappingFunction = (key: String, value: Option[Int], state: State[Int]) => {
      val sum = value.getOrElse(0) + state.getOption().getOrElse(0)
      state.update(sum)
      (key, sum)
    }
    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[3]")
    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Stateful operations need a checkpoint directory
    ssc.checkpoint(".")
    // Initial state RDD; unused below, it could seed the state via StateSpec.function(...).initialState(initialRDD)
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    // Create a ReceiverInputDStream on target ip:port and count the
    // words in an input stream of delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream("192.168.76.120", 1314)
    val words = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    val stateDstream = words.mapWithState(StateSpec.function(mappingFunction))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
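To make note 2) concrete, here is a rough plain-Scala sketch (no Spark involved) of how the two operators differ in what they emit per batch. The object name, the `Batch` alias and both `...Output` helpers are made up for illustration; they only model the behaviour described above and are not Spark APIs.

```scala
object StateSemanticsSketch {
  // One micro-batch, modelled as already-reduced (word, count) pairs.
  type Batch = Map[String, Int]

  // Fold the current batch into the running totals (the shared "state").
  def mergeState(state: Map[String, Int], batch: Batch): Map[String, Int] =
    batch.foldLeft(state) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0) + n)
    }

  // mapWithState-like output: totals only for keys present in this batch.
  def mapWithStateOutput(state: Map[String, Int], batch: Batch): Map[String, Int] =
    mergeState(state, batch).filter { case (word, _) => batch.contains(word) }

  // updateStateByKey-like output: totals for every key seen so far.
  def updateStateByKeyOutput(state: Map[String, Int], batch: Batch): Map[String, Int] =
    mergeState(state, batch)

  def main(args: Array[String]): Unit = {
    val batch1: Batch = Map("hello" -> 2, "world" -> 1)
    val batch2: Batch = Map("hello" -> 1)
    val stateAfter1 = mergeState(Map.empty, batch1)
    // "world" is missing from batch2, so only the second line still reports it.
    println(mapWithStateOutput(stateAfter1, batch2))
    println(updateStateByKeyOutput(stateAfter1, batch2))
  }
}
```

In the second batch only "hello" arrives, so the mapWithState-style output omits "world" while the updateStateByKey-style output still carries its earlier total.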
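2. (Important) foreachRDD: a small example
1) foreachRDD is the most commonly used output (eager) operation in production; it exposes each batch of a DStream as an RDD, which can then be converted to a DataFrame/Dataset and processed.
2) The function passed to foreachRDD runs on the driver, so when writing to a DB the code that obtains the connection must sit inside the nested foreachPartition; otherwise a serialization error is thrown.
3) Pitfall: inside foreachPartition the data arrives as an iterator. In Java and Scala alike, an iterator can be consumed only once; on a second pass it is empty.
4) Pitfall: use a connection pool when working with the DB.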
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ForeachRDDApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("word count")
      .setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("192.168.76.120", 1314)
    // Persist the data when it is used more than once
    // lines.persist(StorageLevel.MEMORY_ONLY)
    val wordsDStream = lines.flatMap(_.split(" "))
    wordsDStream.foreachRDD(rdd => {
      // Get (or create on first use) the SparkSession from the RDD's configuration
      val spark = SparkSession
        .builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._
      val wordDF = rdd.toDF("word")
      // Session-scoped temp view; a global temp view would have to be queried as global_temp.words
      wordDF.createOrReplaceTempView("words")
      val countDF = spark.sql("select word, count(*) as total from words group by word")
      countDF.show()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
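Notes 2) to 4) above concern writing to a DB, which the word-count example does not do. The following is a minimal plain-Scala sketch of the per-partition pattern, runnable without Spark or a database. `FakeConnection`, `FakePool` and `writePartition` are hypothetical stand-ins invented here; in a real job the body of `writePartition` would go inside `rdd.foreachPartition { rows => ... }` and the pool would be a real connection pool.

```scala
import scala.collection.mutable

object PartitionWriteSketch {
  // Hypothetical stand-in for a DB connection; it only records what was written.
  final class FakeConnection {
    val written = mutable.ArrayBuffer.empty[String]
    def insert(row: String): Unit = written += row
  }

  // Hypothetical, minimal pool: hands out idle connections and takes them back.
  object FakePool {
    private val idle = mutable.Queue.empty[FakeConnection]
    var created = 0
    def borrow(): FakeConnection = synchronized {
      if (idle.isEmpty) { created += 1; new FakeConnection } else idle.dequeue()
    }
    def giveBack(c: FakeConnection): Unit = synchronized { idle.enqueue(c) }
  }

  // Mirrors the body one would write inside foreachPartition: one borrow per
  // partition, a single pass over the iterator, connection returned at the end.
  // It returns the row count only so that the sketch can be checked.
  def writePartition(rows: Iterator[String]): Int = {
    val conn = FakePool.borrow()
    try {
      var n = 0
      rows.foreach { row => conn.insert(row); n += 1 }
      n
    } finally FakePool.giveBack(conn)
  }

  def main(args: Array[String]): Unit = {
    val partition = Iterator("a", "b", "c")
    println(writePartition(partition)) // rows written during the first pass
    println(partition.hasNext)         // the iterator is exhausted afterwards
    // To traverse the rows twice, materialize them first (at a memory cost).
    val rows = Iterator("x", "y").toList
    println(rows.size == rows.length)
  }
}
```

Borrowing inside the per-partition function means the connection is created on the executor and never has to be serialized from the driver, which is the failure described in note 2).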
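3. Window: a small example
1) The window length and the slide interval must both be integer multiples of the batch interval.
2) Everything a window does can also be achieved by storing each batch in a DB and then querying across several batches.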
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowTFTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("word count")
      .setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("192.168.76.120", 1314)
    val wordCountDS = lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    // wordCountDS.print()
    // window: 20-second window sliding every 10 seconds (both multiples of the 5-second batch)
    val windowDS = wordCountDS.window(Seconds(20), Seconds(10))
    windowDS.print()
    // save
    // wordCountDS.saveAsTextFiles("C:\Users\admin\Desktop\spark学习\outPutData\")
    ssc.start()
    ssc.awaitTermination()
  }
}
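The multiple-of-the-batch-interval rule from note 1) can be illustrated without Spark. This is only a sketch under simplifying assumptions: each batch is a single element, and the handling of the first, partially filled window is an approximation of what the stream does at start-up. `WindowSketch`, `isValid` and `windows` are names made up for this illustration.

```scala
object WindowSketch {
  // Window and slide must both be whole multiples of the batch interval.
  def isValid(batchSec: Int, windowSec: Int, slideSec: Int): Boolean =
    windowSec % batchSec == 0 && slideSec % batchSec == 0

  // Given one element per batch, return the batches covered by each emitted window.
  def windows[A](batches: Seq[A], batchSec: Int, windowSec: Int, slideSec: Int): Seq[Seq[A]] = {
    require(isValid(batchSec, windowSec, slideSec),
      "window and slide must be multiples of the batch interval")
    val perWindow = windowSec / batchSec // batches covered by one window
    val perSlide = slideSec / batchSec   // batches between two emissions
    // A window is emitted after every `perSlide` batches and looks back `perWindow` batches.
    (perSlide to batches.length by perSlide).map { end =>
      batches.slice(math.max(0, end - perWindow), end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Six 5-second batches, 20-second window, 10-second slide, as in the example above.
    windows(Seq("b1", "b2", "b3", "b4", "b5", "b6"), 5, 20, 10).foreach(println)
  }
}
```

With a 5-second batch, a 20-second window spans four batches and a 10-second slide emits every two batches, so consecutive windows overlap by two batches.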
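4. transform: a small whitelist example
1) transform is a transformation (lazy) operation. It exposes the RDD behind each batch of a DStream, so business logic such as whitelist filtering can be written with the ordinary RDD API.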
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object TransformApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Transform App")
    // Each batch holds 5 seconds of data
    val ssc = new StreamingContext(conf, Seconds(5))
    val whiteRDD = ssc.sparkContext.parallelize(List("17")).map((_, true))
    // Host and port to read from; each line looks like: 老二,3,1 (name, age, gender)
    val lines = ssc.socketTextStream("192.168.43.125", 1314)
    val result = lines.map(x => (x.split(",")(0), x))
      .transform(rdd => {
        // After the join each element is (name, (line, Option[Boolean]))
        rdd.leftOuterJoin(whiteRDD)
          // As written, this drops keys found in whiteRDD; use == true to keep only listed keys
          .filter(_._2._2.getOrElse(false) != true)
          .map(_._2._1)
      })
    result.print()
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
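The join-then-filter step inside transform can be tried on ordinary collections. The sketch below is a plain-Scala analogue, not the RDD API: `leftOuterJoin` here is a hand-written helper over a `Map`, and the sample records are invented. It shows both directions of the filter, since the predicate used in the example above keeps the keys that are absent from the list.

```scala
object WhitelistJoinSketch {
  // Plain-collection analogue of a left outer join against a table of listed keys.
  def leftOuterJoin(
      records: Seq[(String, String)],
      listed: Map[String, Boolean]): Seq[(String, (String, Option[Boolean]))] =
    records.map { case (key, line) => (key, (line, listed.get(key))) }

  // Same predicate as the example above: keep lines whose key is NOT listed.
  def dropListed(records: Seq[(String, String)], listed: Map[String, Boolean]): Seq[String] =
    leftOuterJoin(records, listed)
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }
      .map { case (_, (line, _)) => line }

  // Whitelist variant: keep only lines whose key IS listed.
  def keepListed(records: Seq[(String, String)], listed: Map[String, Boolean]): Seq[String] =
    leftOuterJoin(records, listed)
      .filter { case (_, (_, flag)) => flag.getOrElse(false) }
      .map { case (_, (line, _)) => line }

  def main(args: Array[String]): Unit = {
    // Hypothetical input lines keyed by their first comma-separated field.
    val records = Seq("17,30,1", "alice,3,1").map(line => (line.split(",")(0), line))
    val listed = Map("17" -> true)
    println(dropListed(records, listed))
    println(keepListed(records, listed))
  }
}
```

Unlisted keys come out of the join with `None`, which `getOrElse(false)` turns into `false`; that default is what decides which side of the filter they land on.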
Reposted from: https://blog.csdn.net/qq_32641659/article/details/90748175