zoukankan      html  css  js  c++  java
  • 寒假学习进度5

    今天开始学习spark中RDD算子

    首先是Value类型

    (1)map

    练习代码:

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    //算子(转换)
    val rdd=sparkContext.makeRDD(List(1,2,3,4))

    //转换函数
    // val dataRDD1: RDD[Int] = rdd.map(
    // (num:Int) => {
    // num * 2
    // } )(完整形式)
    val dataRDD1: RDD[Int] = rdd.map(
    num => {
    num * 2
    } )
    // val dataRDD2: RDD[String] = dataRDD1.map(
    // num => {
    // "" + num
    // } )
    dataRDD1.collect().foreach(println)
    sparkContext.stop()
    }
    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.textFile("data/apache.log")

    //apache.log文件中每一行的路径属性取出来,用空格为分隔,将每一行字符串分成成多个组,文件属性在第6
    val mapRDD: RDD[String] = rdd.map(
    line => {
    val datas = line.split(" ")
    datas(6)
    }
    )
    mapRDD.collect().foreach(println)
    sparkContext.stop()
    }

    (2)mapPartitions

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
    //性能比.map要好,一次性拿一个分区的数据,进行转换操作,但处理完的数据不会释放掉,因为有对象的引用,容易出现内存溢出。
    val maprdd: RDD[Int] = rdd.mapPartitions(
    iter => {
    iter.map(_ * 2)
    }
    )

    maprdd.collect().foreach(println)
    sparkContext.stop()
    }
    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
    //计算每个分区中的最大值
    val maprdd: RDD[Int] = rdd.mapPartitions(
    iter => {
    List(iter.max).iterator
    }
    )

    maprdd.collect().foreach(println)
    sparkContext.stop()
    }

    mapPartitions在处理效率上出map要高,但容易出现内存溢出。mapPartitions可以增加或减少数据,map不行。总的来说在内存有限的情况下,使用map较好。

    (3)mapPartitionsWithIndex

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator1")
    val sparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 2)

    //将分区后进行编号,然后索引,当为编号为1的分区是返回这个迭代器,输入它
    val maprdd = rdd.mapPartitionsWithIndex(
    (index, iter) => {
    if (index == 1) {
    iter
    } else {
    Nil.iterator
    }

    }
    )

    maprdd.collect().foreach(println)

    sparkContext.stop()
    }
  • 相关阅读:
    站内信DB设计实现
    redis.conf配置详细解析
    MySQL中的锁(表锁、行锁)
    App开放接口api安全性—Token签名sign的设计与实现
    Linux下恢复误删除的文件
    系统资源监控工具
    MySQL性能监控工具-MONyog
    jstack Dump 日志文件中的线程状态
    数据库性能测试方案示例
    [转载]ant和maven的区别
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15743286.html
Copyright © 2011-2022 走看看