import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

/**
 * Spark Streaming word-count over a Kafka topic, using the receiver-based
 * (ZooKeeper) consumer API. Reads lines from topic "wordsender", splits them
 * on spaces, and maintains a 2-minute sliding windowed count that slides
 * every 10 seconds. Requires a Kafka producer to feed the topic and a local
 * ZooKeeper at localhost:2181.
 *
 * NOTE: object name keeps the original spelling ("Kafa") because callers /
 * build scripts may reference it.
 */
object DStream_Kafa_Consumer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkConf().setAppName("kafaCountWord").setMaster("local[2]")
    // 10-second batch interval.
    val ssc = new StreamingContext(sc, Seconds(10))
    // Checkpointing is mandatory for reduceByKeyAndWindow with an inverse function.
    ssc.checkpoint("file:///usr/local2/spark/mycode/kafa/checkpoint")

    val zookeeperServer = "localhost:2181" // ZooKeeper quorum (default address)
    val group = "soyo_kafa_group"          // Kafka consumer group id
    val topics = "wordsender"              // comma-separated topic list
    val num = 1                            // receiver threads per topic

    // Map each topic to its number of consumer threads.
    val topicMap = topics.split(",").map((_, num)).toMap

    // createStream yields (key, message) pairs; keep only the message payload.
    val lines = KafkaUtils.createStream(ssc, zookeeperServer, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Windowed count: add new batches, subtract batches leaving the 2-minute
    // window, slide every 10 seconds, with 2 reduce partitions.
    val wordCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    // val wordCounts_2 = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print(200)
    // wordCounts_2.print(200)

    ssc.start()
    // Side-effecting blocking call: invoke with explicit parentheses.
    ssc.awaitTermination()
  }
}
Result: a separate Kafka producer program must be running to feed input data into the "wordsender" topic. Sample windowed output:
-------------------------------------------
Time: 1508230980000 ms
-------------------------------------------
(4,61)
(8,69)
(6,66)
(0,70)
(2,61)
(7,69)
(5,61)
(9,74)
(3,62)
(1,72)
-------------------------------------------
Time: 1508230990000 ms
-------------------------------------------
(4,61)
(8,69)
(6,66)
(0,70)
(2,61)
(7,69)
(5,61)
(9,74)
(3,62)
(1,72)
-------------------------------------------