zoukankan      html  css  js  c++  java
  • 九、sparkStream的scala示例

    简介

    sparkStream官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

    sparkStream是构建在spark core之上的实时流处理框架,它支持很多的数据源,如:

    你可以从kafka等各种数据源中实时获取数据流,然后经过spark计算,持久化或者实时的dashBoard展示。

    sparkStream的实时计算其实也可以称为微批处理计算,它将数据流按照一定的时间段分割成小批的数据,然后将对数据流的操作转换为对RDD的操作,整个流计算的中间结果进行叠加存储到内存或者外部设备,如图:

    代码示例

    下面将使用tcp socket作为数据源,每隔1秒钟发送字符数据。sparkstream将在启动以后,将收集10秒的数据作为一个批数据进行统计处理,代码如下:

    import java.net.ServerSocket
    
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @Description sparkStream demo
      * @Author lay
      * @Date 2018/12/08 21:43
      */
    object SparkStreamDemo {
      var conf: SparkConf = _
      var sc: SparkContext = _
      var ssc: StreamingContext = _
    
      def init(): Unit = {
        conf = new SparkConf().setAppName("spark stream demo").setMaster("local[2]")
        sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        // 时间片为10秒钟
        ssc = new StreamingContext(sc, Seconds(10))
      }
    
      def main(args: Array[String]): Unit = {
        // 初始化socket流
        initSocketStream()
        // 初始化SparkStream
        init()
        // 从socket获取DStream
        val lines = ssc.socketTextStream("localhost", 8888)
        // 统计字数
        val wordCount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_)
        // 打印结果
        wordCount.print()
        // 启动
        ssc.start()
        println("spark stream started")
      }
    
      def initSocketStream(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            val serverSocket = new ServerSocket(8888)
            val socket = serverSocket.accept()
            println("accepted")
            for (i <- 1 to 10) {
              val text = "what is this
    "
              socket.getOutputStream.write(text.getBytes("utf-8"))
              Thread.sleep(1000)
            }
            println("waiting")
            Thread.sleep(50000)
            socket.close()
            serverSocket.close()
            println("closed")
          }
        }).start()
        println("thread started")
      }
    }

    注意:

    1)这里的master设置为"local[2]",是因为spark起码需要两个线程,一个线程用来接收数据,另一个线程用来处理数据。

     2)"what is this "这里加了一个' '字符,是因为字节流的接收将会以这个字符作为分隔符。

    你会看到类似如下的打印:

    -------------------------------------------
    Time: 1544281700000 ms
    -------------------------------------------
    (this,10)
    (is,10)
    (what,10)
  • 相关阅读:
    700. Search in a Binary Search Tree
    100. Same Tree
    543. Diameter of Binary Tree
    257. Binary Tree Paths
    572. Subtree of Another Tree
    226. Invert Binary Tree
    104. Maximum Depth of Binary Tree
    1、解决sublime打开文档,出现中文乱码问题
    移植seetafaceengine-master、opencv到ARM板
    ubuntu16.04-交叉编译-SeetaFaceEngine-master
  • 原文地址:https://www.cnblogs.com/lay2017/p/10089614.html
Copyright © 2011-2022 走看看