Winter Vacation Study Progress 6

Today I continued studying Spark RDD operators.

(1) flatMap

// imports used by all of the examples below
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd: RDD[List[Int]] = sparkContext.makeRDD(List(List(1, 2), List(3, 4)))
  // flatMap: RDD[List[Int]] becomes RDD[Int]
  // flatMap flattens the RDD, splitting each inner List into its elements
  val flatrdd: RDD[Int] = rdd.flatMap(
    list => {
      list // each inner List is returned and its elements are flattened out
    }
  )
  flatrdd.collect().foreach(println) // 1 2 3 4
  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd: RDD[String] = sparkContext.makeRDD(List("hello word", "hello spark"))
  // flatMap: split each string on spaces and flatten the resulting words
  val flatrdd: RDD[String] = rdd.flatMap(
    s => {
      s.split(" ")
    }
  )
  flatrdd.collect().foreach(println) // hello, word, hello, spark (one word per line)
  sparkContext.stop()
}
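The split words are a natural starting point for a word count. A small sketch of my own (not part of the original notes), meant to run inside the same main before sparkContext.stop(), reusing flatrdd from above:

// hypothetical extension: count occurrences of each word from the split above
val wordCount: RDD[(String, Int)] = flatrdd
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCount.collect().foreach(println) // e.g. (hello,2), (word,1), (spark,1)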
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd = sparkContext.makeRDD(List(List(1, 2), 3, List(4, 5)))
  // the elements have mixed types, so pattern matching is used:
  // anything that is not already a List gets wrapped in one before flattening
  val flatrdd: RDD[Any] = rdd.flatMap(
    data => {
      data match {
        case list: List[_] => list
        case other => List(other)
      }
    }
  )
  flatrdd.collect().foreach(println) // 1 2 3 4 5
  sparkContext.stop()
}
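The same match can also be written as a pattern-matching function literal passed straight to flatMap. A small variation of my own, not from the original notes, assuming the same rdd as above:

// variation: pass a pattern-matching function literal directly to flatMap
val flatrdd2: RDD[Any] = rdd.flatMap {
  case list: List[_] => list
  case other         => List(other)
}
flatrdd2.collect().foreach(println) // same output: 1 2 3 4 5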
     

(2) glom

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)

  // glom: RDD[Int] becomes RDD[Array[Int]], one Array per partition
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

  val glomrdd: RDD[Array[Int]] = rdd.glom()

  glomrdd.collect().foreach(data => println(data.mkString(","))) // 1,2 and 3,4
  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)

  // glom: RDD[Int] becomes RDD[Array[Int]], one Array per partition
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
  val glomrdd: RDD[Array[Int]] = rdd.glom()

  // map over the two partition arrays and take the max of each
  val maxRdd: RDD[Int] = glomrdd.map(
    array => {
      array.max
    }
  )

  // collect the per-partition maxima and sum them: max(1,2) + max(3,4) = 2 + 4 = 6
  println(maxRdd.collect().sum)
  sparkContext.stop()
}
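For comparison, a sketch of my own (not from the original notes) that gets the same per-partition maxima with mapPartitions, without materializing each partition as an Array; it assumes the same rdd as above:

// alternative sketch: compute each partition's max directly from its iterator
val maxByPartition: RDD[Int] = rdd.mapPartitions(iter => Iterator(iter.max))
println(maxByPartition.collect().sum) // also 2 + 4 = 6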

(3) groupBy

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

  // group by the remainder of division by 2 (even vs. odd)
  def groupFunction(num: Int) = {
    num % 2
  }

  val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
  groupRDD.collect().foreach(println) // (0,CompactBuffer(2, 4)) and (1,CompactBuffer(1, 3))
  sparkContext.stop()
}
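As a small follow-up of my own (not in the original notes), the size of each group can be read off groupRDD with mapValues:

// hypothetical follow-up: how many numbers fall into each group
val sizeRDD: RDD[(Int, Int)] = groupRDD.mapValues(_.size)
sizeRDD.collect().foreach(println) // (0,2) and (1,2)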


def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  val rdd: RDD[String] = sparkContext.makeRDD(List("hello", "spark", "hi", "sss"), 2)

  // group the strings by their first character
  val grouprdd: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
  grouprdd.collect().foreach(println) // (h,CompactBuffer(hello, hi)) and (s,CompactBuffer(spark, sss))

  sparkContext.stop()
}
import java.text.SimpleDateFormat

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)

  // read the apache.log file
  val rdd = sparkContext.textFile("data/apache.log")

  // count the number of hits for each hour of the day
  val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
    line => {
      // split each line on spaces
      val data = line.split(" ")
      // the 4th field is the timestamp
      val time = data(3)

      // parse the timestamp
      val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      val date = sdf.parse(time)
      // format it down to just the hour
      val sdf1 = new SimpleDateFormat("HH")
      val hour = sdf1.format(date)
      (hour, 1) // e.g. one hit in hour 08 counts as 1
    }
  ).groupBy(_._1)

  timeRDD.map {
    // pattern match on (hour, group) and replace the group with its size
    case (hour, iter) => {
      (hour, iter.size)
    }
  }.collect().foreach(println)

  sparkContext.stop()
}
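A variant of my own (not from the original notes): since each record is already mapped to (hour, 1), reduceByKey can add the counts directly instead of shuffling whole groups with groupBy. It assumes the same rdd read from data/apache.log:

// alternative sketch: aggregate (hour, 1) pairs with reduceByKey
rdd.map(
  line => {
    val time = line.split(" ")(3)
    val hour = new SimpleDateFormat("HH")
      .format(new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(time))
    (hour, 1)
  }
).reduceByKey(_ + _).collect().foreach(println)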

(4) filter

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)

  // filter keeps only the elements that satisfy the predicate
  val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  val filterRdd: RDD[Int] = rdd.filter(
    num => num % 2 != 0
  )
  filterRdd.collect().foreach(println) // 1 and 3
  sc.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)

  // filter the apache.log lines, keeping only hits from 17/05/2015
  val rdd = sc.textFile("data/apache.log")
  rdd.filter(
    line => {
      // split each line on spaces; the 4th field is the timestamp
      val data = line.split(" ")
      val time = data(3)
      time.startsWith("17/05/2015")
    }
  ).collect().foreach(println)

  sc.stop()
}
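A small follow-up of my own (not in the original notes): if only the number of matching lines is needed, count() avoids collecting the full lines to the driver. It assumes the same rdd as above:

// hypothetical follow-up: number of hits on 17/05/2015
val hits = rdd.filter(_.split(" ")(3).startsWith("17/05/2015")).count()
println(hits)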