7.7 Output Operations

    In a Spark application, external systems often need to consume the data that a Spark DStream has processed, so output operations are used to write a DStream's data out to a database or a file system.

    1. Writing a DStream to Text Files

    Enter the following code in the file NetworkWordCountStateful.scala:

    package org.apache.spark.examples.streaming
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.storage.StorageLevel

    object NetworkWordCountStateful {
      def main(args: Array[String]) {
        // Define the state update function: add this batch's counts to the running total
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.foldLeft(0)(_ + _)
          val previousCount = state.getOrElse(0)
          Some(currentCount + previousCount)
        }
        StreamingExamples.setStreamingLogLevels()  // set the log4j logging level

        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
        val sc = new StreamingContext(conf, Seconds(5))
        sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")  // set the checkpoint directory; checkpointing provides fault tolerance
        val lines = sc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))
        val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
        stateDstream.print()
        // New statement: save the DStream to text files
        stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")
        sc.start()
        sc.awaitTermination()
      }
    }

    After packaging and compiling the program with sbt, run it with a command like the one sketched below:
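    A typical invocation looks like the following sketch; the project directory, Scala version, and jar name are assumptions and must match your own build:

    $ cd /usr/local/spark/mycode/streaming/dstreamoutput
    $ sbt package
    $ /usr/local/spark/bin/spark-submit \
        --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" \
        target/scala-2.11/simple-project_2.11-1.0.jar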

    Once the program starts, it prints run-time information to the screen.

    Open another terminal to act as the source that feeds words to NetworkWordCountStateful for counting:
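    A simple word source is netcat; assuming nc is installed, start it in the new terminal and type words separated by spaces, one line at a time:

    $ nc -lk 9999

    Every line typed here is sent to port 9999, where socketTextStream receives it.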

    The window running NetworkWordCountStateful then shows word-count results:
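    With DStream.print(), every 5-second batch prints a block like the sketch below (the timestamp and counts are illustrative, not captured output):

    -------------------------------------------
    Time: 1479890485000 ms
    -------------------------------------------
    (spark,2)
    (hadoop,1)
    (streaming,1)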

    These word-count results are written out under “/usr/local/spark/mycode/streaming/dstreamoutput/output.txt”.

    You can see that many text files have been generated under this directory:
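    A listing would resemble the following sketch, with one entry per batch interval (the timestamps are illustrative):

    $ ls /usr/local/spark/mycode/streaming/dstreamoutput
    output.txt-1479951955000  output.txt-1479951960000  output.txt-1479951965000  ...
    $ ls /usr/local/spark/mycode/streaming/dstreamoutput/output.txt-1479951955000
    part-00000  part-00001  _SUCCESS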

    Although “output.txt” looks like a file name, Spark actually generates directories named output.txt, not files: for every batch interval, saveAsTextFiles creates a new directory whose name is the given prefix followed by the batch time in milliseconds, and writes that batch's data as part files inside it.
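    saveAsTextFiles also accepts an optional suffix, in which case each batch directory is named prefix-TIME_IN_MS.suffix; for example, an illustrative variant of the statement above:

    // each batch now goes to a directory named output-<TIME_IN_MS>.txt
    stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output", "txt")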

    2. Writing a DStream to a MySQL Database

    Start the MySQL server and complete the creation of the database and table:

    In the “spark” database created earlier, create a table named “wordcount”:
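    A minimal schema that matches the insert statement used in the code below (the column types here are assumptions):

    mysql> use spark;
    mysql> create table wordcount (word char(20), count int(4));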

    Add the following code to NetworkWordCountStateful.scala:

    Inside foreachRDD, each RDD is repartitioned and foreachPartition applies an inner function func to every partition; func writes each record in the partition to the underlying MySQL database over a single JDBC connection.

    Complete code:

    package org.apache.spark.examples.streaming
    import java.sql.{PreparedStatement, Connection, DriverManager}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.storage.StorageLevel

    object NetworkWordCountStateful {
      def main(args: Array[String]) {
        // Define the state update function: add this batch's counts to the running total
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.foldLeft(0)(_ + _)
          val previousCount = state.getOrElse(0)
          Some(currentCount + previousCount)
        }
        StreamingExamples.setStreamingLogLevels()  // set the log4j logging level

        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
        val sc = new StreamingContext(conf, Seconds(5))
        sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")  // set the checkpoint directory; checkpointing provides fault tolerance
        val lines = sc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))
        val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
        stateDstream.print()

        // New statements: save the DStream to a MySQL database
        stateDstream.foreachRDD(rdd => {
          // Inner function: write every record of one partition over a single JDBC connection
          def func(records: Iterator[(String, Int)]) {
            var conn: Connection = null
            var stmt: PreparedStatement = null
            try {
              val url = "jdbc:mysql://localhost:3306/spark"
              val user = "root"
              val password = "hadoop"  // the database password is hadoop
              conn = DriverManager.getConnection(url, user, password)
              // Prepare the statement once and reuse it for every record in the partition
              stmt = conn.prepareStatement("insert into wordcount(word,count) values (?,?)")
              records.foreach(p => {
                stmt.setString(1, p._1.trim)  // first field: the word, with surrounding spaces removed
                stmt.setInt(2, p._2)          // second field: the count
                stmt.executeUpdate()
              })
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              if (stmt != null) {
                stmt.close()
              }
              if (conn != null) {
                conn.close()
              }
            }
          }
          val repartitionedRDD = rdd.repartition(3)
          repartitionedRDD.foreachPartition(func)
        })
        sc.start()
        sc.awaitTermination()
      }
    }
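    Because the program writes through JDBC, the MySQL connector jar must be on the classpath when the application is submitted. A sketch of the command, where the connector version and jar paths are assumptions:

    $ /usr/local/spark/bin/spark-submit \
        --jars /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar \
        --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" \
        target/scala-2.11/simple-project_2.11-1.0.jar

    After feeding some words to the nc terminal, the accumulated counts can be checked in MySQL:

    mysql> select * from wordcount;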
    