摘自 http://dblab.xmu.edu.cn/blog/1084-2/
简介
DStream是Spark Streaming的编程模型,DStream的操作包括输入、转换和输出。
Spark Streaming工作原理
,在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行task。在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)。Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。
Spark Streaming程序基本步骤
编写Spark Streaming程序的基本步骤是:
1.通过创建输入DStream来定义输入源
2.通过对DStream应用转换操作和输出操作来定义流计算。
3.用streamingContext.start()来开始接收数据和处理流程。
4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
5.可以通过streamingContext.stop()来手动结束流计算进程。
创建StreamingContext对象
如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口。因此,在定义输入之前,我们首先介绍如何创建StreamingContext对象。我们可以从一个SparkConf对象创建一个StreamingContext对象。
请登录Linux系统,启动spark-shell。进入spark-shell以后,就已经获得了一个默认的SparkConext,也就是sc。因此,可以采用如下方式来创建StreamingContext对象:
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
Seconds(1)表示每隔1秒钟就自动执行一次流计算,这个秒数可以自由设定。
如果是编写一个独立的Spark Streaming程序,而不是在spark-shell中运行,则需要通过如下方式创建StreamingContext对象:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
setAppName(“TestDStream”)是用来设置应用程序名称,这里我们取名为“TestDStream”。setMaster(“local[2]”)括号里的参数”local[2]’字符串表示运行在本地模式下,并且启动2个工作线程。
文件流操作
Spark支持从兼容HDFS API的文件系统中读取数据,创建数据流。
为了能够演示文件流的创建,我们需要首先创建一个日志目录,并在里面放置两个模拟的日志文件。请在Linux系统中打开另一个终端,进入Shell命令提示符状态:
sudo mkdir -p /usr/local/spark/mycode/streaming/logfile
cd /usr/local/spark/mycode/streaming/logfile
然后,在logfile中新建两个日志文件log1.txt和log2.txt,里面可以随便输入一些内容。
比如,我们在log1.txt中输入以下内容:
I love Hadoop
I love Spark
Spark is fast
下面我们就进入spark-shell创建文件流。请另外打开一个终端窗口,启动进入spark-shell。
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))
scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
scala> val words = lines.flatMap(_.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
scala> wordCounts.print()
scala> ssc.start() //实际上,当你输入这行回车后,Spark Streaming就开始进行循环监听,下面的ssc.awaitTermination()是无法输入到屏幕上的,但是,为了程序完整性,这里还是给出ssc.awaitTermination()
scala> ssc.awaitTermination()
所以,上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,如下:
//这里省略若干屏幕信息
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息
从上面的屏幕显示信息可以看出,Spark Streaming每隔20秒就监听一次。但是,你这时会感到奇怪,既然启动监听了,为什么程序没有把我们刚才放置在”/usr/local/spark/mycode/streaming/logfile”目录下的log1.txt和log2.txt这两个文件中的内容读取出来呢?原因是,监听程序只监听”/usr/local/spark/mycode/streaming/logfile”目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。所以,为了能够让程序读取文件内容并显示到屏幕上,让我们能够看到效果,这时,我们需要到”/usr/local/spark/mycode/streaming/logfile”目录下再新建一个log3.txt文件,请打开另外一个终端窗口(我们称为shell窗口),当前正在执行监听工作的spark-shell窗口依然保留。请在shell窗口中执行Linux操作,在”/usr/local/spark/mycode/streaming/logfile”目录下再新建一个log3.txt文件,里面随便输入一些英文单词,创建完成以后,再切换回到spark-shell窗口。请等待20秒(因为我们刚才设置的是每隔20秒就监听一次,如果你创建文件动作很快,可能20秒还没到)。现在你会发现屏幕上不断输出新的信息,导致你无法看清楚单词统计结果是否已经被打印到屏幕上。
所以,你现在必须停止这个监听程序,否则它一直在spark-shell窗口中不断循环监听,停止的方法是,按键盘Ctrl+D,或者Ctrl+C。停止以后,就彻底停止,并且退出了spark-shell状态,回到了Shell命令提示符状态。
好了,上面我们是在spark-shell中直接执行代码,但是,很多时候,我们需要编写独立应用程序进行监听,所以,下面我们介绍如何采用独立应用程序的方式实现上述监听文件夹的功能。
我们采用scala语言编写程序,而且要采用sbt打包编译,因此,必须符合sbt打包的规范(可以点击这里参考前面章节内容复习一下如何使用[sbt打包编译scala][1]程序)。当然,不复习以前的知识,直接按照我们下面的步骤来,你也可以顺利实现sbt打包编译。
请打开一个Linux终端窗口,进入shell命令提示符状态,然后,执行下面命令:
mkdir -p /usr/local/spark/mycode/streaming/src/main/scala
cd /usr/local/spark/mycode/streaming/src/main/scala
vim TestStreaming.scala
这样就用vim编辑器新建一个TestStreaming.scala代码文件,请在里面输入以下代码:
import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据
val ssc = new StreamingContext(sparkConf, Seconds(20))// 时间间隔为20秒
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile") //这里采用本地文件,当然你也可以采用HDFS文件
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
代码文件写好后,就可以保存并退出vim编辑器。然后,执行下面命令:
cd /usr/local/spark/mycode/streaming
vim simple.sbt //注意,是simple.sbt,不是simple.txt
打开vim编辑器以后,在simple.sbt文件中输入以下代码:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.0"
然后,就可以执行sbt打包编译了,命令如下:
cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package
打包成功以后,就可以输入以下命令启动这个程序:
cd /usr/local/spark/mycode/streaming
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar
执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,切换到另外一个Shell窗口,在”/usr/local/spark/mycode/streaming/logfile”目录下再新建一个log5.txt文件,文件里面随便输入一些单词,保存好文件退出vim编辑器。然后,再次切换回“监听窗口”,等待20秒以后,按键盘Ctrl+C或者Ctrl+D停止监听程序,就可以看到监听窗口的屏幕上会打印出单词统计信息。
套接字流操作
Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。Spark Streaming自身就提供了一个简单的样例程序,我们先直接演示这个程序,看看效果,然后再动手编写程序打包运行。
请打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:
nc -lk 9999
上面这个启动了nc程序的窗口,我们可以称为“nc窗口”,执行上面命令以后,这个窗口就等待我们输入,我们可以随便输入单词,这些输入的单词就形成数据源,但是,现在不要着急输入单词,因为,我们的Socket监听程序还没有启动,现在输入单词也没有效果。下面我们启动Socket监听程序,请打开另外一个终端窗口,我们称为“监听窗口”,请在这个窗口里面运行下面命令:
cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7
./bin/run-example streaming.NetworkWordCount localhost 9999
执行上面命令以后,就会启动Spark Streaming自带的样例程序,这个程序的功能是,监听“nc窗口”输入的单词,然后,统计单词出现的次数。注意,这个NetworkWordCount程序每隔1秒钟自动接收一次来自nc窗口的数据流(也就是在这1秒内你在nc窗口内输入的所有单词)。这1秒和下1秒输入的单词,是独立分开统计次数的,不会累计。你就可以看到监听窗口会出现类似下面的统计结果(可能中间会夹杂很多调试信息,这是因为log4j日志配置等级较低,调高就行):
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
到这里,关于如何运行Spark自带的NetworkWordCount样例程序,就介绍清楚了。下面我们看看,如果自己编写程序实现这样的功能。
请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:
cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7/examples/src/main/scala/org/apache/spark/examples/streaming/
ls
这时,你可以看到在这个目录下有个文件NetworkWordCount.scala,这个其实就是刚才执行词频统计程序的源代码,我们可以直接拷贝这个代码文件到自己的目录下,如下:
cp ./NetworkWordCount.scala /usr/local/spark/mycode/streaming/src/main/scala //如果这个用户目录不存在,请自己创建
cd /usr/local/spark/mycode/streaming/src/main/scala
vim NetworkWordCount.scala
上面用vim编辑器打开了NetworkWordCount.scala代码文件,可以看到该文件主要内容如下(这里只把代码内容保留,原来的注释都删除了):
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
。上面的代码,不能直接拿去sbt打包编辑,因为,里面有个 StreamingExamples.setStreamingLogLevels(),StreamingExamples来自另外一个代码文件,也就是“/usr/local/spark//spark-2.3.0-bin-hadoop2.7/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala”,请把这个StreamingExamples.scala文件也拷贝到自己的代码目录下(也就是和刚才的NetworkWordCount.scala放在同一个目录下):
下面要对代码进行sbt打包编译。这里需要一个simple.sbt文件,如果之前章节的学习已经在这个目录下建好了这个文件,这里就不需要再建,然后,就可以执行sbt打包编译了,命令如下:
cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package
打包成功以后,就可以输入以下命令启动这个程序(org.apache.spark.examples.streaming,这个代码中的包名):
cd /usr/local/spark/mycode/streaming
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,新打开一个窗口作为nc窗口,输入句子,进行统计了。
下面我们再前进一步,这回,我们把数据源头的产生方式也修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。
请在终端的Shell命令提示符下执行下面命令:
cd /usr/local/spark/mycode/streaming/src/main/scala
vim DataSourceSocket.scala
上面命令使用vim编辑器新建一个名称为DataSourceSocket.scala的代码文件,用来产生Socket数据源,请在该代码文件中输入下面代码:
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
def index(length: Int) = {
val rdm = new java.util.Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
if (args.length != 3) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit(1)
}
val fileName = args(0)
val lines = Source.fromFile(fileName).getLines.toList
val rowCount = lines.length
val listener = new ServerSocket(args(1).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong)
val content = lines(index(rowCount))
println(content)
out.write(content + '
')
out.flush()
}
socket.close()
}
}.start()
}
}
}
退出vim编辑器。上面这个程序的功能是,从一个文本文件中随机读取某行文本作为数据源,发送出去。
然后,就可以执行sbt打包编译了,命令如下:
cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package
注意,实际上,这个时候,我们的“/usr/local/spark/mycode/streaming/src/main/scala”目录下,就有了三个代码文件,分别是NetworkWordCount.scala、StreamingExamples.scala和DataSourceSocket.scala。sbt打包编译是同时对这三个代码文件打包编译。打包成功以后,就可以输入命令启动数据源程序和监听程序。
下面首先启动用来生成数据源的DataSourceSocket程序,不过,DataSourceSocket程序需要把一个文本文件作为输入参数,所以,在启动这个程序之前,需要首先创建一个文本文件:
cd /usr/local/spark/mycode/streaming/
vim word.txt
在word.txt中随便输入几行英文语句,然后保存并退出vim编辑器。
下面就启动DataSourceSocket程序,这个程序需要三个参数,第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息),请执行下面命令启动这个程序:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --class "DataSourceSocket" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar /usr/local/spark/mycode/streaming/word.txt 9999 1000
然后,你就会看到,这个窗口会不断打印出一些随机读取到的文本信息,这些信息也是Socket数据源,会被监听程序捕捉到。所以,下面,我们就在另外一个窗口启动监听程序:
cd /usr/local/spark/mycode/streaming
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
启动成功后,你就会看到,屏幕上不断打印出词频统计信息。成功完成实验。
RDD队列流操作
在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。
下面是参考Spark官网的QueueStream程序设计的程序,每隔1秒创建一个RDD,Streaming每隔2秒就对数据进行处理。
请登录Linux系统,打开一个终端,进入Shell命令提示符状态,然后执行下面命令新建代码文件:
cd /usr/local/spark/mycode/streaming/src/main/scala //这个目录在前面章节操作中已经创建好了
vim TestRDDQueueStream.scala
上面用vim编辑器新建了一个TestRDDQueueStream.scala文件,请在该文件中输入以下代码:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(20))
val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print(1000)
ssc.start()
for (i <- 1 to 100){
rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
Thread.sleep(1000)
}
ssc.stop()
}
}
然后,我们用sbt进行打包编译。我们可以直接使用前面章节已经创建好的simple.sbt文件。
然后,就可以执行sbt打包编译了,命令如下:
cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package
运行程序:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --class "QueueStream" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar
执行上面命令以后,程序就开始运行,就可以看到类似下面的结果:
Time: 1479522100000 ms
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
[1]: http://www.cnblogs.com/freebird92/p/8877462.html