  • SparkStreaming

    1. WordCount

    package dayo7
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object NetworkWordCount {
      Logger.getLogger("org").setLevel(Level.WARN)
    
      def main(args: Array[String]): Unit = {
        // create the SparkConf
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    
        // create the StreamingContext; the second argument is the batch interval
        val ssc = new StreamingContext(conf, Seconds(2))
    
        // receive data from a socket
        val lines = ssc.socketTextStream("192.168.186.150", 1234)
    
        // process the data and print each batch; print() is an output action, so there is nothing to assign
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    
        // start the streaming computation
        ssc.start()
    
        // block the main thread until the computation terminates
        ssc.awaitTermination()
      }
    }
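
    To test this locally, you can feed lines into the socket with netcat before starting the job (assuming netcat is installed on 192.168.186.150): nc -lk 1234. Anything typed into that session is counted in the next 2-second batch.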

    2. Writing the results to Redis

    Start Redis: bin/redis-server etc/redis.conf. Check that it is running: ps -ef | grep redis
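
    Optionally, confirm the server is reachable with redis-cli ping, which should reply PONG.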

    package dayo7
    
    import day08.Jpoods
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object WordCountToRedis1 {
      Logger.getLogger("org").setLevel(Level.WARN)
    
      def main(args: Array[String]): Unit = {
        // create the SparkConf
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    
        // create the StreamingContext with a 2-second batch interval
        val ssc = new StreamingContext(conf, Seconds(2))
    
        // receive data from a socket
        val result: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.186.150", 1234)
    
        result.foreachRDD(rdd => {
          // count the words in this batch
          val counts = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
          // write the counts to Redis, using one connection per partition
          counts.foreachPartition(partition => {
            val jedis = Jpoods.getJedis()
    
            // increment the count of each word in the "zi" hash
            partition.foreach(tp => {
              jedis.hincrBy("zi", tp._1, tp._2)
            })
    
            // return the connection to the pool
            jedis.close()
          })
        })
    
        // start the streaming computation
        ssc.start()
    
        // block until terminated
        ssc.awaitTermination()
      }
    }
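
    Jpoods comes from the author's day08 package and is not shown in the post. A minimal sketch of what such a helper might look like, assuming it wraps a JedisPool pointed at the same host (the pool setup here is illustrative, not the original code):

    package day08
    
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
    
    object Jpoods {
      // hypothetical reconstruction: one lazily created pool per JVM
      private lazy val pool: JedisPool =
        new JedisPool(new JedisPoolConfig(), "192.168.186.150", 6379)
    
      // borrow a connection; jedis.close() returns it to the pool
      def getJedis(): Jedis = pool.getResource
    }

    After the job has run for a while, the accumulated counts can be inspected with redis-cli hgetall zi.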

    3. The complete SparkStreaming version (stateful, with checkpoint recovery)

    package sortby
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object WordCountSortBy2 {
      Logger.getLogger("org").setLevel(Level.WARN)
    
      def main(args: Array[String]): Unit = {
        // load the StreamingContext from the checkpoint directory, or create it
        val ssc = StreamingContext.getOrCreate("./test1", creatingFunc)
        ssc.start()
        ssc.awaitTermination()
      }
    
      def creatingFunc(): StreamingContext = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
        // batch interval: process once every 3 seconds
        val ssc = new StreamingContext(conf, Seconds(3))
    
        // checkpoint directory for state and recovery
        ssc.checkpoint("./test1")
    
        // receive data from a socket
        val lines = ssc.socketTextStream("192.168.186.150", 1234)
    
        // how often the stream is checkpointed; this must be a multiple of the 3-second batch interval
        lines.checkpoint(Seconds(6))
    
        // count words, carrying the totals across batches
        lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc).print()
    
        ssc
      }
    
      // seq holds this batch's new values for a key; option holds that key's saved state
      def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {
        Some(seq.sum + option.getOrElse(0))
      }
    }
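
    Despite its name, WordCountSortBy2 never sorts its output. One way to add per-batch sorting would be to replace the processing line in creatingFunc with the sketch below (the descending sort by count is my assumption, not part of the original):

    lines.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)
      // sort each batch's RDD by count, highest first
      .transform(rdd => rdd.sortBy(_._2, ascending = false))
      .print()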