zoukankan      html  css  js  c++  java
  • 大数据学习——spark-steaming学习

    官网http://spark.apache.org/docs/latest/streaming-programming-guide.html

    1.1.  用Spark Streaming实现实时WordCount

    1.安装并启动生成者

    首先在一台Linux(ip:192.168.10.101)上用YUM安装nc工具

    yum install -y nc

    启动一个服务端并监听9999端口

    nc -lk 9999

     

    2.编写Spark Streaming程序

    package org.apache.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    
    object TCPWordCount {
      def main(args: Array[String]) {
        //setMaster("local[2]")本地执行2个线程,一个用来接收消息,一个用来计算
        val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
        //创建spark的streaming,传入间隔多长时间处理一次,间隔在5秒左右,否则打印控制台信息会被冲掉
        val scc = new StreamingContext(conf, Seconds(5))
        //读取数据的地址:从某个ip和端口收集数据
        val lines = scc.socketTextStream("192.168.74.100", 9999) //进行rdd处理 val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) //将结果打印控制台  results.print() //启动spark streaming  scc.start() //等待终止  scc.awaitTermination() } }

    3.启动Spark Streaming程序:由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

    注意:要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

    4.在Linux端命令行中输入单词

    5.在IDEA控制台中查看结果

    问题:结果每次在Linux段输入的单词次数都被正确的统计出来,但是结果不能累加!如果需要累加需要使用updateStateByKey(func)来更新状态,下面给出一个例子:

    package org.apache.spark
    
    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    
    object TCPWordCountUpdate {
      /**
        * String:某个单词
        * Seq:[1,1,1,1,1,1],当前批次出现的次数的序列
        * Option:历史的结果的sum
        */
    
      val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
        iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
        //iter.map{case(x,y,z)=>(x,y.sum+z.getOrElse(0))}
      }
    
      def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        Some(newValues.sum + runningCount.getOrElse(0))
      }
    
    
      def main(args: Array[String]) {
        //setMaster("local[2]")本地执行2个线程,一个用来接收消息,一个用来计算
        val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
        //创建spark的streaming,传入间隔多长时间处理一次,间隔在5秒左右,否则打印控制台信息会被冲掉
        val scc = new StreamingContext(conf, Seconds(5))
        scc.checkpoint("./")//读取数据的地址:从某个ip和端口收集数据
        val lines = scc.socketTextStream("192.168.74.100", 9999)
        //进行rdd处理
        /**
          * updateStateByKey()更新数据
          * 1、更新数据的具体实现函数
          * 2、分区信息
          * 3、boolean值
          */
        //val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction2 _)
        val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction, new HashPartitioner(scc.sparkContext.defaultParallelism), true)
        //将结果打印控制台
        results.print()
        //启动spark streaming
        scc.start()
        //等待终止
        scc.awaitTermination()
      }
    }

    1.1.  使用reduceByKeyAndWindow计算每分钟数据

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkSqlTest {
      def main(args: Array[String]) {
        LoggerLevels.setStreamingLogLevels()
        val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Seconds(5))
        ssc.checkpoint("./")
        val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100",9999)
        val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(5),Seconds(5))
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    1.1.  Spark Streaming整合Kafka完成网站点击流实时统计

    1.安装并配置zk

    2.安装并配置Kafka

    3.启动zk

    4.启动Kafka

    5.创建topic

    bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181

    --replication-factor 3 --partitions 3 --topic urlcount

    6.编写Spark Streaming应用程序

    package cn.itcast.spark.streaming
    
    package cn.itcast.spark
    
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object UrlCount {
      val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
        iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
      }
    
      def main(args: Array[String]) {
        //接收命令行中的参数
       // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("master1ha:2181,master2:2181,master2ha:2181","g1","wangsf-test","2")
        //创建SparkConf并设置AppName
        val conf = new SparkConf().setAppName("UrlCount")
        //创建StreamingContext
        val ssc = new StreamingContext(conf, Seconds(2))
        //设置检查点
        ssc.checkpoint(hdfs)
        //设置topic信息
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        //重Kafka中拉取数据创建DStream
        val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
        //切分数据,截取用户点击的url
        val urls = lines.map(x=>(x.split(" ")(6), 1))
        //统计URL点击量
        val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
        //将结果打印到控制台
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    生产数据测试:

    kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test

  • 相关阅读:
    促仅开发者间交流与合作的胡思乱想
    Unity3D音乐音效研究-MIDI与波表
    剑英陪你玩转图形学 (二)彩虹
    java mail实现Email的发送,完整代码
    linux 下 安装 rpm 格式 的 mysql
    ExtJs3带条件的分页查询的实现
    查看linux的版本信息
    java 非法字符过滤 , 半角/全角替换
    MySQL数据库错误server_errno=2013的解决
    MySQL数据库双机热备份
  • 原文地址:https://www.cnblogs.com/feifeicui/p/11017411.html
Copyright © 2011-2022 走看看