/**
 * Created by root on 9/8/15.
 */

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming._

object SparkStreamingTest {
  def main(args: Array[String]): Unit = {
    // Create a local StreamingContext with two worker threads and a batch interval of 10 seconds.
    val conf = new SparkConf().setAppName("Spark streaming test").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Read newline-delimited text from a TCP socket source.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    // Word count with the core DStream API.
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    // Bridge Spark Streaming to Spark SQL: turn each micro-batch RDD into a
    // DataFrame, register it as a temp table, and run a SQL query against it.
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.registerTempTable("words")

      val wordsCountDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"======================= $time =======================")
      wordsCountDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

// The case class must live at the top level (not inside main) so that
// Spark SQL's reflection can infer a schema for toDF().
case class Record(word: String)

// Lazily instantiated singleton SQLContext, reused across batches; @transient
// keeps the instance out of closures shipped to executors.
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
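
// Usage sketch (assumes a local Spark 1.x build and netcat available on the PATH):
//
//   1. Start a simple text server for the socket stream to connect to:
//        nc -lk 9999
//   2. Run this app (via spark-submit or straight from the IDE) and type words
//      into the netcat session. Every 10-second batch prints the DStream word
//      counts followed by the SQL-computed counts for that batch.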