zoukankan      html  css  js  c++  java
  • 寒假学习进度5

    今天开始学习spark中RDD算子

    首先是Value类型

    (1)map

    练习代码:

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    //算子(转换)
    val rdd=sparkContext.makeRDD(List(1,2,3,4))

    //转换函数
    // val dataRDD1: RDD[Int] = rdd.map(
    // (num:Int) => {
    // num * 2
    // } )(完整形式)
    val dataRDD1: RDD[Int] = rdd.map(
    num => {
    num * 2
    } )
    // val dataRDD2: RDD[String] = dataRDD1.map(
    // num => {
    // "" + num
    // } )
    dataRDD1.collect().foreach(println)
    sparkContext.stop()
    }
    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.textFile("data/apache.log")

    //apache.log文件中每一行的路径属性取出来,用空格为分隔,将每一行字符串分成成多个组,文件属性在第6
    val mapRDD: RDD[String] = rdd.map(
    line => {
    val datas = line.split(" ")
    datas(6)
    }
    )
    mapRDD.collect().foreach(println)
    sparkContext.stop()
    }

    (2)mapPartitions

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
    //性能比.map要好,一次性拿一个分区的数据,进行转换操作,但处理完的数据不会释放掉,因为有对象的引用,容易出现内存溢出。
    val maprdd: RDD[Int] = rdd.mapPartitions(
    iter => {
    iter.map(_ * 2)
    }
    )

    maprdd.collect().foreach(println)
    sparkContext.stop()
    }
    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sparkContext = new SparkContext(sparkConf)
    val rdd=sparkContext.makeRDD(List(1,2,3,4),2)
    //计算每个分区中的最大值
    val maprdd: RDD[Int] = rdd.mapPartitions(
    iter => {
    List(iter.max).iterator
    }
    )

    maprdd.collect().foreach(println)
    sparkContext.stop()
    }

    mapPartitions在处理效率上出map要高,但容易出现内存溢出。mapPartitions可以增加或减少数据,map不行。总的来说在内存有限的情况下,使用map较好。

    (3)mapPartitionsWithIndex

    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator1")
    val sparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 2)

    //将分区后进行编号,然后索引,当为编号为1的分区是返回这个迭代器,输入它
    val maprdd = rdd.mapPartitionsWithIndex(
    (index, iter) => {
    if (index == 1) {
    iter
    } else {
    Nil.iterator
    }

    }
    )

    maprdd.collect().foreach(println)

    sparkContext.stop()
    }
  • 相关阅读:
    windows下搭建solr 6.2.1服务器一
    redis 持久化
    weblogic 启动报错java.net.UnknownHostException
    Tomcat 容器lib下添加 wlfullclient.jar 包引起项目中javax servlet 的冲突
    java.lang.NoSuchMethodError: javax.servlet.ServletContext.getContextPath()Ljava/lang/String;
    spring mvc控制框架的流程及原理1: 总概及源码分析
    CentOS7安装iptables防火墙
    【Intellij IDEA】eclipse项目导入
    weblogic11g 安装参考地址
    解决“只能通过Chrome网上应用商店安装该程序”的方法
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15743286.html
Copyright © 2011-2022 走看看