SparkStreaming 这个名字起的很有意思,就是只要能流式读取的数据,都可以作为SparkStreaming的数据源
下面我们来介绍另一种常见的流,socket流(套接字流)
socket个人理解就像是一部手机(根据时代的不同,之前可以理解成有线电话),通过这部手机,可以进行信息流的传递。
在打电话之前,我们是不是得先有一部电话,然后想给别人打电话的时候,是不是我们得先拨号,然后等待对方接通(就是阻塞的过程),再通话(send和receive的过程)?
对于socket不太了解的可以去自行Google,这里不做详细解读。参考如下链接(https://www.jianshu.com/p/007adba06047)
上两张图震一下:
图一:如上图,在七个层级关系中,我们将的socket属于传输层,其中UDP是一种面向无连接的传输层协议。
图二:如上图,Socket的通信方式
切入正题:
首先初始化一个ServerSocket,然后对指定的端口进行绑定,接着对端口及进行监听,通过调用accept方法阻塞。
import java.io.{PrintWriter} import java.net.ServerSocket import scala.io.Source object DataSourceSocket { def index(length: Int) = { val rdm = new java.util.Random rdm.nextInt(length) } def main(args: Array[String]) { /** * 第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息) */ val params = Array("D:/software_download/spark_text/streaming/logfile/score.txt","9999", "1000") val fileName = params(0) val lines = Source.fromFile(fileName).getLines.toList println(lines) val rowCount = lines.length val listener = new ServerSocket(params(1).toInt)
while (true) { //调用accept方法,等待客户端接通 val socket = listener.accept() new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(params(2).toLong) val content = lines(index(rowCount)) println(content) out.write(content + ' ') out.flush() } socket.close() } }.start() } } }
运行结果如下,会阻塞等待客户端:
其次,使用客户端接通socket服务端,并且进行通信,不过该实验的通信方式是单向的,即客户端只接受数据,而不发送数据。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel object SparkStreaming_socket { def main(args: Array[String]): Unit = { /** * 监听本地的9999端口 */ val params = Array("localhost","9999") val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(1)) val lines = ssc.socketTextStream(params(0), params(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
该服务一旦启动,服务端会持续的发送数据如下:
而客户端,则根据发送过来的数据,进行持续的计算操作如下:
以上:)