zoukankan      html  css  js  c++  java
  • 大数据学习之 Spark基本编程案例 48

    案例一:计算网页访问量前三名

    源数据大致预览:

    编写Scala代码:

    package day02
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author dawn
      * @version 1.0, 2019年6月21日11:40:16
      *
      *          需求:计算网页访问量前三名
      *          用户:喜欢视频 直播
      *          帮助企业做经营和决策
      *
      *          看数据
      */
    object UrlCount {
      def main(args: Array[String]): Unit = {
        //1.加载数据
        val conf: SparkConf = new SparkConf().setAppName("UrlCount").setMaster("local[2]")
        //Spark程序入口
        val sc: SparkContext = new SparkContext(conf)
        //加载数据
        val rdd1: RDD[String] = sc.textFile("itstar.log")
    
        //2.对数据进行计算
        val rdd2:RDD[(String,Int)]=rdd1.map(line => {
          val s = line.split("	")
          //标注出现1次
          (s(1), 1)
        })
    
        //3.将相同的网址进行累加求和  网页,201
        val rdd3:RDD[(String,Int)] = rdd2.reduceByKey(_+_)
    
        //4.排序 取出前三
        val rdd4:Array[(String,Int)] = rdd3.sortBy(_._2,false).take(3)
    
        //5.遍历打印
        rdd3.foreach(x => {
          println("网址为:"+x._1+"访问量为:"+x._2)
        })
    
        //6.转换 toString toBuffer
        println(rdd4.toBuffer)
        sc.stop()
      }
    }
    

     

    运行结果:

    案例二:求出每个学院 访问第一位的网址,分组

    编写Scala代码:

    package day02
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author dawn
      * @version 1.0, 2019年6月21日12:25:44
      *          需求:求出每个学院 访问第一位的网址,分组
      *          bigdata:video(直播)
      *          java:video
      *          python:teacher
      */
    object UrlGoupCount {
      def main(args: Array[String]): Unit = {
        //1.创建SparkConText
        val conf:SparkConf=new SparkConf().setAppName("UrlGoupCount").setMaster("local[2]")
        val sc:SparkContext=new SparkContext(conf)
    
        //2.加载数据
        val rdd1:RDD[String]=sc.textFile("itstar.log")
    
        //3.切分
        val rdd2:RDD[(String,Int)] = rdd1.map(line =>{
          val s=line.split("	")
          (s(1),1)
        })
    
        //4.求出总的访问量 网址,总的访问量
        val rdd3:RDD[(String,Int)] = rdd2.reduceByKey((x,y)  => x+y )
    
        //5.取出学院
        val rdd4:RDD[(String,String,Int)] = rdd3.map(x =>{
          //拿到url
          val url:String=x._1
          //用java的方式拿到主机名
          val host:String =new URL(url).getHost.split("[.]")(0)
          //元组输出
          (host,url,x._2)
        })
    
        //6.按照学院进行分组 groupBy返回的结果是:RDD[(k,iterator[(String,Stirng,Int)])]
        val rdd5:RDD[(String,List[(String,String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {
          //倒序,记得要将iterator[(String,Stirng,Int)]转成List再排序
          it.toList.sortBy(_._3).reverse.take(1)
        })
    
        //7.遍历打印
        rdd5.foreach(x =>{
          println("学院为:"+x._1+"||"+"访问量第一的是:"+x._2)
        })
    
        sc.stop()
      }
    
    }
    

      

    运行结果:

    案例三:加入自定义分区 按照学院分区,相同的学院分为一个结果文件

    编写Scala代码:

    package day02
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
      * @author Dawn
      * @version 1.0, 2019年6月21日12:25:49
      *          需求:加入自定义分区
      *          按照学院分区,相同的学院分为一个结果文件
      */
    object UrlParCount {
      def main(args: Array[String]): Unit = {
        //1.创建SparkContext对象
        val conf:SparkConf=new SparkConf().setAppName("").setMaster("local[2]")
        val sc:SparkContext=new SparkContext(conf)
    
        //2.加载数据
        val rdd1:RDD[(String,Int)] = sc.textFile("itstar.log").map(line => {
          val s=line.split("	")
          (s(1),1)
        })
    
        //3.聚合
        val rdd2:RDD[(String,Int)] = rdd1.reduceByKey(_+_)
    
        //4.自定义格式
        val rdd3:RDD[(String,(String,Int))]=rdd2.map(t =>{
          val url=t._1
          val host=new URL(url).getHost
          val XHost=host.split("[.]")(0)
          //元组输出
          (XHost,(url,t._2))
        })
    
        //5.加入自定义分区
        val xueyuan:Array[String] = rdd3.map(_._1).distinct().collect()//去重只剩下net bigdata java
        val xueYuanPartitioner:XueYuanParititioner=new XueYuanParititioner(xueyuan)
    
        //6.加入分区规则
        val rdd4:RDD[(String,(String,Int))]=rdd3.partitionBy(xueYuanPartitioner).mapPartitions(it =>{
          //将rdd3的结果进行自定义分区,再遍历分区中的元素,并将元素进行toList,再按照访问量排序,
          //在倒叙,再取出第一个元素,返回类型是(String,(String,Int)),不是(String,List[(String,String,Int)])
          it.toList.sortBy(_._2._2).reverse.take(1).iterator
        })
    
        //7.把结果存储
        rdd4.saveAsTextFile("f:/temp/sparkPV案例/partition")
    
        sc.stop()
      }
    }
    class XueYuanParititioner(xy:Array[String]) extends Partitioner{
    
      //自定义规则 学院 分区号
      val rules: mutable.HashMap[String,Int]=new mutable.HashMap[String,Int]()
      var number=0
    
      //遍历学院
      for(i <- xy){
        //学院与分区号对应,rules是一个HashMap,加一个元素
        rules += (i -> number)
        //分区号递增
        number += 1
      }
    
    
      //总的分区个数=学院中的长度为分区个数
      override def numPartitions = xy.length
    
      //拿到分区
      override def getPartition(key: Any):Int = {
          rules.getOrElse(key.toString,0)
      }
    }
    

      

    运行结果:

    案例四:Spark访问数据库

    分组排名第一的学院结果存储在mysql

     

    编写代码如下:

    package day03
    
    import java.net.URL
    
    import java.sql.{Connection, DriverManager}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author Hunter
      * @version 1.0, 2019年6月21日21:10:30
      *          把最终的结果存储在mysql中
      */
    object UrlGroupCount1 {
      def main(args: Array[String]): Unit = {
        val conf:SparkConf=new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")
        val sc:SparkContext=new SparkContext(conf)
    
        //加载数据
        val rdd1:RDD[String] =sc.textFile("itstar.log")
    
        val rdd2:RDD[(String,Int)]=rdd1.map(line =>{
          val s:Array[String]=line.split("	")
          (s(1),1)
        })
    
        //累加求和
        val rdd3:RDD[(String,Int)]=rdd2.reduceByKey(_+_)
        //取出分组的学院
        val rdd4:RDD[(String,Int)]=rdd3.map(x =>{
          val url=x._1
          val host=new URL(url).getHost.split("[.]")(0)
          //元组输出
          (host,x._2)
        })
    
        //6.根据学院分组
        val rdd5:RDD[(String,List[(String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {
          //根据访问量排序 倒序
          it.toList.sortBy(_._2).take(1)
        })
    
        //7.把计算结果保存到mysql中
        rdd5.foreach(x => {
          //把数据写到mysql
          val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")
          //把spark结果插入到mysql中
          val sql="INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"
          //执行Sql
          val statement=conn.prepareStatement(sql)
          statement.setString(1,x._1)
          statement.setString(2,x._2.toString())
          statement.executeUpdate()
          statement.close()
          conn.close()
        })
    
        //8.关闭资源 应用停掉
        sc.stop()
      }
    }
    

      

    运行结果:

    案例五:Spark提供jdbcRDD,操作MySQL

    编写代码如下:

    package day03
    
    import java.sql.DriverManager
    
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author Dawn
      * @version 1.0, 2019年6月22日11:25:30
      *          spark提供的连接mysql的方式
      *          jdbcRDD
      */
    object JdbcRDDDemo {
      def main(args: Array[String]): Unit = {
        val conf:SparkConf=new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
        val sc:SparkContext=new SparkContext(conf)
    
        //匿名函数
        val connection=() =>{
          Class.forName("com.mysql.jdbc.Driver").newInstance()
          DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")
        }
    
        //查询数据
        val jdbcRdd:JdbcRDD[(Int,String,String)]=new JdbcRDD(
          //指定sparkcontext
          sc,
          connection,
          "SELECT * FROM url_data where uid >= ? AND uid <= ?",
          //2个任务并行
          1,4,2,
          r =>{
          val uid = r.getInt(1)
          val xueyuan = r.getString(2)
          val number_one = r.getString(3)
          (uid, xueyuan, number_one)
        }
        )
    
        val jrdd = jdbcRdd.collect()
        println(jrdd.toBuffer)
        sc.stop()
      }
    }
    

      

    运行结果:

  • 相关阅读:
    ajax 请求登录超时跳转登录页的示例代码
    [WPF]实现密码框的密码绑定
    Linq系列(5)——表达式树之案例应用
    idea设置内存大小
    idea右下角显示使用内存情况
    idea打开Run Dashboard
    java的byte[]与String相互转换
    java有包名的调用没有包名的类,用反射
    【转】查看电脑显卡型号及显卡性能
    idea关闭sonarLint自动扫描
  • 原文地址:https://www.cnblogs.com/hidamowang/p/11144160.html
Copyright © 2011-2022 走看看