转自:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
生产者:
package sparkStreaming
import scala.util.Random
import java.util.HashMap
import org.apache.kafka.clients.producer._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
/**
 * Simulates user browsing behaviour by publishing random records to a Kafka topic.
 *
 * Each record is pipe-delimited: `pageId|clickCount|stayTime|likeFlag`, e.g. `page1|2|7.123|1`.
 * A batch of 1 to MAX_MSG_NUM records is sent, followed by a 5 second pause, forever
 * (until the thread is interrupted or the JVM is stopped).
 */
object UserBehaviorProducerClient {
  import scala.util.control.NonFatal

  // Page ids are drawn from page1 .. page{PAGE_NUM}.
  private val PAGE_NUM = 100
  // Inclusive upper bound on the number of records sent per batch.
  private val MAX_MSG_NUM = 3
  // Click counts are drawn from 1 .. MAX_CLICK_TIME.
  private val MAX_CLICK_TIME = 5
  // Stay times are drawn from [0, MAX_STAY_TIME).
  private val MAX_STAY_TIME = 10
  // Possible values of the like flag.
  private val LIKE_OR_NOT = Array[Int](1, 0, -1)

  /**
   * Entry point.
   *
   * @param args optional `brokers topic`; when absent the previously hard-coded
   *             `localhost:9092` and `sun_test_topic` are used.
   */
  def main(args: Array[String]): Unit = {
    val brokers = args.lift(0).getOrElse("localhost:9092")
    val topic = args.lift(1).getOrElse("sun_test_topic")

    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // The loop below normally never returns, so flush and close the producer when the JVM exits.
    sys.addShutdownHook(producer.close())

    val rand = new Random()
    while (!Thread.currentThread().isInterrupted) {
      // Number of records in this batch: 1 .. MAX_MSG_NUM.
      val msgNum = rand.nextInt(MAX_MSG_NUM) + 1
      try {
        for (_ <- 1 to msgNum) {
          val msg = buildMessage(rand)
          println(msg)
          producer.send(new ProducerRecord[String, String](topic, msg))
        }
        println("%d user behavior messages produced.".format(msgNum))
      } catch {
        // Best effort: report the failure and keep producing; fatal errors still propagate.
        case NonFatal(e) => println(e)
      }
      try {
        // Sleep 5 seconds between micro batches.
        Thread.sleep(5000)
      } catch {
        // Do not swallow interruption: restore the flag so the loop condition stops the producer.
        case _: InterruptedException => Thread.currentThread().interrupt()
      }
    }
  }

  /** Builds one random record of the form `pageN|clicks|stayTime|like`, e.g. `page1|2|7.123|1`. */
  private def buildMessage(rand: Random): String = {
    val page = rand.nextInt(PAGE_NUM) + 1
    val clicks = rand.nextInt(MAX_CLICK_TIME) + 1
    val stayTime = rand.nextInt(MAX_STAY_TIME) + rand.nextFloat()
    val like = LIKE_OR_NOT(rand.nextInt(LIKE_OR_NOT.length))
    s"page$page|$clicks|$stayTime|$like"
  }
}
消费者:
package sparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Duration
/**
 * Spark Streaming job that reads user-behaviour records from Kafka, scores every page and
 * prints the pages with the highest accumulated popularity after each 2 second batch.
 *
 * Input records are pipe-delimited: `pageId|clickCount|stayTime|likeFlag`.
 * A record's score is `0.8 * clickCount + 0.8 * stayTime + 1 * likeFlag`, and scores are
 * summed per page across batches.
 */
object WebPagePopularityValueCalculator {
  import scala.util.Try

  // Weights that combine the fields of one record into its popularity score.
  private val CLICK_WEIGHT = 0.8
  private val STAY_WEIGHT = 0.8
  private val LIKE_WEIGHT = 1.0
  // Number of pages printed after every batch.
  private val TOP_N = 10

  /**
   * Entry point.
   *
   * @param args optional `zkQuorum group topics`; when absent the previously hard-coded
   *             `localhost:2181`, `1` and `sun_test_topic` are used.
   */
  def main(args: Array[String]): Unit = {
    val zkQuorum = args.lift(0).getOrElse("localhost:2181")
    val group = args.lift(1).getOrElse("1")
    val topics = args.lift(2).getOrElse("sun_test_topic")

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Each topic is consumed with 2 threads.
    val topicMap = topics.split(",").map((_, 2)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    // Malformed records are dropped instead of failing the job.
    val popularityData = lines.flatMap(line => parsePopularity(line).toList)

    // Sum the previous popularity value of each page with the scores of the current batch.
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) =>
      iterator.map { case (pageID, newValues, state) =>
        (pageID, newValues.sum + state.getOrElse(0.0))
      }

    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      true, // rememberPartitioner
      initialRDD)
    // Checkpoint the state every 16 s (8 batches) instead of every batch: too frequent
    // checkpointing may significantly reduce throughput.
    stateDstream.checkpoint(Duration(8 * 2 * 1000))

    // After each batch, print only the TOP_N hottest pages. `top` avoids sorting the whole RDD.
    stateDstream.foreachRDD { rdd =>
      rdd.top(TOP_N)(Ordering.by[(String, Double), Double](_._2)).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one record into `(pageId, score)`.
   *
   * Fields after the fourth are ignored, as before.
   *
   * @return None when the record has fewer than 4 fields or a non-numeric score field.
   */
  private def parsePopularity(msgLine: String): Option[(String, Double)] =
    msgLine.split('|') match {
      case Array(pageID, clicks, stayTime, like, _*) =>
        Try(clicks.toDouble * CLICK_WEIGHT + stayTime.toDouble * STAY_WEIGHT + like.toDouble * LIKE_WEIGHT)
          .toOption
          .map(popValue => (pageID, popValue))
      case _ => None
    }
}