Official guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html
1.1. Real-time WordCount with Spark Streaming
1. Install and start the data producer
First, install the nc tool with YUM on a Linux machine (ip: 192.168.10.101)
yum install -y nc
Start a server that listens on port 9999
nc -lk 9999
2. Write the Spark Streaming program
package org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object TCPWordCount {
  def main(args: Array[String]): Unit = {
    // setMaster("local[2]") runs locally with 2 threads: one receives data, one processes it
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with the batch interval; about 5 seconds works well,
    // otherwise the console output scrolls away too quickly to read
    val scc = new StreamingContext(conf, Seconds(5))
    // Source address: the host and port to read from (must be the machine where nc is listening)
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // Split each line into words and count them within the batch
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the result to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
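3. Start the Spark Streaming program: because it uses the local mode "local[2]", it can be run directly on the local machine
Note: the parallelism must be set explicitly. When running locally, setMaster("local[2]") starts two threads, one for the receiver and one for computation. With only one thread the receiver would occupy it and no batch would ever be processed. When running on a cluster, more than one core must be available to the application.
4. Type words into the nc session on the Linux command line
5. Check the results in the IDEA console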
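To make the per-batch behaviour concrete, here is a minimal sketch that reproduces the flatMap / map / reduceByKey chain on ordinary Scala collections, with no Spark dependency. The names BatchWordCount and countBatch and the sample lines are assumptions made up for illustration. Each call stands for one 5-second batch and knows nothing about earlier batches.

```scala
// Plain-Scala sketch of what a single streaming batch computes (no Spark involved).
// BatchWordCount and countBatch are hypothetical names used only for illustration.
object BatchWordCount {
  // Mirrors lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) for one batch.
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Two independent batches: the second one does not see the first.
    val batch1 = countBatch(Seq("hello spark", "hello streaming"))
    val batch2 = countBatch(Seq("hello"))
    println(batch1)
    println(batch2)
  }
}
```

In this sketch the second batch reports hello once, even though the first batch had already seen it twice.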
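Problem: the words typed on the Linux side are counted correctly within each batch, but the counts are not accumulated across batches. To accumulate them, use updateStateByKey(func) to maintain state. An example follows: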
package org.apache.spark

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object TCPWordCountUpdate {
  /**
   * String: a word
   * Seq: [1,1,1,1,1,1], the occurrences of the word in the current batch
   * Option: the accumulated sum from previous batches
   */
  val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    // iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  // Simpler per-key variant: current batch values plus the running count
  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(newValues.sum + runningCount.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    // setMaster("local[2]") runs locally with 2 threads: one receives data, one processes it
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with the batch interval; about 5 seconds works well,
    // otherwise the console output scrolls away too quickly to read
    val scc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory
    scc.checkpoint("./")
    // Source address: the host and port to read from (must be the machine where nc is listening)
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    /**
     * updateStateByKey() parameters:
     * 1. the function that updates the state
     * 2. the partitioner
     * 3. a Boolean: whether to remember the partitioner in the generated RDDs
     */
    // val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction2 _)
    val results = lines.flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(updateFunction, new HashPartitioner(scc.sparkContext.defaultParallelism), true)
    // Print the result to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
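The bookkeeping behind updateStateByKey can be sketched on plain Scala collections, again without Spark. This is only an illustration of the idea under simplifying assumptions (a single in-memory Map as the state, no partitioning or checkpointing). RunningWordCount, updateCount and step are hypothetical names, and the sample batches are made up.

```scala
// Plain-Scala sketch of carrying state from batch to batch (no Spark involved).
// RunningWordCount, updateCount and step are hypothetical names used only for illustration.
object RunningWordCount {
  // Same shape as updateFunction2: this batch's values plus the previous total, if any.
  def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  // Applies updateCount to every key that is in the old state or in the current batch.
  def step(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    val newValues: Map[String, Seq[Int]] =
      batch.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2)) }
    (state.keySet ++ newValues.keySet).flatMap { word =>
      updateCount(newValues.getOrElse(word, Seq.empty), state.get(word)).map(word -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("hello spark"), Seq("hello"), Seq("spark spark"))
    // Fold the batches in arrival order, passing the state from one batch to the next.
    val finalState = batches.foldLeft(Map.empty[String, Int])(step)
    println(finalState)
  }
}
```

After the three sample batches the state holds hello with a count of 2 and spark with a count of 3, which is the accumulation the plain reduceByKey version lacks.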
1.2. Computing per-minute data with reduceByKeyAndWindow
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // LoggerLevels is a helper from the surrounding project (not shown here)
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100", 9999)
    // Arguments: reduce function, window length, slide interval.
    // Both durations are 5 seconds here; use Seconds(60) for a one-minute window.
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(5), Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
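How the window length and slide interval interact can be sketched on plain Scala collections, measuring both in batches instead of seconds. This is a simplified model, not Spark's implementation. WindowedCount, merge and windowedCounts are hypothetical names, and the sample counts are made up. With 5-second batches, a one-minute window would correspond to 12 batches.

```scala
// Plain-Scala sketch of windowed reduction over batches (no Spark involved).
// WindowedCount, merge and windowedCounts are hypothetical names used only for illustration.
object WindowedCount {
  // Merges two count maps with the same (a, b) => a + b reducer used above.
  def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (word, n)) => acc.updated(word, acc.getOrElse(word, 0) + n) }

  // batches: per-batch counts in arrival order.
  // windowBatches: window length in batches; slideBatches: slide interval in batches.
  // One result is produced every slideBatches batches, covering the last windowBatches batches.
  def windowedCounts(batches: Seq[Map[String, Int]],
                     windowBatches: Int,
                     slideBatches: Int): Seq[Map[String, Int]] =
    (slideBatches to batches.length by slideBatches).map { end =>
      batches.slice(math.max(0, end - windowBatches), end).foldLeft(Map.empty[String, Int])(merge)
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Map("a" -> 1), Map("a" -> 2, "b" -> 1), Map("b" -> 3), Map("a" -> 1))
    // Window and slide of one batch each: the same as the per-batch counts.
    println(windowedCounts(batches, 1, 1))
    // Window of two batches, sliding by one: consecutive results overlap.
    println(windowedCounts(batches, 2, 1))
  }
}
```

When the window length and the slide interval both equal the batch interval, as in the Seconds(5), Seconds(5) call above, each window contains exactly one batch, so the output matches a plain per-batch count.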
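1.3. Real-time website clickstream statistics with Spark Streaming and Kafka
1. Install and configure ZooKeeper
2. Install and configure Kafka
3. Start ZooKeeper
4. Start Kafka
5. Create the topic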
bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \
--replication-factor 3 --partitions 3 --topic urlcount
6. Write the Spark Streaming application
package cn.itcast.spark.streaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  // For each URL: the sum of this batch's counts plus the previous total
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    // Read the command-line arguments; hdfs is the checkpoint directory.
    // Example values for the first four:
    // "master1ha:2181,master2:2181,master2ha:2181", "g1", "wangsf-test", "2"
    val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    // Create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("UrlCount")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory
    ssc.checkpoint(hdfs)
    // Map each topic to the number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka to create the DStream (receiver-based API of the older Kafka integration)
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each record and take the URL the user clicked (the 7th space-separated field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    // Count the clicks per URL across batches
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the result to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
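The expression x.split(" ")(6) throws an exception for any record with fewer than seven space-separated fields. A minimal sketch of a more defensive extraction, written in plain Scala without Spark, might look like the following. UrlField and extractUrl are hypothetical names, and the sample line is a made-up placeholder, not real log data.

```scala
// Plain-Scala sketch of a defensive version of x.split(" ")(6) (no Spark involved).
// UrlField and extractUrl are hypothetical names; the sample line is a made-up placeholder.
object UrlField {
  // Returns the 7th space-separated field, or None when the line has fewer than 7 fields.
  def extractUrl(line: String): Option[String] = {
    val fields = line.split(" ")
    if (fields.length > 6) Some(fields(6)) else None
  }

  def main(args: Array[String]): Unit = {
    val sample = "f0 f1 f2 f3 f4 f5 /index.html f7"
    println(extractUrl(sample))      // prints Some(/index.html)
    println(extractUrl("too short")) // prints None
  }
}
```

Returning an Option leaves it to the caller to decide whether malformed records are dropped or reported, instead of failing the whole batch.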
Produce test data:
kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test