  • Programming model: the data output layer

    Saving results to HDFS

    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapred.{SequenceFileOutputFormat, TextOutputFormat}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * WordCount example: Spark Streaming consumes real-time data sent by a TCP server.
      *
      * 1. Start a Netcat server on the master node:
      * `$ nc -lk 9998` (if the nc command is not available, install it with `yum install -y nc`)
      *
      * 2. Submit the Spark Streaming application to the cluster with:
      * spark-submit --class com.twq.streaming.output.NetworkWordCountHDFS 
      * --master spark://master:7077 
      * --deploy-mode client 
      * --driver-memory 512m 
      * --executor-memory 512m 
      * --total-executor-cores 4 
      * --executor-cores 2 
      * /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
      */
    object NetworkWordCountHDFS {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("NetworkWordCountHDFS")
        val sc = new SparkContext(sparkConf)
    
        // Create the StreamingContext with a 5 second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))
    
        // Create a receiver (ReceiverInputDStream) that reads and processes data sent over a socket from a port on the given host
        val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    
        // Processing logic: a simple word count
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
        // Write out the results.
        // Repartition to a single partition so each batch produces a single output file,
        // then wrap every record as (NullWritable, Text) so it can be written by TextOutputFormat.
        wordCounts.repartition(1).mapPartitions { iter =>
          val text = new Text()
          iter.map { x =>
            text.set(x.toString)
            (NullWritable.get(), text)
          }
        }.saveAsHadoopFiles[TextOutputFormat[NullWritable, Text]](
          "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/hadoop/wordcount", "-hadoop")
    
        wordCounts.repartition(1).map(x => {
          val text = new Text()
          text.set(x.toString())
          (NullWritable.get(), text)
        }).saveAsHadoopFiles[SequenceFileOutputFormat[NullWritable, Text]](
          "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/sequence/wordcount", "-hadoop")//  文件名字(非目录名字),以—hadoop为后缀,中间会有个时间戳
    
        wordCounts.repartition(1).saveAsTextFiles("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/text/wordcount")  // saveAsTextFiles is the more concise way to write text output
    
        wordCounts.repartition(1).saveAsObjectFiles("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/object/wordcount")
    
        // Start the streaming computation
        ssc.start()
    
        // Stop the streaming computation but keep the SparkContext alive (stopSparkContext = false),
        // so the verification below can still use sc
        ssc.stop(false)
    
        // Verify the results written above
        sc.textFile("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/hadoop/wordcount*")
        // Each batch produces a set of small files; coalesce(1) merges them into one file,
        // and saveAsTextFile(<target directory>) writes the merged result to that directory
        sc.textFile("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/hadoop/wordcount*").coalesce(1).saveAsTextFile("")
    
        sc.sequenceFile("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/sequence/wordcount*",
          classOf[NullWritable], classOf[Text]).map(_._2.toString).collect()
    
        sc.objectFile[(String, Int)]("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/object/wordcount*").collect
    
        sc.textFile("hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/text/wordcount*").collect
    
        // Wait for the streaming computation to terminate
        ssc.awaitTermination()
      }
    }
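
    The example above writes with the old org.apache.hadoop.mapred output formats. The same pair DStream can also be written through the new org.apache.hadoop.mapreduce API via saveAsNewAPIHadoopFiles. The snippet below is a minimal sketch of that variant, meant to sit inside main of NetworkWordCountHDFS next to the other writes; the output path prefix is illustrative, and the new-API TextOutputFormat is imported under an alias so it does not clash with the old one.

    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    // Same record shape as above: (NullWritable, Text) pairs, one partition per batch
    wordCounts.repartition(1).mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }.saveAsNewAPIHadoopFiles[NewTextOutputFormat[NullWritable, Text]](
      // illustrative output prefix; adjust to your own HDFS layout
      "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/data/hadoop-newapi/wordcount", "-hadoop")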
    

      Saving data to MySQL

    import java.sql.DriverManager
    
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * WordCount example: Spark Streaming consumes real-time data sent by a TCP server.
      *
      * 1. Start a Netcat server on the master node:
      * `$ nc -lk 9998` (if the nc command is not available, install it with `yum install -y nc`)
      *
      * 2. Create the target table in MySQL:
      * create table wordcount(ts bigint, word varchar(50), count int);
      *
      * 3. Launch with the MySQL driver, c3p0 and the application jar on the classpath:
      * spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,
      * c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
      */
    object NetworkWordCountForeachRDD {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD")
        val sc = new SparkContext(sparkConf)
    
        // Create the StreamingContext with a 5 second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))
    
        // Create a receiver (ReceiverInputDStream) that reads and processes data sent over a socket from a port on the given host
        val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    
        // Processing logic: a simple word count
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
        // Save the results to MySQL (broken version: the JDBC Connection is created on the driver
        // and is not serializable, so code that uses it cannot be shipped to the executors where rdd.foreach runs)
        wordCounts.foreachRDD { (rdd, time) =>
          Class.forName("com.mysql.jdbc.Driver")
          val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")
          val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
          rdd.foreach { record =>
            statement.setLong(1, time.milliseconds)
            statement.setString(2, record._1)
            statement.setInt(3, record._2)
            statement.execute()
          }
          statement.close()
          conn.close()
        }
        // Start the streaming computation
        ssc.start()
    
        // Stop the streaming computation but keep the SparkContext alive (stopSparkContext = false)
        ssc.stop(false)
    
    
    
        // Save the results to MySQL (improved version: one pooled connection per partition, batched inserts)
        wordCounts.foreachRDD { (rdd, time) =>
          rdd.foreachPartition { partitionRecords =>
            val conn = ConnectionPool.getConnection
            conn.setAutoCommit(false)
            val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
            partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
              statement.setLong(1, time.milliseconds)
              statement.setString(2, word)
              statement.setInt(3, count)
              statement.addBatch()
              if (index != 0 && index % 500 == 0) {
                statement.executeBatch()
                conn.commit()
              }
            }
            statement.executeBatch()
            statement.close()
            conn.commit()
            conn.setAutoCommit(true)
            ConnectionPool.returnConnection(conn)
          }
        }
    
        // Wait for the streaming computation to terminate
        ssc.awaitTermination()
      }
    }
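
    The improved version above calls a ConnectionPool helper that is not shown in this post. Below is a minimal sketch of such a helper, assuming it is backed by c3p0 (the c3p0-0.9.1.2.jar already listed in --jars) and reuses the same JDBC URL and credentials as the broken version; the object name and pool settings are illustrative.

    import java.sql.Connection

    import com.mchange.v2.c3p0.ComboPooledDataSource

    object ConnectionPool {
      // One pooled data source per executor JVM, created lazily on first use
      private lazy val dataSource: ComboPooledDataSource = {
        val ds = new ComboPooledDataSource()
        ds.setDriverClass("com.mysql.jdbc.Driver")
        ds.setJdbcUrl("jdbc:mysql://master:3306/test")
        ds.setUser("root")
        ds.setPassword("root")
        ds.setMaxPoolSize(10) // illustrative pool size
        ds
      }

      def getConnection: Connection = dataSource.getConnection

      // For a pooled connection, close() returns it to the pool instead of closing the underlying socket
      def returnConnection(conn: Connection): Unit = {
        if (conn != null) conn.close()
      }
    }

    Because each call to foreachPartition borrows one connection and returns it when the partition is done, the number of concurrent MySQL connections stays bounded by the pool size rather than by the number of records.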
    

      
