1. Network word count（基础 Spark Streaming 单词计数示例）
package dayo7

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Basic Spark Streaming word count over a TCP text socket.
 *
 * Reads lines from 192.168.186.150:1234 in 2-second micro-batches,
 * splits each line on spaces, counts words per batch, and prints the
 * counts of each batch to stdout.
 *
 * NOTE(review): the object name contains a typo ("Newwork" — presumably
 * "Network" was intended). Kept as-is so existing callers / spark-submit
 * class references keep working.
 */
object NewworkWordCount {

  // Silence Spark's verbose INFO logging so batch output is readable.
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Spark configuration; local[*] runs with all available local cores.
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // StreamingContext; the second argument is the micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(2))

    // Receive a stream of text lines from the socket source.
    val lines = ssc.socketTextStream("192.168.186.150", 1234)

    // Tokenize, map to (word, 1), sum per word within the batch, and print.
    // Fix: print() returns Unit, so the original `val result2 = ...` was a
    // dead binding of Unit — dropped.
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    // Start the streaming computation.
    ssc.start()
    // Block the driver thread until the stream is stopped or fails.
    ssc.awaitTermination()
  }
}
2. 将统计结果写入 Redis
启动 Redis：`bin/redis-server etc/redis.conf`；查看进程/端口：`ps -ef | grep redis`
package dayo7

import day08.Jpoods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count that persists per-batch counts into Redis.
 *
 * Every 2-second batch is tokenized and counted, then each (word, count)
 * pair is folded into the Redis hash "zi" via HINCRBY, so the hash holds
 * running totals across batches. One Jedis connection is opened per RDD
 * partition (via the Jpoods pool) and closed after the partition is written.
 */
object WordCountToRedis1 {

  // Keep Spark's log output quiet so streaming output is visible.
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Local-mode configuration using every available core.
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // Micro-batch interval of 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))

    // Socket text source producing one String per line.
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.186.150", 1234)

    lines.foreachRDD { rdd =>
      // Per-batch word counts.
      val counts = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      // Write each partition with its own connection to avoid serializing
      // the Jedis client across the cluster.
      counts.foreachPartition { partition =>
        val jedis = Jpoods.getJedis()
        partition.foreach { case (word, count) =>
          // Accumulate into the hash "zi": field = word, increment = count.
          jedis.hincrBy("zi", word, count)
        }
        // Return the connection once the partition is fully written.
        jedis.close()
      }
    }

    // Launch the streaming job and block until termination.
    ssc.start()
    ssc.awaitTermination()
  }
}
3. 完整版 Spark Streaming（使用 checkpoint 的有状态单词计数）
package sortby

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful streaming word count with checkpoint-based recovery.
 *
 * StreamingContext.getOrCreate restores the context from the "./test1"
 * checkpoint directory when one exists, otherwise builds a fresh context
 * via creatingFunc. Running totals per word are maintained with
 * updateStateByKey and printed every batch.
 */
object WordCountSortBy2 {

  // Suppress Spark's INFO chatter.
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Recover from checkpoint if present; otherwise create a new context.
    val ssc = StreamingContext.getOrCreate("./test1", creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }

  /** Builds the StreamingContext used when no checkpoint exists yet. */
  def creatingFunc(): StreamingContext = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // Process a micro-batch every 3 seconds.
    val ssc = new StreamingContext(conf, Seconds(3))

    // Checkpoint directory — required by updateStateByKey.
    ssc.checkpoint("./test1")

    // Socket text source.
    val lines = ssc.socketTextStream("192.168.186.150", 1234)

    // Checkpoint this stream's data every 5 seconds instead of the default.
    lines.checkpoint(Seconds(5))

    // Tokenize, pair each word with 1, merge into the running state, print.
    lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc).print()

    ssc
  }

  /**
   * State-merge function for updateStateByKey.
   *
   * @param seq    the new per-batch values observed for a key
   * @param option the previously saved running total, if any
   * @return the updated running total for the key
   */
  def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] =
    Some(seq.foldLeft(option.getOrElse(0))(_ + _))
}