zoukankan      html  css  js  c++  java
  • SparkRDD编程实战

    通过spark实现点击流日志分析案例

    1. 访问的pv

    package cn.itcast
    
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
    
      object PV {
      def main(args: Array[String]): Unit = {
            //todo:创建sparkconf,设置appName
            //todo:setMaster("local[2]")在本地模拟spark运行 这里的数字表示 使用2个线程
            val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
    
            //todo:创建SparkContext
            val sc: SparkContext = new SparkContext(sparkConf)
    
            //todo:读取数据
            val file: RDD[String] = sc.textFile("d:\data\access.log")
    
            //todo:将一行数据作为输入,输出("pv",1)
            val pvAndOne: RDD[(String, Int)] = file.map(x=>("pv",1))
    
            //todo:聚合输出
             val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_+_)
             totalPV.foreach(println)
    
             sc.stop()
      }
    }


    2. 访问的uv
     

    package cn.itcast
    
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
    
      object UV {
      def main(args: Array[String]): Unit = {
        //todo:构建SparkConf和 SparkContext
        val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")
    
        val sc: SparkContext = new SparkContext(sparkConf)
    
        //todo:读取数据
        val file: RDD[String] = sc.textFile("d:\data\access.log")
    
        //todo:对每一行分隔,获取IP地址
        val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0))
    
        //todo:对ip地址进行去重,最后输出格式 ("UV",1)
        val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1))
    
        //todo:聚合输出
        val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_)
        totalUV.foreach(println)
    
        //todo:数据结果保存
        totalUV.saveAsTextFile("d:\data\out")
    
        sc.stop()
      }
    }


    3. 访问的topN
     

    package cn.itcast
    
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
    
      /**
      * 求访问的topN
      */
      object TopN {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")
    
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
    
        //读取数据
        val file: RDD[String] = sc.textFile("d:\data\access.log")
    
        //将一行数据作为输入,输出(来源URL,1)
        val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1))
    
        //聚合 排序-->降序
        val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false)
    
        //通过take取topN,这里是取前5名
        val finalResult: Array[(String, Int)] = result.take(5)
        println(finalResult.toBuffer)
    
        sc.stop()
      }
    }


    通过Spark实现ip地址查询
     

    1. 需求分析

           在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

     

           因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

    2. 技术调研

           因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。

    3. 架构设计

    搭建spark集群

    4. 开发流程

    4.1. 数据准备

    4.2. ip日志信息

    在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍

     

    4.3. 城市ip段信息

     

    5. 代码开发

    5.1. 思路

    1、  加载城市ip段信息,获取ip起始数字和结束数字,经度,维度

    2、  加载日志数据,获取ip信息,然后转换为数字,和ip段比较

    3、  比较的时候采用二分法查找,找到对应的经度和维度

    4、  然后对经度和维度做单词计数

    5.2. 代码

    package cn.itcast.IPlocaltion
    
      import java.sql.{Connection, DriverManager, PreparedStatement}
      import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
    
      /**
      * ip地址查询
      */
      object IPLocaltion_Test {
      def main(args: Array[String]): Unit = {
          //todo:创建sparkconf 设置参数
          val sparkConf: SparkConf = new SparkConf().setAppName("IPLocaltion_Test").setMaster("local[2]")
    
          //todo:创建SparkContext
          val sc = new SparkContext(sparkConf) 
    
          //todo:读取基站数据
          val data: RDD[String] = sc.textFile("d:\data\ip.txt")
    
          //todo:对基站数据进行切分 ,获取需要的字段 (ipStart,ipEnd,城市位置,经度,纬度)
          val jizhanRDD: RDD[(String, String, String, String, String)] = data.map(_.split("\|")).map(
    
            x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8), x(13), x(14)))
    
          //todo:获取RDD的数据
          val jizhanData: Array[(String, String, String, String, String)] = jizhanRDD.collect()
    
          //todo:广播变量,一个只读的数据区,所有的task都能读到的地方
          val jizhanBroadcast: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanData) 
    
          //todo:读取目标数据
          val destData: RDD[String] = sc.textFile("d:\data\20090121000132.394251.http.format")
    
          //todo:获取数据中的ip地址字段
          val ipData: RDD[String] = destData.map(_.split("\|")).map(x=>x(1))
    
         //todo:把IP地址转化为long类型,然后通过二分法去基站数据中查找,找到的维度做wordCount
         val result=ipData.mapPartitions(iter=>{
    
          //获取广播变量中的值
          val valueArr: Array[(String, String, String, String, String)] = jizhanBroadcast.value
    
          //todo:操作分区中的itertator
          iter.map(ip=>{
            //将ip转化为数字long
            val ipNum:Long=ipToLong(ip)
    
            //拿这个数字long去基站数据中通过二分法查找,返回ip在valueArr中的下标
            val index:Int=binarySearch(ipNum,valueArr)
    
            //根据下标获取对一个的经纬度
            val tuple = valueArr(index)
    
            //返回结果 ((经度,维度),1)
            ((tuple._4,tuple._5),1)
          })
        })
    
        //todo:分组聚合
        val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_+_)
    
        //todo:打印输出
        resultFinal.foreach(println)
    
        //todo:将结果保存到mysql表中
      resultFinal.map(x=>(x._1._1,x._1._2,x._2)).foreachPartition(data2Mysql)
    sc.stop()
      }
    
      //todo:ip转为long类型
      def ipToLong(ip: String): Long = {
        //todo:切分ip地址。
        val ipArray: Array[String] = ip.split("\.")
        var ipNum=0L
    
        for(i <- ipArray){
          ipNum=i.toLong | ipNum << 8L
        }
        ipNum
      }
    
      //todo:通过二分查找法,获取ip在广播变量中的下标
      def binarySearch(ipNum: Long, valueArr: Array[(String, String, String, String, String)]): Int ={
        //todo:口诀:上下循环寻上下,左移右移寻中间
        //开始下标
        var start=0
    
        //结束下标
        var end=valueArr.length-1
    
        while(start<=end){
          val middle=(start+end)/2
    
          if(ipNum>=valueArr(middle)._1.toLong && ipNum<=valueArr(middle)._2.toLong){
            return middle
          }
          if(ipNum > valueArr(middle)._2.toLong){
            start=middle
          }
    
          if(ipNum<valueArr(middle)._1.toLong){
            end=middle
          }
        }
        -1
      }
    
      //todo:数据保存到mysql表中
      def data2Mysql(iterator:Iterator[(String,String, Int)]):Unit = {
        //todo:创建数据库连接Connection
        var conn:Connection=null
    
        //todo:创建PreparedStatement对象
        var ps:PreparedStatement=null
    
        //todo:采用拼占位符问号的方式写sql语句。
        var sql="insert into iplocaltion(longitude,latitude,total_count) values(?,?,?)"
    
        //todo:获取数据连接    conn=DriverManager.getConnection("jdbc:mysql://itcast01:3306/spark","root","root123")
    
        //todo:  选中想被try/catch包围的语句 ctrl+alt+t 快捷键选中try/catch/finally
        try {
            iterator.foreach(line=> {
            //todo:预编译sql语句
            ps = conn.prepareStatement(sql)
    
            //todo:对占位符设置值,占位符顺序从1开始,第一个参数是占位符的位置,第二个参数是占位符的值。
            ps.setString(1, line._1)
            ps.setString(2, line._2)
            ps.setLong(3, line._3)
    
            //todo:执行
            ps.execute()
            })
          } catch {
            case e:Exception =>println(e)
          } finally {
            if(ps!=null){
              ps.close()
            }
    
            if (conn!=null){
              conn.close()
            }
          }
      }
    }

     

     

     

     

     

     

     

     

     

     


     

  • 相关阅读:
    3090显卡(CUDA11.1)安装Pytorch
    ros环境搭建
    github设置仓库可见性 私人仓库设置他人协作/可见
    安全可靠国产系统下的应用怎么搭建?
    燕山大学操作系统课程设计计划书
    flink 1.9.0 编译:flink-fs-hadoop-shaded 找不到
    产品经理面试——简历填写
    什么是可串行化MVCC
    fatal: early EOF fatal: index-pack failed
    Maven 初学+http://mvnrepository.com/
  • 原文地址:https://www.cnblogs.com/jifengblog/p/9369271.html
Copyright © 2011-2022 走看看