zoukankan      html  css  js  c++  java
  • 九、sparkStream的scala示例

    简介

    sparkStream官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

    sparkStream是构建在spark core之上的实时流处理框架,它支持很多的数据源,如:

    你可以从kafka等各种数据源中实时获取数据流,然后经过spark计算,持久化或者实时的dashBoard展示。

    sparkStream的实时计算其实也可以称为微批处理计算,它将数据流按照一定的时间段分割成小批的数据,然后将对数据流的操作转换为对RDD的操作,整个流计算的中间结果进行叠加存储到内存或者外部设备,如图:

    代码示例

    下面将使用tcp socket作为数据源,每隔1秒钟发送字符数据。sparkstream将在启动以后,将收集10秒的数据作为一个批数据进行统计处理,代码如下:

    import java.net.ServerSocket
    
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @Description sparkStream demo
      * @Author lay
      * @Date 2018/12/08 21:43
      */
    object SparkStreamDemo {
      var conf: SparkConf = _
      var sc: SparkContext = _
      var ssc: StreamingContext = _
    
      def init(): Unit = {
        conf = new SparkConf().setAppName("spark stream demo").setMaster("local[2]")
        sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        // 时间片为10秒钟
        ssc = new StreamingContext(sc, Seconds(10))
      }
    
      def main(args: Array[String]): Unit = {
        // 初始化socket流
        initSocketStream()
        // 初始化SparkStream
        init()
        // 从socket获取DStream
        val lines = ssc.socketTextStream("localhost", 8888)
        // 统计字数
        val wordCount = lines.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_)
        // 打印结果
        wordCount.print()
        // 启动
        ssc.start()
        println("spark stream started")
      }
    
      def initSocketStream(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            val serverSocket = new ServerSocket(8888)
            val socket = serverSocket.accept()
            println("accepted")
            for (i <- 1 to 10) {
              val text = "what is this
    "
              socket.getOutputStream.write(text.getBytes("utf-8"))
              Thread.sleep(1000)
            }
            println("waiting")
            Thread.sleep(50000)
            socket.close()
            serverSocket.close()
            println("closed")
          }
        }).start()
        println("thread started")
      }
    }

    注意:

    1)这里的master设置为"local[2]",是因为spark起码需要两个线程,一个线程用来接收数据,另一个线程用来处理数据。

     2)"what is this "这里加了一个' '字符,是因为字节流的接收将会以这个字符作为分隔符。

    你会看到类似如下的打印:

    -------------------------------------------
    Time: 1544281700000 ms
    -------------------------------------------
    (this,10)
    (is,10)
    (what,10)
  • 相关阅读:
    Jedis测试redis
    jedis池的作用
    错误
    Ceph剖析:数据分布之CRUSH算法与一致性Hash
    drools规则引擎初探
    Techniques for HA IT Management
    django_simple_captcha使用笔记
    微服务架构的理论基础
    分布式系统服务的稳定性
    四层、七层负载均衡的区别
  • 原文地址:https://www.cnblogs.com/lay2017/p/10089614.html
Copyright © 2011-2022 走看看