zoukankan      html  css  js  c++  java
  • 寒假学习进度6

    今天继续学习sparkRDD的算子

    (1)flatMap

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
      val sc = new SparkContext(conf)

      // flatMap flattens each inner List, turning RDD[List[Int]] into RDD[Int].
      val nested: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
      val flattened: RDD[Int] = nested.flatMap(identity)

      flattened.collect().foreach(println)
      sc.stop()
    }
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
      val sc = new SparkContext(conf)

      // Split every line on single spaces and flatten the word arrays
      // into one RDD of words.
      val lines: RDD[String] = sc.makeRDD(List("hello word","hello spark"))
      val words: RDD[String] = lines.flatMap(_.split(" "))

      words.collect().foreach(println)
      sc.stop()
    }
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
      val sc = new SparkContext(conf)

      // The source list mixes Lists with a bare Int, so the element type is Any.
      val mixed = sc.makeRDD(List(List(1, 2), 3,List(4, 5)))

      // Pattern match inside flatMap: lists pass through unchanged; a scalar
      // is wrapped in a one-element List so everything flattens uniformly.
      val flattened: RDD[Any] = mixed.flatMap {
        case list: List[_] => list
        case scalar        => List(scalar)
      }

      flattened.collect().foreach(println)
      sc.stop()
    }
     

    (2)glom

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)

      // glom gathers each partition into a single Array, so an RDD[Int]
      // with 2 partitions becomes an RDD[Array[Int]] with 2 elements.
      val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
      val partitions: RDD[Array[Int]] = numbers.glom()

      partitions.collect().foreach(arr => println(arr.mkString(",")))
      sc.stop()
    }
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)

      val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

      // glom turns each of the 2 partitions into an Array; map(_.max)
      // then yields the maximum value of each partition.
      val partitionMax: RDD[Int] = numbers.glom().map(_.max)

      // Collect the per-partition maxima to the driver and add them up.
      println(partitionMax.collect().sum)
      sc.stop()
    }

    (3)groupBy

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)
      val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

      // Group by parity: key 0 collects the even numbers, key 1 the odd ones.
      val grouped: RDD[(Int, Iterable[Int])] = numbers.groupBy(n => n % 2)
      grouped.collect().foreach(println)
      sc.stop()
    }


    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)
      val words: RDD[String] = sc.makeRDD(List("hello","spark","hi","sss"), 2)

      // Group the words by their first character.
      val byInitial: RDD[(Char, Iterable[String])] = words.groupBy(word => word.charAt(0))
      byInitial.collect().foreach(println)

      sc.stop()
    }
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)

      // Count how many requests in apache.log fall into each hour of the day.
      val lines = sc.textFile("data/apache.log")

      // Each log line is space-separated and field 3 holds the timestamp in
      // dd/MM/yyyy:HH:mm:ss form, so the hour is simply the second
      // ':'-separated piece. Extracting it with split avoids allocating two
      // SimpleDateFormat instances per record (SimpleDateFormat is also not
      // thread-safe, so sharing one across tasks would not be an option).
      val hourGroups: RDD[(String, Iterable[(String, Int)])] = lines
        .map { line =>
          val fields = line.split(" ")
          val timestamp = fields(3)          // e.g. 17/05/2015:10:05:03
          val hour = timestamp.split(":")(1) // two-digit hour, e.g. "10"
          (hour, 1)                          // one hit in this hour
        }
        .groupBy(_._1)

      // The size of each group is the number of hits in that hour.
      hourGroups
        .map { case (hour, hits) => (hour, hits.size) }
        .collect()
        .foreach(println)

      sc.stop()
    }

    (4)filter

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)

      // filter keeps only the elements matching the predicate — here the odd ones.
      val numbers = sc.makeRDD(List(1,2,3,4), 2)
      val odds: RDD[Int] = numbers.filter(_ % 2 != 0)
      odds.collect().foreach(println)
      sc.stop()
    }
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(conf)

      // Keep only the log lines whose timestamp (the 4th space-separated
      // field) begins with 17/05/2015, i.e. requests from that one day.
      val lines = sc.textFile("data/apache.log")
      lines
        .filter(line => line.split(" ")(3).startsWith("17/05/2015"))
        .collect()
        .foreach(println)

      sc.stop()
    }
  • 相关阅读:
    截取url中最后斜杆的文件名
    html span从上往下,从左往右排序
    浪漫源码记录
    微信小程序TypeError: Cannot read property 'elem' of undefined
    tomcat8 性能优化
    Bandicam神奇使用法
    DataStage 七、在DS中使用配置文件分配资源
    DataStage 六、安装和部署集群环境
    DataStage 错误集(持续更新)
    DataStage 三、配置ODBC
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15754615.html
Copyright © 2011-2022 走看看