zoukankan      html  css  js  c++  java
  • spark 系列之六 SparkStreaming数据源之socket流

    SparkStreaming 这个名字起的很有意思,就是只要能流式读取的数据,都可以作为SparkStreaming的数据源

    下面我们来介绍另一种常见的流,socket流(套接字流)

    socket个人理解就像是一部手机(根据时代的不同,之前可以理解成有线电话),通过这部手机,可以进行信息流的传递。

    在打电话之前,我们是不是得先有一部电话,然后想给别人打电话的时候,是不是我们得先拨号,然后等待对方接通(就是阻塞的过程),再通话(send和receive的过程)?

    对于socket不太了解的可以去自行Google,这里不做详细解读。参考如下链接(https://www.jianshu.com/p/007adba06047)

    上两张图震一下:

    图一:如上图,在七个层级关系中,我们将的socket属于传输层,其中UDP是一种面向无连接的传输层协议。

     图二:如上图,Socket的通信方式

    切入正题:

    首先初始化一个ServerSocket,然后对指定的端口进行绑定,接着对端口及进行监听,通过调用accept方法阻塞。

    import java.io.{PrintWriter}
    import java.net.ServerSocket
    import scala.io.Source
    
    object DataSourceSocket {
      def index(length: Int) = {
        val rdm = new java.util.Random
        rdm.nextInt(length)
      }
      def main(args: Array[String]) {
        /**
         * 第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息)
         */
        val params = Array("D:/software_download/spark_text/streaming/logfile/score.txt","9999", "1000")
        val fileName = params(0)
        val lines = Source.fromFile(fileName).getLines.toList
        println(lines)
        val rowCount = lines.length
    
        val listener = new ServerSocket(params(1).toInt)
    while (true) { //调用accept方法,等待客户端接通 val socket = listener.accept() new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(params(2).toLong) val content = lines(index(rowCount)) println(content) out.write(content + ' ') out.flush() } socket.close() } }.start() } } }

    运行结果如下,会阻塞等待客户端:

    其次,使用客户端接通socket服务端,并且进行通信,不过该实验的通信方式是单向的,即客户端只接受数据,而不发送数据。

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.storage.StorageLevel
    
    object SparkStreaming_socket {
    
      def main(args: Array[String]): Unit = {
        /**
         * 监听本地的9999端口
         */
        val params = Array("localhost","9999")
    
        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        val lines = ssc.socketTextStream(params(0), params(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    
    }

    该服务一旦启动,服务端会持续的发送数据如下:

     而客户端,则根据发送过来的数据,进行持续的计算操作如下:

     以上:)

  • 相关阅读:
    C++字符串函数之append()、insert()
    492. Construct the Rectangle(LeetCode)
    桶排序
    104. Maximum Depth of Binary Tree (LeetCode)
    557. Reverse Words in a String III(LeetCode )
    基数排序(LSD)
    500. Keyboard Row
    输入两个单调递增的链表,输出两个链表合成后的链表,当然我们需要合成后的链表满足单调不减规则。
    myeclipse打断点进入后无法查看变量的值的解决方法
    可参考的js代码
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14217932.html
Copyright © 2011-2022 走看看