自己前面的小练习一直都是在linux上面写的,可是最近由于要把他迁移到win上面,我在自己的csdn博客有对如何在win上面搭建spark环境做出说明,好了,我们还是先看看
今天的内容吧
1.假如你有一个文件,如果你想实现以前的mapReduce的操作,这个时候,如果我们使用spark则会变的非常的简单,如果你此时的文件是以" "进行分割的,那我就可以这
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("UrlCount").setMaster("local")
val sc = new SparkContext(conf)
//rdd1将数据进行切分,元祖中放的是(URL,1)
val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
val f = line.split(" ")
(f(1),1)
})
val rdd2 = rdd1.reduceByKey(_+_)
}
则此时的rdd2,就已经完成了wordCount的操作了
第一个练习(对一个数组进行循环处理)
package cn.wj.test.spark.day03
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by WJ on 2016/12/30.
*/
object ForeachDemo2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ForeachDemo2").setMaster("local[3]")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd1.foreach(println(_))
sc.stop()
}
}

2.第二个练习
package cn.wj.spark.day02
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by WJ on 2016/12/30.
*/
// 这个是以java来进行排序,如果内存过大,可能会出现溢出的操作
object UrlCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("UrlCount").setMaster("local")
val sc = new SparkContext(conf)
//rdd1将数据进行切分,元祖中放的是(URL,1)
val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
val f = line.split(" ")
(f(1),1)
})
val rdd2 = rdd1.reduceByKey(_+_)
val rdd3 = rdd2.map(t=>{
val url = t._1
val host = new URL(url).getHost()
(host,url,t._2)
})
// println(rdd2.collect.toBuffer)
//这个的操作是,将rdd4的3以host的进行分组,软后并在每一个分组的情况下,以value中的第三个数据进行排序
//,并且只取前三个的排序
val rdd4 = rdd3.groupBy(_._1).mapValues(it =>{
it.toList.sortBy(_._3).reverse.take(3)
})
println(rdd4.collect().toBuffer)
}
}

第三个练习
package cn.wj.test.spark.day03
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URL
/**
* Created by WJ on 2016/12/31.
*/
object AddUrlCount3 {
val arr = Array("java.itcast.cn","php.itcast.cn","net.itcast.cn")
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AppUrlCount3").setMaster("local")
val sc = new SparkContext(conf)
// val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
// val f = line.split(" ")
// (f(1),1)
// })
val rdd1 = sc.textFile("E://Test/itcast.log").map( line =>{
val f = line.split(" ")
(f(1),1)
})
val rdd2 = rdd1.reduceByKey(_+_)
val rdd3 = rdd2.map(t=>{
val url = t._1
val host = new URL(url).getHost()
(host,url,t._2)
})
for(ins <- arr){
val rdd = rdd3.filter(_._1==ins)
val result = rdd.sortBy(_._3,false).take(3)
println(result.toBuffer)
}
sc.stop()
}
}
