zoukankan      html  css  js  c++  java
  • SparkStreaming简单例子(oldAPI)

    SparkStreaming简单例子

    ◆ 构建第一个Streaming程序: (wordCount) 

      ◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。

      ◆ 准备工作:
      1.引入Spark Streaming的jar
      2.scala流计算import声明
      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.streaming.StreamingContext._
      import org.apache.spark.streaming.dstream.DStream
      import org.apache.spark.streaming.Duration
      import org.apache.spark.streaming.Seconds

    1.初始化StreamingContext对象

       //创建一个本地StreamingContext两个工作线程和批间隔1秒。
       val conf = new SparkConf()
       conf.setMaster(“local[2]")
       conf.setAppName(“ NetworkWordCount")
       val ssc = new StreamingContext(conf, Seconds(1))

    2.获取DStream对象 

      //创建一个连接到主机名的DStream,像localhost:9999

       val lines = ssc.socketTextStream("localhost", 9999)

    3.操作DStream对象

      //将每一行接收到的数据通过空格分割成单词 

      val words = lines.flatMap(_.split(" “))
      //导入StreamingContext中的隐式转换
      import org.apache.spark.streaming.StreamingContext._

       // 对每一批次的单词进行转化求和

      val pairs = words.map(word => (word, 1))
      val wordCounts = pairs.reduceByKey(_ + _)
      // 每个批次中默认打印前十个元素到控制台
      wordCounts.print()

    4.启动流处理程序

      ssc.start// 开始计算

      ssc.awaitTermination() // 等待计算终止

      ssc.stop() //结束应用

    启动网络端口,模拟发送数据

      1.借助于nc命令,手动输入数据

        Linux/Mac :nc

        Windows:cat

          nc -lk 9999

      2.借助于代码,编写一个模拟数据发生器  

    package com.briup.streaming
    
    import java.io.PrintWriter
    import java.net.ServerSocket
    
    import scala.io.Source
    
    object MassageServer {
    
      // 定义随机获取整数的方法
      def index(length: Int) = {
        import java.util.Random
        val rdm = new Random
        rdm.nextInt(length)
      }
    
      def main(args: Array[String]) {
        println("模拟数据器启动!!!")
        // 获取指定文件总的行数
        val filename ="Spark/ihaveadream.txt";
        val lines = Source.fromFile(filename).getLines.toList
        val filerow = lines.length
    
        // 指定监听某端口,当外部程序请求时建立连接
        val serversocket = new ServerSocket(9999);
    
        while (true) {
          //监听9999端口,获取socket对象
          val socket = serversocket.accept()
          //      println(socket)
          new Thread() {
            override def run = {
              println("Got client connected from: " + socket.getInetAddress)
    
              val out = new PrintWriter(socket.getOutputStream(), true)
    
              while (true) {
                Thread.sleep(1000)
                // 当该端口接受请求时,随机获取某行数据发送给对方
                val content = lines(index(filerow))
    
                println (content)
    
                out.write(content + '
    ')
    
                out.flush()
              }
              socket.close()
            }
          }.start()
        }
      }
    }
    模拟发送数据

    注意事项:

    ◆ 1.启动 Spark Streaming 之前所作的所有步骤只是创建了执行流程, 程序没有真正
    连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划
    ◆ 2.当 ssc.start()启动后程序才真正进行所有预期的操作
    ◆ 3.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成
    ◆ 4.一个Streaming context只能启动一次
    ◆ 5.如果模式是本地模式,那么请务必设置local[n] ,n>=2   1个用于接收,1个用于处理

    
    
    package com.briup.streaming
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Duration, StreamingContext}
    
    object MyTestOldAPI {
      def main(args: Array[String]): Unit = {
        //设置日志级别
        Logger.getLogger("org").setLevel(Level.WARN)
    
        //1 获取DS
        val conf = new SparkConf().setAppName("MyTestOldAPI").setMaster("local[*]")
        val dss = new StreamingContext(conf, Duration(1000))
        val ds = dss.socketTextStream("localhost", 9999)
    
        //2 逻辑处理  //统计
        val res = ds.filter(_ != "").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    
        res.print()
    
        //3 开启实时处理任务
        dss.start()
        dss.awaitTermination()
        dss.stop()
      }
    }
  • 相关阅读:
    linux运维、架构之路-MySQL备份与恢复(四)
    linux运维、架构之路-MHA高可用方案
    Tensorflow环境安装
    Sublime Text3 旧版本下载以及破解激活方式
    《SDN期末作业——实现负载均衡》
    SDN第六次上机作业
    自定义Toast的出现样式
    SDN第五次上机作业
    SDN第4次上机作业
    Context与ApplicationContext的区别
  • 原文地址:https://www.cnblogs.com/Diyo/p/11392059.html
Copyright © 2011-2022 走看看