package sparkStreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WebPagePopularityValueCalculator {

  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, 2)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // each message looks like "pageID|v1|v2|v3"; "|" is a regex metacharacter,
    // so it must be escaped as "\\|" for split
    val popularityData = lines.map { msgLine =>
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      // weighted popularity value of the page
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      (pageID, popValue)
    }

    // sum the popularity values of the current batch with the previous state
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap { case (pageID, newValues, state) =>
        val newValue: Double = newValues.sum
        val stateValue: Double = state.getOrElse(0.0)
        Some((pageID, newValue + stateValue))
      }
    }

    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

    // lengthen the checkpoint interval: checkpointing too frequently
    // can significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * 2 * 1000))

    // after the calculation, sort the result and keep only the top 10 hot pages
    stateDstream.foreachRDD { rdd =>
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      topKData.foreach { case (k, v) =>
        if (v != 0) {
          println("page: " + k + " value: " + v)
          toMySql(Iterator((k, v)))
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def toMySql(iterator: Iterator[(String, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehavior(page, number) values (?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      iterator.foreach { dataIn =>
        ps = conn.prepareStatement(sql)
        ps.setString(1, dataIn._1)
        ps.setDouble(2, dataIn._2)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
}
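For local testing, the job above needs pipe-delimited messages on sun_test_topic. The following is a minimal test-data producer sketch, not part of the original program: it assumes the Kafka 0.8-era kafka.producer API (the same generation as the receiver-based KafkaUtils.createStream used above), and the object name, broker address, and field meanings are all illustrative assumptions.

package sparkStreaming

import java.util.Properties
import scala.util.Random

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// hypothetical generator of "pageID|v1|v2|v3" test messages
object UserBehaviorMsgProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // assumed broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    val rand = new Random()
    while (true) {
      // the field ranges here are made up; only the "|" delimiter matters to the consumer
      val msg = "page" + (rand.nextInt(10) + 1) + "|" +
        rand.nextInt(10) + "|" + rand.nextInt(5) + "|" + rand.nextInt(2)
      producer.send(new KeyedMessage[String, String]("sun_test_topic", msg))
      Thread.sleep(1000)
    }
  }
}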
Fixing the duplicate storage problem

updateStateByKey emits the full cumulative state for every key on every batch, so the foreachRDD block above re-inserts the same top pages into MySQL every two seconds and the userbehavior table keeps accumulating duplicate rows. The version below avoids this by writing each raw message to MySQL exactly once, at the moment it is parsed.
package sparkStreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WebPagePopularityValueCalculator1 {

  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, 2)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val popularityData = lines.map { msgLine =>
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      // write the record as soon as it is parsed, so each message is stored once;
      // note that map is lazy -- the print() below is the output operation that
      // actually triggers this write
      toMySql(Iterator((pageID, popValue)))
      (pageID, popValue)
    }
    popularityData.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def toMySql(iterator: Iterator[(String, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehavior(page, number) values (?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      iterator.foreach { dataIn =>
        ps = conn.prepareStatement(sql)
        ps.setString(1, dataIn._1)
        ps.setDouble(2, dataIn._2)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
}
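toMySql opens a fresh JDBC connection on every call, and in this version it is called once per message. If that becomes a bottleneck, the connection-per-partition pattern from the Spark Streaming programming guide is the usual alternative. A sketch under the assumption that the write is taken out of map and popularityData only emits (pageID, popValue) pairs; the table and credentials are the ones used above:

    // one connection and one PreparedStatement per partition instead of per record
    popularityData.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
        val ps = conn.prepareStatement("insert into userbehavior(page, number) values (?, ?)")
        try {
          partition.foreach { case (page, value) =>
            ps.setString(1, page)
            ps.setDouble(2, value)
            ps.executeUpdate()
          }
        } finally {
          ps.close()
          conn.close()
        }
      }
    }

Moving the write into foreachRDD also removes the reliance on print() to force the lazy map.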
A further variant stores the three raw field values instead of the derived popularity value, using a comma-delimited message format (renamed here to avoid clashing with the object above):

package sparkStreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WebPagePopularityValueCalculator2 {

  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, 2)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // this variant expects comma-delimited messages: "pageID,v1,v2,v3"
    val popularityData = lines.map { msgLine =>
      val dataArr: Array[String] = msgLine.split(",")
      val pageID = dataArr(0)
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      // store the raw values rather than the weighted popularity value
      toMySql(Iterator((pageID, dataArr(1).toDouble, dataArr(2).toDouble, dataArr(3).toDouble)))
      (pageID, popValue)
    }
    popularityData.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def toMySql(iterator: Iterator[(String, Double, Double, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehaviordatasource(page, v1, v2, v3) values (?, ?, ?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      iterator.foreach { dataIn =>
        ps = conn.prepareStatement(sql)
        ps.setString(1, dataIn._1)
        ps.setDouble(2, dataIn._2)
        ps.setDouble(3, dataIn._3)
        ps.setDouble(4, dataIn._4)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }
}
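This variant issues one INSERT per message and re-creates the PreparedStatement inside the loop. JDBC batching can amortize the round trips; below is a sketch of the same method rewritten with addBatch/executeBatch (toMySqlBatched is a hypothetical name; the table and credentials are the ones used above):

    import java.sql.{Connection, DriverManager, PreparedStatement}

    def toMySqlBatched(iterator: Iterator[(String, Double, Double, Double)]): Unit = {
      var conn: Connection = null
      var ps: PreparedStatement = null
      val sql = "insert into userbehaviordatasource(page, v1, v2, v3) values (?, ?, ?, ?)"
      try {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
        conn.setAutoCommit(false)       // group the whole batch into one transaction
        ps = conn.prepareStatement(sql) // prepare once, reuse for every row
        iterator.foreach { case (page, v1, v2, v3) =>
          ps.setString(1, page)
          ps.setDouble(2, v1)
          ps.setDouble(3, v2)
          ps.setDouble(4, v3)
          ps.addBatch()
        }
        ps.executeBatch()               // send all buffered rows at once
        conn.commit()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (ps != null) ps.close()
        if (conn != null) conn.close()
      }
    }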