案例一:计算网页访问量前三名
源数据大致预览:

编写Scala代码:
package day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author dawn
* @version 1.0, 2019年6月21日11:40:16
*
* 需求:计算网页访问量前三名
* 用户:喜欢视频 直播
* 帮助企业做经营和决策
*
* 看数据
*/
object UrlCount {
def main(args: Array[String]): Unit = {
//1.加载数据
val conf: SparkConf = new SparkConf().setAppName("UrlCount").setMaster("local[2]")
//Spark程序入口
val sc: SparkContext = new SparkContext(conf)
//加载数据
val rdd1: RDD[String] = sc.textFile("itstar.log")
//2.对数据进行计算
val rdd2:RDD[(String,Int)]=rdd1.map(line => {
val s = line.split(" ")
//标注出现1次
(s(1), 1)
})
//3.将相同的网址进行累加求和 网页,201
val rdd3:RDD[(String,Int)] = rdd2.reduceByKey(_+_)
//4.排序 取出前三
val rdd4:Array[(String,Int)] = rdd3.sortBy(_._2,false).take(3)
//5.遍历打印
rdd3.foreach(x => {
println("网址为:"+x._1+"访问量为:"+x._2)
})
//6.转换 toString toBuffer
println(rdd4.toBuffer)
sc.stop()
}
}
运行结果:

