  • Spark Practice Code

    1. ScalaWordCount

    package com._51doit.spark.day1

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object ScalaWordCount {

      def main(args: Array[String]): Unit = {

        //Step 1: create the SparkContext
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount")
        val sc = new SparkContext(conf)

        //Step 2: create an RDD from data read from HDFS (think of it as one big distributed collection)
        //The whole job also works as a single chained line, but that style is not recommended:
        //sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile(args(1))
        val lines: RDD[String] = sc.textFile(args(0))

        //Split each line into words and flatten
        val words: RDD[String] = lines.flatMap(_.split(" "))

        //Pair each word with the count 1
        val wordAndOne: RDD[(String, Int)] = words.map((_,1))

        //Aggregate the counts by key; numPartitions = 1 keeps everything in
        //one partition so the sorted result lands in a single output file
        val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_,1)

        //Sort by count in descending order
        val sorted: RDD[(String, Int)] = reduced.sortBy(_._2,false)

        //Save the result to HDFS
        sorted.saveAsTextFile(args(1))

        //Finally, release the resources
        sc.stop()

      }
    }
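
    For a quick test without a cluster, here is a minimal local-mode sketch of the same pipeline (the master setting, sample data, and object name are illustrative additions, not part of the original job):

    package com._51doit.spark.day1

    import org.apache.spark.{SparkConf, SparkContext}

    object ScalaWordCountLocal {

      def main(args: Array[String]): Unit = {

        //local[*] runs Spark inside this JVM, one worker thread per core
        val conf = new SparkConf().setAppName("ScalaWordCountLocal").setMaster("local[*]")
        val sc = new SparkContext(conf)

        //Build the RDD from an in-memory collection instead of HDFS
        val lines = sc.parallelize(Seq("spark hadoop spark", "hadoop flink spark"))

        val result = lines.flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
          .collect()

        result.foreach(println) //expected: (spark,3), (hadoop,2), (flink,1)

        sc.stop()
      }
    }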

    2. FavTeacherInSubject (the most popular teachers in each subject)

    package com._51doit.spark.day2

    import java.net.URL

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object FavTeacherInSubject {

      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName(this.getClass.getSimpleName)

        val sc = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile(args(0))

        //Parse each line: the last path segment of the URL is the teacher,
        //and the first label of the host name is the subject
        val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
          val teacher = line.substring(line.lastIndexOf("/") + 1)
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          ((subject, teacher), 1)
        })

        //Aggregate the counts per (subject, teacher) pair
        val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_+_)

        //Group by subject
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

        //Sort within each group by count, descending, and keep the top 2;
        //note that each group is pulled into memory as a List here
        val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))

        //Save the result to HDFS
        sorted.saveAsTextFile(args(1))

        sc.stop()
      }
    }
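
    The job assumes each input line is a bare URL whose host encodes the subject and whose last path segment is the teacher (the sample line below is hypothetical; the post does not show the actual log format). A driver-side sketch of just the parsing step:

    import java.net.URL

    object ParseDemo extends App {
      //Hypothetical sample line in the assumed format: http://<subject>.<domain>/<teacher>
      val line = "http://bigdata.51doit.cn/laozhang"
      val teacher = line.substring(line.lastIndexOf("/") + 1) //"laozhang"
      val host = new URL(line).getHost                        //"bigdata.51doit.cn"
      val subject = host.substring(0, host.indexOf("."))      //"bigdata"
      println((subject, teacher))                             //(bigdata,laozhang)
    }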

    3. IpLocation (count the regional distribution of users according to IP rules)

    package com._51doit.spark.day3

    import com._51doit.spark.utils.MyUtil
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object IpLocation {


      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName(this.getClass.getSimpleName)

        val sc = new SparkContext(conf)

        //Create an RDD from the access log in HDFS
        val accessLog: RDD[String] = sc.textFile(args(0))

        //For each line: take the IP from the second pipe-delimited field, convert
        //it to a number, then binary-search the IP rules for its province
        val provinceAndOne: RDD[(String, Int)] = accessLog.map(line => {
          val fields = line.split("[|]")
          val ip = fields(1)
          val ipNum = MyUtil.ip2Long(ip)
          val province = IpRulesUtil.binarySearch(ipNum)
          (province, 1)
        })

        //Aggregate the counts per province
        val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(_+_)

        reduced.saveAsTextFile(args(1))

        sc.stop()
      }

    }
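
    MyUtil.ip2Long is called above but its source is not included in the post. A minimal sketch of the dotted-quad-to-Long conversion it presumably performs (the package, object, and method names simply mirror the call site):

    package com._51doit.spark.utils

    object MyUtil {

      //Convert an IPv4 string such as "1.2.3.4" to a single Long by shifting the
      //accumulated value 8 bits left and OR-ing in each successive octet
      def ip2Long(ip: String): Long = {
        ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)
      }
    }

    The IpRulesUtil helper referenced above is defined next: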

    package com._51doit.spark.day3

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

    import scala.collection.mutable.ArrayBuffer

    object IpRulesUtil {

      //IP rules as (start of range, end of range, province)
      val ipRules = new ArrayBuffer[(Long, Long, String)]()

      //Read the rules file from HDFS
      val conf = new Configuration()
      val fs: FileSystem = FileSystem.get(URI.create("hdfs://node-1.51doit.com:9000"), conf)
      val in: FSDataInputStream = fs.open(new Path("/iprules/ip.txt"))
      val reader = new BufferedReader(new InputStreamReader(in))

      var line = reader.readLine()

      while (line != null) {
        val fields = line.split("[|]")
        val startNum = fields(2).toLong
        val endNum = fields(3).toLong
        val province = fields(6)
        ipRules.append((startNum, endNum, province))

        line = reader.readLine()
      }

      //Binary-search the (sorted) rules for the range containing the given IP number
      def binarySearch(ip: Long): String = {
        var low = 0
        var high = ipRules.length - 1
        while (low <= high) {
          val middle = (low + high) / 2
          if ((ip >= ipRules(middle)._1) && (ip <= ipRules(middle)._2))
            return ipRules(middle)._3
          if (ip < ipRules(middle)._1)
            high = middle - 1
          else {
            low = middle + 1
          }
        }
        "未知" //"unknown": returned when no range matches
      }
    }
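
    The binary search assumes the rules in ip.txt are sorted by the start of each range. A quick driver-side check (the sample IP is illustrative, and both helpers are assumed to be on the classpath):

    val ipNum = MyUtil.ip2Long("1.24.0.10")
    println(IpRulesUtil.binarySearch(ipNum)) //prints the matching province, or "未知" if none

    Because IpRulesUtil is a Scala object, the loading code in its body runs once per JVM: each executor re-reads the rules from HDFS the first time it calls binarySearch, which is why the rules file lives in HDFS rather than on the driver's local disk.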

