  • Spark Streaming: storing data to MySQL

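    The jobs below consume the sun_test_topic Kafka topic through ZooKeeper on localhost:2181; each message carries a page ID followed by three numeric fields (pipe-delimited in the first two listings, comma-delimited in the last one). For local testing, messages of that shape can be pushed with the old Scala producer API (kafka.producer) that the later listings import. This is only a sketch: the broker address localhost:9092 and the sample payload are assumptions, not part of the original post.

    import java.util.Properties
    import kafka.producer.{ KeyedMessage, Producer, ProducerConfig }

    object TestMessageProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092") // broker address is an assumption
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        // sample pipe-delimited record: pageID|value1|value2|value3
        producer.send(new KeyedMessage[String, String]("sun_test_topic", "page1|2|5|7"))
        producer.close()
      }
    }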
    package sparkStreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{ Seconds, StreamingContext, Duration }
    import org.apache.spark.streaming.kafka.KafkaUtils
    import java.sql.{ Connection, DriverManager, PreparedStatement }
    
    object WebPagePopularityValueCalculator {
    
      def main(args: Array[String]) {
    
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicpMap = topics.split(",").map((_, 2)).toMap
        
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val popularityData = lines.map { msgLine =>
          {
            val dataArr: Array[String] = msgLine.split("\|")
            val pageID = dataArr(0)
            val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
            (pageID, popValue)
          }
        }
        //sum the previous popularity value and current value
        val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
          iterator.flatMap(t => {
            val newValue: Double = t._2.sum
            val stateValue: Double = t._3.getOrElse(0.0)
            Some(newValue + stateValue)
          }.map(summedValue => (t._1, summedValue)))
        }
        val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
        val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
          new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
        //set the checkpoint interval to avoid overly frequent checkpointing, which
        //may significantly reduce operation throughput
        stateDstream.checkpoint(Duration(8 * 2 * 1000))
        //after calculation, we need to sort the result and only show the top 10 hot pages
        stateDstream.foreachRDD { rdd =>
          {
            val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
            val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
            topKData.foreach{ case (k, v) => 
              if(v != 0) {
                 println("page" + k + "  " + "value" + v)
                 val itb = Iterator((k, v))
                 toMySql(itb)
              }
              
            }
            
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
      
      def toMySql(iterator: Iterator[(String, Double)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into userbehavior(page, number) values (?, ?)"
        try {
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
          iterator.foreach { dataIn =>
            ps = conn.prepareStatement(sql)
            ps.setString(1, dataIn._1)
            ps.setDouble(2, dataIn._2)
            ps.executeUpdate()
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (ps != null) {
            ps.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      
    }
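    In the job above, toMySql is called once per page with a single-element iterator, so a new JDBC connection is opened and closed for every row of the top 10. Since topKData is already a local array on the driver after take(10), a hedged alternative (same table, credentials and toMySql as above) is to filter out the zero values and push the whole batch through a single call, and therefore a single connection, per batch:

    stateDstream.foreachRDD { rdd =>
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      // keep only non-zero values, print them, then write the whole batch
      // through one connection instead of one connection per row
      val nonZero = topKData.filter { case (_, v) => v != 0 }
      nonZero.foreach { case (k, v) => println("page " + k + "  value " + v) }
      toMySql(nonZero.iterator)
    }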

     Fixing the duplicate-storage problem

    The job above writes the accumulated state of every page back to MySQL on each batch, so the same page is re-inserted every two seconds with its running total. The versions below avoid the duplicates by inserting each incoming record exactly once, at the point where the Kafka message is parsed.

    package sparkStreaming
    
    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    import java.sql.{ DriverManager, PreparedStatement, Connection }
    import org.apache.spark.{ SparkContext, SparkConf }
    
    object WebPagePopularityValueCalculator1 {
      def main(args: Array[String]) {
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicpMap = topics.split(",").map((_, 2)).toMap
    
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val popularityData = lines.map { msgLine =>
          {
            val dataArr: Array[String] = msgLine.split("\|")
            val pageID = dataArr(0)
            val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
            val itb = Iterator((pageID, popValue))
            toMySql(itb)
            (pageID, popValue)
          }
        }
        popularityData.print()
        ssc.start()
        ssc.awaitTermination()
    
      }
    
      def toMySql(iterator: Iterator[(String, Double)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into userbehavior(page, number) values (?, ?)"
        try {
          Class.forName("com.mysql.jdbc.Driver");
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
          iterator.foreach(dataIn => {
            ps = conn.prepareStatement(sql)
            ps.setString(1, dataIn._1)
            ps.setDouble(2, dataIn._2)
            ps.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (ps != null) {
            ps.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
    
    }
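    In this fixed version the JDBC insert happens inside a map transformation, so it only runs when an output operation forces evaluation; here that is print(), which takes only the first few records of each batch and may not evaluate every partition, so records can be dropped from MySQL if the print is changed or removed. A hedged sketch (reusing the same parsing and toMySql as above) keeps map free of side effects and makes the write an explicit output operation, with one toMySql call, and therefore one connection, per partition per batch:

    val popularityData = lines.map { msgLine =>
      val dataArr = msgLine.split("\\|")
      (dataArr(0), dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1)
    }
    // explicit output operation: every record of every batch is written to MySQL
    popularityData.foreachRDD { rdd =>
      rdd.foreachPartition(partition => toMySql(partition))
    }
    popularityData.print()

    The second variant below keeps the same structure but stores the raw field values (v1, v2, v3) in a separate userbehaviordatasource table instead of the weighted popularity score.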
    
    package sparkStreaming
    
    import java.util.Properties
    import kafka.producer._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    import java.sql.{ DriverManager, PreparedStatement, Connection }
    import org.apache.spark.{ SparkContext, SparkConf }
    
    object WebPagePopularityValueCalculator2 {
      def main(args: Array[String]) {
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicpMap = topics.split(",").map((_, 2)).toMap
    
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val popularityData = lines.map { msgLine =>
          {
            val dataArr: Array[String] = msgLine.split(",")
            val pageID = dataArr(0)
            val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
            val itb = Iterator((pageID, dataArr(1).toDouble, dataArr(2).toDouble, dataArr(3).toDouble))
            toMySql(itb)
            (pageID, popValue)
          }
        }
        popularityData.print()
        ssc.start()
        ssc.awaitTermination()
    
      }
    
      def toMySql(iterator: Iterator[(String, Double, Double, Double)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into userbehaviordatasource(page, v1, v2, v3) values (?, ?, ?, ?)"
        try {
          Class.forName("com.mysql.jdbc.Driver");
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
          iterator.foreach(dataIn => {
            ps = conn.prepareStatement(sql)
            ps.setString(1, dataIn._1)
            ps.setDouble(2, dataIn._2)
            ps.setDouble(3, dataIn._3)
            ps.setDouble(4, dataIn._4)
            ps.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (ps != null) {
            ps.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
    
    }
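    Both insert statements assume that the spark database already contains the target tables. The column types are not shown in the original post; a minimal sketch that creates the tables over the same JDBC connection, assuming VARCHAR page IDs and DOUBLE metrics, might look like this:

    import java.sql.DriverManager

    object CreateTables {
      def main(args: Array[String]): Unit = {
        // column types (VARCHAR/DOUBLE) are assumptions; adjust them to the real data
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
        try {
          val st = conn.createStatement()
          st.executeUpdate("CREATE TABLE IF NOT EXISTS userbehavior (page VARCHAR(64), number DOUBLE)")
          st.executeUpdate("CREATE TABLE IF NOT EXISTS userbehaviordatasource (page VARCHAR(64), v1 DOUBLE, v2 DOUBLE, v3 DOUBLE)")
          st.close()
        } finally {
          conn.close()
        }
      }
    }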
    
  • Original post: https://www.cnblogs.com/sunyaxue/p/6544033.html