  SparkStreaming (1): wordCount demo

    1. pom

      <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>2.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.0.0</version>
        </dependency>
        <!-- spark -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.3.4</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.4</version>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.6.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.3.4</version>
        </dependency>
        <!-- log4j 1.2.17 -->
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.17</version>
        </dependency>
      </dependencies>
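
    Note: all of the Spark and Kafka artifacts above carry the _2.11 suffix (Scala 2.11 builds), so the project itself should be compiled with Scala 2.11; mixing Scala binary versions between the build and these dependencies typically fails with binary-incompatibility errors.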

    2. Recall: how does Hive do a word count?

    Link: https://www.cnblogs.com/blogyuhan/p/9269057.html

    Key idea: use explode to split each line of text into an array of words, then count(1) over the exploded words and group by the word (a sketch of the same shape follows below).
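
    The linked post shows the query in Hive; as a rough illustration of the same explode + count(1) + group by shape, here is a minimal Spark SQL sketch (the lines view, the line column, and the sample rows are made up for the example):

    import org.apache.spark.sql.SparkSession

    object HiveStyleWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("wc").getOrCreate()
        import spark.implicits._
        // hypothetical single-column table of text lines
        Seq("aaa www aaa", "www").toDF("line").createOrReplaceTempView("lines")
        // explode the split line into one row per word, then count and group by the word
        spark.sql(
          """
            |SELECT word, count(1) AS cnt
            |FROM (SELECT explode(split(line, ' ')) AS word FROM lines) t
            |GROUP BY word
          """.stripMargin).show()
      }
    }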

    3. Word counting with Spark Streaming

    1-1 Using reduceByKey

    1) Install netcat and open a port:

    yum install nmap-ncat.x86_64 
    nc -l 1234 # listen on port 1234 for the streaming job to read from
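
    Note: on some versions plain nc -l exits once the first connection closes; ncat's -k (--keep-open) flag, i.e. nc -lk 1234, keeps the listener alive so the streaming job can reconnect.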

    2) The streaming code:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object MySpark {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("app")
        val sc = SparkContext.getOrCreate(conf)
        val ssc = new StreamingContext(sc, Seconds(5))
        // batch interval: one micro-batch every 5 seconds
        val ds = ssc.socketTextStream("192.168.56.111", 1234)
        val res = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        res.print()
        // start the streaming context
        ssc.start()
        ssc.awaitTermination() // block here until the job is stopped
      }
    }

    Result:
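
    For example, typing aaa www aaa into the nc session during one 5-second batch should print something like:

    (aaa,2)
    (www,1)

    With plain reduceByKey the counts start from zero in every batch; nothing is carried over between batches.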

    2-1 Using updateStateByKey ("update state by key"), with Int as the state type

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    object MySpark {
      // Seq[Int]: the new values for this key in the current batch;
      // Option[Int]: the previously accumulated total (None the first time a key is seen)
      val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
        it.map(x => {
          (x._1, x._2.sum + x._3.getOrElse(0))
        })
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("app")
        val sc = SparkContext.getOrCreate(conf)
        val ssc = new StreamingContext(sc, Seconds(5))
        // batch interval: one micro-batch every 5 seconds
        val ds = ssc.socketTextStream("192.168.56.111", 1234)
      //  val res = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        // updateStateByKey requires a checkpoint directory for the state RDDs
        sc.setCheckpointDir("D:/cks")
        // running word count, accumulated across batches
        // (arguments: update function, partitioner, rememberPartitioner = true)
        val res = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)
        res.print()
        // start the streaming context
        ssc.start()
        ssc.awaitTermination() // block here until the job is stopped
      }
    }

    Result:

    (www,9)

    (aaa,11)   <- the counts accumulate across batches
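
    For example, if the previous state for aaa was 9 and the current batch contains aaa twice, the update function receives (aaa, Seq(1, 1), Some(9)) and emits (aaa, Seq(1, 1).sum + 9) = (aaa, 11).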

    2-2 The aggregated type is a custom case class: map((_, Userinfos(1,"zc",2)))

    First variant: (_, Userinfos(1,"zc",2)) becomes (_, Int)

    case class Userinfos(userid: Int, username: String, age: Int)
    object MySpark {
      // Seq[Userinfos]: new records for this key in the current batch; Option[Int]: previous running total
      val updateFunc: Iterator[(String, Seq[Userinfos], Option[Int])] => Iterator[(String, Int)] = it => {
        it.map(x => {
          // sum the ages of the new records and add the previous total
          (x._1, x._2.map(u => u.age).sum + x._3.getOrElse(0))
        })
      }
      // main is the same as in 2-1; only the update function and the map step change
    }

    Second variant: (_, Userinfos(1,"zc",2)) becomes (_, Userinfos(1,"zc",sum))

    case class Userinfos(userid: Int, username: String, age: Int)
    object MySpark {
      // default state used when a key has no previous value yet
      val obj = Userinfos(0, "", 0)
      // Seq[Userinfos]: new records in the current batch; Option[Userinfos]: previous state
      val updateFunc = (it: Iterator[(String, Seq[Userinfos], Option[Userinfos])]) => {
        it.map(x => {
          // keep the running age total inside a new Userinfos value
          (x._1, Userinfos(1, "xc", x._2.map(u => u.age).sum + x._3.getOrElse(obj).age))
        })
      }
      // main is again the same as in 2-1
    }
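
    For completeness, a minimal driver wiring the first variant into the 2-1 skeleton might look like the sketch below (the object name is made up; host, port and checkpoint path are the same placeholders as above):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    case class Userinfos(userid: Int, username: String, age: Int)

    object UserinfoStateCount {
      // first variant: accumulate the ages of the Userinfos records per word as an Int
      val updateFunc: Iterator[(String, Seq[Userinfos], Option[Int])] => Iterator[(String, Int)] = it =>
        it.map(x => (x._1, x._2.map(_.age).sum + x._3.getOrElse(0)))

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("app")
        val sc = SparkContext.getOrCreate(conf)
        val ssc = new StreamingContext(sc, Seconds(5))
        sc.setCheckpointDir("D:/cks") // required by updateStateByKey, as in 2-1
        val ds = ssc.socketTextStream("192.168.56.111", 1234)
        // attach a dummy Userinfos(age = 2) to every word, then update state by key
        val res = ds.flatMap(_.split(" "))
          .map((_, Userinfos(1, "zc", 2)))
          .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)
        res.print() // each word's total grows by 2 per occurrence
        ssc.start()
        ssc.awaitTermination()
      }
    }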

    2-3 The aggregated type is a tuple (Int, String, Int)

    // turn the Seq of tuples into a Seq of their third (Int) field, sum it, and add the third field of the previous state

    (x._1, x._2.map(u => u._3).sum + x._3.getOrElse((1,"",0))._3)

    Pattern matching: case (x, y: Seq, z: Option)

    (x, y.map(u => u._3).sum + z.getOrElse((1, "", 0))._3)
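
    A type-consistent sketch of this update function, written with pattern matching: note that updateStateByKey needs the value emitted per key to have the same type as the previous state, so here both are assumed to be (Int, String, Int), with the running total kept in the third field (the object name and the (1, "") filler values are just placeholders):

    object TupleStateCount {
      // new values and previous state are both (Int, String, Int); the running total lives in the third field
      val updateFunc: Iterator[(String, Seq[(Int, String, Int)], Option[(Int, String, Int)])] => Iterator[(String, (Int, String, Int))] =
        it => it.map { case (word, newValues, prevState) =>
          val total = newValues.map(_._3).sum + prevState.getOrElse((1, "", 0))._3
          (word, (1, "", total))
        }
      // it would plug into the same driver as before, e.g.
      //   ds.flatMap(_.split(" ")).map((_, (1, "zc", 2)))
      //     .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)
    }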