zoukankan      html  css  js  c++  java
  • spark-------------RDD 转换算子-----value类型(一)

    引言

    RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型,本文主要讲一些Value 类型

    正文

    资源路径和资源内容

    map

    • 函数签名:def map[U: ClassTag](f: T => U): RDD[U]

    • 函数说明:将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

    案例:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Map {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
        val rdd: RDD[String] = sc.textFile("datas/apache.log")
        val value: RDD[String] = rdd.map({
          line => {
            val address: String = line.split(" ")(6)
            address
          }
        })
        value.collect().foreach(println)
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行结果:

    mapPartitions

    • 函数签名:def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

    • 函数说明:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

    案例:获取每个数据分区的最大值

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark02_RDD_Operator_MapPartitions {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
          // 求每个分区的最大值
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
        val value: RDD[Int] = rdd.mapPartitions(
          iter => {
            List(iter.max).iterator
          }
        )
          value.collect().foreach(println)
            // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行结果

    mapPartitionsWithIndex

    • 函数签名:def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

    • 函数说明:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

    案例:打印数据和以及所在的分区

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_MapPartitionsWithIndex {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
          val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
            (index, iter) => {
              iter.map(
                num => {
                  (index, num)
                }
              )
            }
          )
          value.collect().foreach(println)
          // TODO 关闭环境
          sc.stop()
      }
    }
    
    

    运行截图

    flatMap

    • 函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

    • 函数说明:将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

    案例:将 List(List(1,2),3,List(4,5))进行扁平化操作

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_FlatMap {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
          val value: RDD[Any] = rdd.flatMap(
            // 用模式匹配
            data => {
              data match {
                case list: List[_] => list
                case dat => List(dat)
              }
            }
          )
        value.collect().foreach(println)
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图

    glom

    • 函数签名:def glom(): RDD[Array[T]]

    • 函数说明:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

    案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Glom {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
          // 【1,2】【3,4】
          // 【2】【4】
          // 6
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
          // 变成数组
          val glomRDD: RDD[Array[Int]] = rdd.glom()
          // 取最大值
          val maxRDD: RDD[Int] = glomRDD.map(
            array => {
              array.max
            }
          )
        println(maxRDD.collect().sum)
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:

    groupBy

    • 函数签名:def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

    • 函数说明:将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中

    案例:

    • 从服务器日志数据 apache.log 中获取每个时间段访问量
    • 将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
    package com.xiao.spark.core.rdd.operator.tranform
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_GroupBy {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
        // 按照首字母进行分组
    //    val rdd: RDD[String] = sc.makeRDD(List("Hadoop","Spark","Scala","Hive"))
    //
    //    val value: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
    
        // 从服务器日志数据 apache.log 中获取每个时间段访问量
        val rdd: RDD[String] = sc.textFile("datas/apache.log")
        val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
          line => {
            val time: String = line.split(" ")(3)
            val sdf = new SimpleDateFormat("dd/MM/yy:HH:mm:ss")
            val date: Date = sdf.parse(time)
            val sdf1 = new SimpleDateFormat("HH")
            val hour: String = sdf1.format(date)
            (hour, 1)
          }
        ).groupBy(_._1) // _._1 获取元组第一个元素  ._1:取第一个元素
        val value: RDD[(String, Int)] = timeRDD.map {
          case (hour, iter) => (hour, iter.size)
        }
        value.collect().foreach(println)
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:


    filter

    • 函数签名:def filter(f: T => Boolean): RDD[T]

    • 函数说明:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

    案例:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Filter {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
        val rdd: RDD[String] = sc.textFile("datas/apache.log")
        val timeRDD: RDD[String] = rdd.filter(
          line => {
            val time: String = line.split(" ")(3)
            time.startsWith("17/05/2015")
          }
        )
        timeRDD.collect().foreach(println)
          // TODO 关闭环境
          sc.stop()
      }
    }
    
    

    运行结果:

  • 相关阅读:
    SuperSocket框架中BinaryRequestInfo协议的使用
    UIImageView学习笔记
    UITextField学习小结
    Java数据结构相关类的实现原理
    Android 中把尺寸转换方法
    Win8 & WP8.1 轻型数据库
    隐私策略
    Windows 10(UWP)开发技巧
    【UWP】FFmpeg库的编译
    【UWP】拖拽列表项的排序功能实现
  • 原文地址:https://www.cnblogs.com/yangxiao-/p/14326296.html
Copyright © 2011-2022 走看看