zoukankan      html  css  js  c++  java
  • Spark Streaming(一)

    微批处理:按固定的批次间隔(如 Seconds(3))把输入数据切成小批量,逐批处理
    伪实时处理:处理延迟至少为一个批次间隔,并非逐条实时处理

    数据源

    1.非自定义数据源

     val conf: SparkConf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("111")

        // Batch interval of the streaming context: input is grouped into 3-second micro-batches.
        val context = new StreamingContext(conf, Seconds(3))

        // Listen on the given host/port and receive text lines.
        val line = context.socketTextStream("xxx.xxx.xxx.xxx", 9999)

        // Alternative source: collect data from a directory (rather clumsy; Flume works better).
        // val file = context.textFileStream("path")

        // Word count per batch: split each line into words, pair each word with 1, sum per word.
        val words = line.flatMap(text => text.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()

        // Start the receivers and the processing.
        context.start()
        // Keep the driver alive until the streaming context is stopped.
        context.awaitTermination()
    

    2.自定义数据源

    • 自定义接收器:继承 Receiver 类
    /**
     * Custom Spark Streaming receiver that connects to `hostname:port` over TCP,
     * reads UTF-8 text line by line and hands each line to Spark via `store`.
     *
     * Reading stops at end of stream, at a line equal to "END", or when the
     * receiver has been stopped. An unexpected failure while the receiver is
     * still running triggers `restart`, so Spark stops and starts it again.
     *
     * @param hostname host to connect to
     * @param port     TCP port to connect to
     */
    class MyReceiver(hostname: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      // Written by the receiving thread and read/cleared by onStop() on another thread.
      @volatile var socket: java.net.Socket = null

      /** Blocking read loop; runs on the thread started in onStart(). */
      def receive: Unit = {
        try {
          val s = new java.net.Socket(hostname, port)
          socket = s
          try {
            val reader = new BufferedReader(new InputStreamReader(s.getInputStream, "UTF-8"))
            // Read before testing the condition: a Scala assignment evaluates to Unit,
            // so `(line = reader.readLine()) != null` would never detect end of stream.
            var line = reader.readLine()
            while (!isStopped() && line != null && line != "END") {
              store(line)
              line = reader.readLine()
            }
          } finally {
            // Closing the socket also closes its input stream.
            s.close()
          }
        } catch {
          case scala.util.control.NonFatal(e) =>
            // Once onStop() has closed the socket, a blocked readLine() throws; that is
            // the normal shutdown path, so only restart on failures while still running.
            if (!isStopped()) restart(s"Error receiving data from $hostname:$port", e)
        }
      }

      override def onStart(): Unit = {
        // receive blocks, so it runs on its own thread instead of the caller of onStart().
        val task: Runnable = () => receive
        new Thread(task).start()
      }

      override def onStop(): Unit = {
        // Copy the field once so a concurrent write cannot null it between check and close.
        val s = socket
        if (s != null) {
          s.close()
          socket = null
        }
      }
    }
    
    • 使用自定义接收器
    // Build the input DStream from the custom receiver instead of socketTextStream.
    val receiver = new MyReceiver("xxx.xxx.xxx.xxx", 9999)
    val line = context.receiverStream(receiver)
    
  • 相关阅读:
    pkuwc2019自闭记
    一些多项式的整理
    Codeforces 1085G(1086E) Beautiful Matrix $dp$+树状数组
    Codeforces 1083C Max Mex 线段树
    Codeforces 1090J $kmp+hash+$二分
    Codeforces 1073G Yet Another LCP Problem $SA$+单调栈
    Codedforces 1076G Array Game 线段树
    LuoguP5017 摆渡车 $dp$
    NOIP2018游记
    解决python3 安装turtle的问题
  • 原文地址:https://www.cnblogs.com/zqzhen/p/12871853.html
Copyright © 2011-2022 走看看