zoukankan      html  css  js  c++  java
  • spark streaming的应用

      今天我们讲spark streaming的应用,这个是实时处理的,类似于Storm以及Flink相关的知识点,

      说来也巧,今天的自己也去听了关于Flink的相关的讲座,可惜自己没有听得特别清楚,好像是

      spark streaming与flink是竞争关系,好了,我们进入今天的主题吧

        1.一般会做用户画像的差不多集中在两个行业,电商以及广告行业

         一般根据现实给这个人打上一个标签,在根据标签来确定画像

        2.如果一个人不登录,怎样确定这个人的详情

        

        这个就是spark streaming的应用

        nc -lk 8888 这个端口可以一直发送数据

        请记住,spark中产生的rdd,可能会由于某种意外的原因,从而这个计算可能就要重新开始计算,

        但是假如我们设置了checkpoint(如果多个进程同时开始的话,我们可以搞一个共享存储)的时候,

        就可以保存这个值,当再一次出现意外的时候,就可以从恢复的这个值重新读取

          对于map来说,可以map(),同时也可以map{},这样的两种表达形式,不过当我们写成了case()的

          这种形式,则我们必须使用map的大括号的这种形式了,后文附带代码

        

    package cn.wj.spark.day09
    
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    /**
      * Created by WJ on 2017/1/18.
      */
    object StateFulWordCount {
    
      //Seq这个批次某个单词的次数
       //Option[Int]:以前的结果
      //(hello,1),(hello,1),(tom,1)
      //(hello,Seq(1,1)),(tom,Seq(1))
      //此时x=>String(Key的值),y=>Seq[Int](当前的这个value的值),z=>Option[Int],这个代表的是以前的value的值
      val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])]) =>{
         iter.flatMap{case(x,y,z) => Some(y.sum+z.getOrElse(0)).map(m =>(x,m))}
       }
    
      def main(args: Array[String]): Unit = {
        LoggerLevels.setStreamingLogLevels()
        //StreamingContext
        val conf = new SparkConf().setAppName("StreamingContext").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("/tmp/ck")
       // sc.setCheckpointDir("hdfs://192.168.109.136:9000/person/myfile")
        val ssc = new StreamingContext(sc,Seconds(5))
        val ds = ssc.socketTextStream("192.168.109.136",8888)
        //updateStateByKey:这个方法的意思是说将每一次的partition都进行一次累计
        val result = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc,new HashPartitioner(sc.defaultParallelism),true)
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

       其中,LoggerLevels.setStreamingLogLevels()这个是设置日志文件的显示情况的,是让打出来的日志更清晰,

       如果没必要,可以删除的。

       首先我们在linux里面向8888端口发送信息:

       

       然后启动项目,这个时候就可以看见这个效果了(可以叠加的spark streaming)

       

  • 相关阅读:
    【福利】JetBrains 全家桶永久免费使用
    openlayers操作分享:如何从容的在vue中食用openlayers6
    Axios的正确食用方法
    uniapp 地图全解析+事件监听
    JS数据扁平化
    Codeforces 记录
    WP开发加载图片及文字Resource/Content说明
    根据属性获取属性名
    string和byte 数组转换的快速方法(采用unsafe)
    Code First Migrations更新数据库结构(数据迁移)
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6308888.html
Copyright © 2011-2022 走看看