案例二:求出每个学院 访问第一位的网址,分组
编写Scala代码:
package day02
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author dawn
* @version 1.0, 2019年6月21日12:25:44
* 需求:求出每个学院 访问第一位的网址,分组
* bigdata:video(直播)
* java:video
* python:teacher
*/
object UrlGoupCount {
def main(args: Array[String]): Unit = {
//1.创建SparkConText
val conf:SparkConf=new SparkConf().setAppName("UrlGoupCount").setMaster("local[2]")
val sc:SparkContext=new SparkContext(conf)
//2.加载数据
val rdd1:RDD[String]=sc.textFile("itstar.log")
//3.切分
val rdd2:RDD[(String,Int)] = rdd1.map(line =>{
val s=line.split(" ")
(s(1),1)
})
//4.求出总的访问量 网址,总的访问量
val rdd3:RDD[(String,Int)] = rdd2.reduceByKey((x,y) => x+y )
//5.取出学院
val rdd4:RDD[(String,String,Int)] = rdd3.map(x =>{
//拿到url
val url:String=x._1
//用java的方式拿到主机名
val host:String =new URL(url).getHost.split("[.]")(0)
//元组输出
(host,url,x._2)
})
//6.按照学院进行分组 groupBy返回的结果是:RDD[(k,iterator[(String,Stirng,Int)])]
val rdd5:RDD[(String,List[(String,String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {
//倒序,记得要将iterator[(String,Stirng,Int)]转成List再排序
it.toList.sortBy(_._3).reverse.take(1)
})
//7.遍历打印
rdd5.foreach(x =>{
println("学院为:"+x._1+"||"+"访问量第一的是:"+x._2)
})
sc.stop()
}
}
运行结果:

案例三:加入自定义分区 按照学院分区,相同的学院分为一个结果文件
编写Scala代码:
package day02
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* @author Dawn
* @version 1.0, 2019年6月21日12:25:49
* 需求:加入自定义分区
* 按照学院分区,相同的学院分为一个结果文件
*/
object UrlParCount {
def main(args: Array[String]): Unit = {
//1.创建SparkContext对象
val conf:SparkConf=new SparkConf().setAppName("").setMaster("local[2]")
val sc:SparkContext=new SparkContext(conf)
//2.加载数据
val rdd1:RDD[(String,Int)] = sc.textFile("itstar.log").map(line => {
val s=line.split(" ")
(s(1),1)
})
//3.聚合
val rdd2:RDD[(String,Int)] = rdd1.reduceByKey(_+_)
//4.自定义格式
val rdd3:RDD[(String,(String,Int))]=rdd2.map(t =>{
val url=t._1
val host=new URL(url).getHost
val XHost=host.split("[.]")(0)
//元组输出
(XHost,(url,t._2))
})
//5.加入自定义分区
val xueyuan:Array[String] = rdd3.map(_._1).distinct().collect()//去重只剩下net bigdata java
val xueYuanPartitioner:XueYuanParititioner=new XueYuanParititioner(xueyuan)
//6.加入分区规则
val rdd4:RDD[(String,(String,Int))]=rdd3.partitionBy(xueYuanPartitioner).mapPartitions(it =>{
//将rdd3的结果进行自定义分区,再遍历分区中的元素,并将元素进行toList,再按照访问量排序,
//在倒叙,再取出第一个元素,返回类型是(String,(String,Int)),不是(String,List[(String,String,Int)])
it.toList.sortBy(_._2._2).reverse.take(1).iterator
})
//7.把结果存储
rdd4.saveAsTextFile("f:/temp/sparkPV案例/partition")
sc.stop()
}
}
class XueYuanParititioner(xy:Array[String]) extends Partitioner{
//自定义规则 学院 分区号
val rules: mutable.HashMap[String,Int]=new mutable.HashMap[String,Int]()
var number=0
//遍历学院
for(i <- xy){
//学院与分区号对应,rules是一个HashMap,加一个元素
rules += (i -> number)
//分区号递增
number += 1
}
//总的分区个数=学院中的长度为分区个数
override def numPartitions = xy.length
//拿到分区
override def getPartition(key: Any):Int = {
rules.getOrElse(key.toString,0)
}
}
运行结果:

案例四:Spark访问数据库
把分组排名第一的学院结果存储在mysql中,
编写代码如下:
package day03
import java.net.URL
import java.sql.{Connection, DriverManager}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author Hunter
* @version 1.0, 2019年6月21日21:10:30
* 把最终的结果存储在mysql中
*/
object UrlGroupCount1 {
def main(args: Array[String]): Unit = {
val conf:SparkConf=new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")
val sc:SparkContext=new SparkContext(conf)
//加载数据
val rdd1:RDD[String] =sc.textFile("itstar.log")
val rdd2:RDD[(String,Int)]=rdd1.map(line =>{
val s:Array[String]=line.split(" ")
(s(1),1)
})
//累加求和
val rdd3:RDD[(String,Int)]=rdd2.reduceByKey(_+_)
//取出分组的学院
val rdd4:RDD[(String,Int)]=rdd3.map(x =>{
val url=x._1
val host=new URL(url).getHost.split("[.]")(0)
//元组输出
(host,x._2)
})
//6.根据学院分组
val rdd5:RDD[(String,List[(String,Int)])]=rdd4.groupBy(_._1).mapValues(it => {
//根据访问量排序 倒序
it.toList.sortBy(_._2).take(1)
})
//7.把计算结果保存到mysql中
rdd5.foreach(x => {
//把数据写到mysql
val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")
//把spark结果插入到mysql中
val sql="INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"
//执行Sql
val statement=conn.prepareStatement(sql)
statement.setString(1,x._1)
statement.setString(2,x._2.toString())
statement.executeUpdate()
statement.close()
conn.close()
})
//8.关闭资源 应用停掉
sc.stop()
}
}
运行结果:

案例五:Spark提供jdbcRDD,操作MySQL
编写代码如下:
package day03
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author Dawn
* @version 1.0, 2019年6月22日11:25:30
* spark提供的连接mysql的方式
* jdbcRDD
*/
object JdbcRDDDemo {
def main(args: Array[String]): Unit = {
val conf:SparkConf=new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
val sc:SparkContext=new SparkContext(conf)
//匿名函数
val connection=() =>{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/url_count?charatorEncoding=utf-8","root","199902")
}
//查询数据
val jdbcRdd:JdbcRDD[(Int,String,String)]=new JdbcRDD(
//指定sparkcontext
sc,
connection,
"SELECT * FROM url_data where uid >= ? AND uid <= ?",
//2个任务并行
1,4,2,
r =>{
val uid = r.getInt(1)
val xueyuan = r.getString(2)
val number_one = r.getString(3)
(uid, xueyuan, number_one)
}
)
val jrdd = jdbcRdd.collect()
println(jrdd.toBuffer)
sc.stop()
}
}
运行结果:
