zoukankan      html  css  js  c++  java
  • spark_streaming_微批量处理

    数据处理方式有两种

    1. 有界数据 --> 批量处理
    2. 无界数据 --> 流式处理 

    数据按照时间处理方式两种

    1. 离线计算
    2. 实时计算

    spark streaming 微处理数据方式

    1. Receiver方式:获取源数据,先由receiver接收(并短暂存储),再进行计算
    2. Direct方式:源数据存放在消息队列(如Kafka)中,计算时直接(direct)从队列读取进行计算

    服务器方式模仿源数据

    • ip: 192.168.75.91
    • nc -l 8080   
    • 输入 spark world , hello world
      /** Receiver-based word count over a socket text source.
        *
        * Start the source first on the data host with `nc -l 8080`, then type lines such as
        * `hello world`; the word counts of each 5-second batch are printed.
        *
        * @param args optional `host` and `port` (defaults: 192.168.75.91 and 8080)
        */
      def main(args: Array[String]): Unit = {
        // The port must match the `nc -l 8080` listener the data is typed into.
        val host = args.lift(0).getOrElse("192.168.75.91")
        val port = args.lift(1).map(_.toInt).getOrElse(8080)

        // local[5]: one core is permanently held by the socket receiver task, the remaining
        // cores run the per-batch jobs. A receiver-based stream needs at least local[2].
        val conf = new SparkConf().setAppName("receiverJob").setMaster("local[5]")

        // Micro-batching: a job is triggered every 5 seconds for the data received in that interval.
        val streamingContext = new StreamingContext(conf, Seconds(5))
        streamingContext.sparkContext.setLogLevel("ERROR")

        // Each element is one received line, e.g. "hello world".
        val dataDStream = streamingContext.socketTextStream(host, port)
        // Split on runs of whitespace and drop empty tokens so "a  b" does not count "" as a word.
        val words = dataDStream.flatMap(_.split("\\s+").filter(_.nonEmpty)) // transformation
        val wordsCount = words.map((_, 1)).reduceByKey(_ + _) // transformation

        wordsCount.print() // output operation: prints the first 10 elements of each batch

        streamingContext.start() // start the streaming computation

        // Keep the main thread alive so the application does not exit.
        streamingContext.awaitTermination()
      }

    结果:

    -------------------------------------------
    Time: 1615907230000 ms
    -------------------------------------------

    spark 1
    world 2
    hello 1

    -------------------------------------------
    Time: 1615907235000 ms
    -------------------------------------------

     

    本地模仿数据

    /** Local data generator: a tiny TCP server that streams "hello <n>" lines to every client.
      *
      * Each accepted client gets its own writer thread with its own counter starting at 1.
      * A writer stops as soon as a write fails (client disconnected) and then closes the socket.
      *
      * @param args optional listening port (default 8889)
      */
    def main(args: Array[String]): Unit = {
      val port = args.headOption.map(_.toInt).getOrElse(8889)
      val listen = new ServerSocket(port)
      println(" server start")
      while (true) {
        val client: Socket = listen.accept()
        new Thread() {
          override def run(): Unit = {
            try {
              val printer = new PrintStream(client.getOutputStream)
              var num = 0
              // Socket.isConnected stays true even after the peer hangs up, and PrintStream
              // swallows IOExceptions; checkError() becomes true once a write has failed.
              while (!printer.checkError()) {
                num += 1
                printer.println(s"hello ${num}")
              }
            } finally {
              client.close()
            }
          }
        }.start()
      }
    }
    // streaming接收数据直接打印结果
    
      /** Prints every line received from localhost:8889, one 5-second batch at a time.
        *
        * Uses local[9]: the socket receiver permanently occupies one core and the remaining
        * cores are left for the per-batch jobs (a receiver-based stream needs at least two).
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setMaster("local[9]")
          .setAppName("receiverJob")

        // One micro-batch job every 5 seconds.
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.sparkContext.setLogLevel("ERROR")

        ssc.socketTextStream("localhost", 8889).print()

        // Start receiving and processing, then keep the driver's main thread alive.
        ssc.start()
        ssc.awaitTermination()
      }
    
    
    结果:
    Time: 1615908955000 ms
    -------------------------------------------
    hello 95668
    hello 95669
    hello 95670
    hello 95671
    hello 95672
    hello 95673
    hello 95674
    hello 95675
    hello 95676
    hello 95677
    ...
    
    -------------------------------------------
    Time: 1615908960000 ms
    -------------------------------------------
    hello 241620
    hello 241621
    hello 241622
    hello 241623
    hello 241624
    hello 241625
    hello 241626
    hello 241627
    hello 241628
    hello 241629
    // Producer loop variant: emit one "hello <n>" line per second.
    // Loop on printer.checkError() rather than client.isConnected: isConnected stays true
    // after the peer disconnects, while checkError() turns true once a write has failed.
                while (!printer.checkError()) {
                  num += 1
                  printer.println(s"hello ${num}")
                  Thread.sleep(1000)
                }
    
    
    
    打印结果:
    -------------------------------------------
    Time: 1615909175000 ms
    -------------------------------------------
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    
    -------------------------------------------
    Time: 1615909180000 ms
    -------------------------------------------
    hello 7
    hello 8
    hello 9
    hello 10
    hello 11

    在map处理中对每条数据阻塞20秒(模拟处理缓慢)

      /** Reads "word word" lines from localhost:8889 and deliberately blocks 20 s per record.
        *
        * The artificial delay makes each batch take far longer than the 5-second interval,
        * so subsequent batches queue up (visible on the Streaming tab of the Spark UI).
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setMaster("local[9]")
          .setAppName("receiverJob")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.sparkContext.setLogLevel("ERROR")

        val lines = ssc.socketTextStream("localhost", 8889)

        val slowPairs: DStream[(String, String)] = lines
          .map(_.split(" "))
          .map { fields =>
            Thread.sleep(20000) // simulate slow processing of each record
            (fields(0), fields(1))
          }
        slowPairs.print()

        ssc.start()
        ssc.awaitTermination()
      }

    stage

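     streaming页面中可以看到很多排队(queued)的批次,说明任务处理是阻塞模式:虽然启动了9个线程(1个接收数据,其余8个可用于计算),但同一时间只有一个批次的作业在处理,其他批次排队阻塞,并没有并行处理。原因是 spark.streaming.concurrentJobs 默认为1,一次只运行一个批次的作业;当单个批次的处理时间(这里每条记录sleep 20秒)超过批次间隔(5秒)时,后续批次就会不断堆积。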

    改进接收数据方式

      /** Prints, every 5 seconds, the lines collected by the hand-written CustormReceiver
        * connected to localhost:8889.
        */
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[9]").setAppName("streaming"),
          Seconds(5))
        ssc.sparkContext.setLogLevel("ERROR")

        val received: ReceiverInputDStream[String] =
          ssc.receiverStream(new CustormReceiver("localhost", 8889))
        received.print()

        ssc.start()
        ssc.awaitTermination()
      }
    
    
    创建:CustormReceiver
    
    /** Custom Spark Streaming receiver that reads newline-delimited text from a TCP socket.
      *
      * Every line read from `host:port` is handed to Spark with `store`, using DISK_ONLY
      * storage. Reading happens on a dedicated thread because `onStart` must return at once.
      *
      * @param host server host to connect to
      * @param port server port to connect to
      */
    class CustormReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.DISK_ONLY) {

      override def onStart(): Unit = {
        // onStart must not block: run the socket loop on its own thread.
        new Thread(s"CustormReceiver-$host:$port") {
          override def run(): Unit = ooxx()
        }.start()
      }

      /** Connects to the socket and stores lines until EOF or until the receiver is stopped. */
      private def ooxx(): Unit = {
        try {
          val server = new Socket(host, port)
          try {
            val reader = new BufferedReader(
              new InputStreamReader(server.getInputStream, java.nio.charset.StandardCharsets.UTF_8))
            var line = reader.readLine()
            while (!isStopped() && line != null) {
              store(line)
              line = reader.readLine()
            }
          } finally {
            // Closing the socket also closes its input stream (and thus the reader on top of it).
            server.close()
          }
        } catch {
          // Connection refused/reset: ask Spark to restart the receiver instead of letting
          // the reading thread die silently.
          case scala.util.control.NonFatal(e) =>
            restart(s"Error receiving data from $host:$port", e)
        }
      }

      // Must not throw: Spark calls onStop whenever the receiver is stopped or restarted.
      // Nothing to release here; the reading thread polls isStopped() and closes its own socket.
      override def onStop(): Unit = ()
    }

    结果:

    -------------------------------------------
    Time: 1615910635000 ms
    -------------------------------------------
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6

    
    

    -------------------------------------------
    Time: 1615910640000 ms
    -------------------------------------------
    hello 7
    hello 8
    hello 9
    hello 10
    hello 11

    
    
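    socketTextStream和receiverStream区别
    • socketTextStream排队执行,每次只有一个任务在处理,其他的在队列中等待
    • receiverStream 任务并列执行
    • 注意:socketTextStream 内部同样是基于 Receiver(SocketReceiver)实现的。上面的自定义 receiver 示例去掉了 map 中每条记录 20 秒的 sleep,因此两者表现的差异主要来自处理逻辑,而不是接收方式本身。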
  • 相关阅读:
    10-2[RF] OOB validation
    5.css背景以及书写位置
    4.css基础
    3.表单form
    2.表格
    1.html基础
    正则表达式
    协程
    7.树与树算法
    6.排序与二分查找
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14546711.html
Copyright © 2011-2022 走看看