生产者
import java.util.HashMap import org.apache.kafka.clients.producer._ import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object spark_kafka_wordcount_producer { def main(args: Array[String]) { val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_first", "3") val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) while(true) { val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) .mkString(" ") val message = new ProducerRecord[String, String](topic, null, str) producer.send(message) Thread.sleep(1000) } } }
消费者
import java.util.Properties import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object spark_kafka_wordcount_customer { def main(args: Array[String]) { val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first") val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,2)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }