一、简介
spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。
二、Dstream的创建
1,文件数据源和Kafka数据源
//文件数据源 streamingContext.textFileStream(dataDirectory) //kafka数据源 val kafkaParam: Map[String, String] = Map[String, String]( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.GROUP_ID_CONFIG -> "myCroup", //消费者组 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092" ) val kafkaDSteam: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( streamingContext, kafkaParam, Set("myTopic"), //主题topic StorageLevel.MEMORY_ONLY )
2,自定义数据源
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //读数据并将数据发送给Spark def receive(): Unit = { //创建一个Socket var socket: Socket = new Socket(host, port) //定义一个变量,用来接收端口传过来的数据 var input: String = null //创建一个BufferedReader用于读取端口传来的数据 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) //读取数据 input = reader.readLine() //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循环则关闭资源 reader.close() socket.close() //重启任务 restart("restart") } override def onStop(): Unit = {} }
//使用自定义数据源
ssc.receiverStream(new CustomerReceiver("linux1", 9999))
三、DStream转换
1,有状态转化操作
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
1)定义状态,状态可以是一个任意的数据类型。
2)定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
ssc.sparkContext.setCheckpointDir("ck") //必须定义checkpointDir val mapDS = recevieDS.map((_, 1)) //这里必须加类型DStream[(String,Int)],否则报错 val updateStateDStream:DStream[(String,Int)] = mapDS.updateStateByKey{ case (seq,buffer) => //seq为Seq(Int)表示value的序列 buffer为Option(Int)表示缓存 val sum:Int = buffer.getOrElse(0) + seq.sum Option(sum) }
2,Window操作
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
//定义的窗口时长Seconds(9)和步长Seconds(3) 都必须是定义的StreamingContext的整数倍 val windowDS = recevieDS.window(Seconds(9),Seconds(3)) //最终结果 0-n-0 val resultDS = windowDS.map((_,1)).reduceByKey(_+_)
3,transform
Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
//对当前批次中的每个rdd进行操作 wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) ... }