转自:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
生产者:
package sparkStreaming import scala.util.Random import java.util.HashMap import org.apache.kafka.clients.producer._ import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object UserBehaviorProducerClient { private val PAGE_NUM = 100 private val MAX_MSG_NUM = 3 private val MAX_CLICK_TIME = 5 private val MAX_STAY_TIME = 10 private val LIKE_OR_NOT = Array[Int](1, 0, -1) def main(args: Array[String]) { val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_test_topic", "3") val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) val rand = new Random() while (true) { //how many user behavior messages will be produced val msgNum = rand.nextInt(MAX_MSG_NUM) + 1 try { //generate the message with format like page1|2|7.123|1 for (i <- 0 to msgNum) { var msg = new StringBuilder() msg.append("page" + (rand.nextInt(PAGE_NUM) + 1)) msg.append("|") msg.append(rand.nextInt(MAX_CLICK_TIME) + 1) msg.append("|") msg.append(rand.nextInt(MAX_CLICK_TIME) + rand.nextFloat()) msg.append("|") msg.append(LIKE_OR_NOT(rand.nextInt(3))) println(msg.toString()) val message = new ProducerRecord[String, String](topic, null, msg.toString()) producer.send(message) } println("%d user behavior messages produced.".format(msgNum+1)) } catch { case e: Exception => println(e) } try { //sleep for 5 seconds after send a micro batch of message Thread.sleep(5000) } catch { case e: Exception => println(e) } } } }
消费者:
package sparkStreaming import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.HashPartitioner import org.apache.spark.streaming.Duration object WebPagePopularityValueCalculator { def main(args: Array[String]) { val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic") val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_, 2)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val popularityData = lines.map { msgLine => { val dataArr: Array[String] = msgLine.split("\|") val pageID = dataArr(0) val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1 (pageID, popValue) } } //sum the previous popularity value and current value val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(t => { val newValue: Double = t._2.sum val stateValue: Double = t._3.getOrElse(0); Some(newValue + stateValue) }.map(sumedValue => (t._1, sumedValue))) } val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00))) val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) //set the checkpoint interval to avoid too frequently data checkpoint which may //may significantly reduce operation throughput stateDstream.checkpoint(Duration(8 * 2 * 1000)) //after calculation, we need to sort the result and only show the top 10 hot pages stateDstream.foreachRDD { rdd => { val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false) val topKData = sortedData.take(10).map { case (v, k) => (k, v) } topKData.foreach(x => { println(x) }) } } ssc.start() ssc.awaitTermination() } }