zoukankan      html  css  js  c++  java
  • Spark学习之JavaRdd

    RDD 介绍

    RDD,全称Resilient Distributed Datasets(弹性分布式数据集),是Spark最为核心的概念,是Spark对数据的抽象。RDD是分布式的元素集合,每个RDD只支持读操作,且每个RDD都被分为多个分区存储到集群的不同节点上。除此之外,RDD还允许用户显示的指定数据存储到内存和磁盘中,掌握了RDD编程是SPARK开发的第一步。

     

    1:创建操作(creation operation):RDD的创建由SparkContext来负责。
    2:转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD。
    3:行动操作(action operation):Spark为惰性计算,对RDD的行动操作都会触发Spark作业的运行
    4:控制操作(control operation):对RDD进行持久化等。

     

    DEMO代码地址:https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/rdddemo/OneRDD.java

    一:创建操作

    创建RDD有两种方式:
    1 读取一个数据集(SparkContext.textFile()) :

    JavaDStreamlines=jssc.textFileStream("/Users/huipeizhu/Documents/sparkdata/input/"); 
    JavaReceiverInputDStreamlines = jssc.socketTextStream("localhost", 9999);


    2 读取一个集合(SparkContext.parallelize()) :

    Listlist = Arrays.asList(5, 4, 3, 2, 1);
    JavaRDDrdd = sc.parallelize(list);

    二:转换操作

    1:单个RDD转换操作
    map() : 对每个元素进行操作,返回一个新的RDD
    System.out.println("RDD每个元素乘10:" + rdd.map(v -> v * 10)


    filter() : 最每个元素进行筛选,返回符合条件的元素组成的一个新RDD
    System.out.println("RDD去掉1的元素:" + rdd.filter(v -> v != 1));

    flatMap() : 对每个元素进行操作,将返回的迭代器的所有元素组成一个新的RDD返回
    r.dd.flatMap(x -> x.to(3)).collect()

    distinct():去重操作
    System.out.println("RDD去重操作:" + rdd.distinct());

    rdd最大和最小值

    Integer max=  rdd.reduce((v1, v2) -> Math.max(v1, v2));

    Integer min=  rdd.reduce((v1, v2) -> Math.min(v1, v2))


    2:两个RDD的转化操作:


    [1, 2, 3] [3, 4, 5] 两个个RDD简单相关操作

    union() :合并,不去重
    System.out.println("两个RDD集合:" + rdd1.union(rdd2).collect());

    intersection() :交集
    System.out.println("两个RDD集合共同元素:" + rdd1.intersection(rdd2).collect());

    cartesian() :笛卡儿积
    System.out.println("和另外一个RDD集合的笛卡尔积:" + rdd1.cartesian(rdd2).collect());

    subtract() : 移除相同的内容
    rdd1.subtract(rdd2).collect()

     

    三:行动操作


    collect() :返回所有元素
    System.out.println("原始数据:" + rdd.collect());

    count() :返回元素个数
    System.out.println("统计RDD的所有元素:" + rdd.count());

    countByValue() : 各个元素出现的次数
    System.out.println("每个元素出现的次数:" + rdd.countByValue());

    take(num) : 返回num个元素
    System.out.println("取出rdd返回2个元素:" + rdd.take(2));

    top(num) : 返回前num个元素
    System.out.println("取出rdd返回最前2个元素:" + rdd.top(2));


    reduce(func) :并行整合RDD中的所有数据(最常用的)
    System.out.println("整合RDD中所有数据(sum):" + rdd.reduce((v1, v2) -> v1 + v2));

    foreach(func):对每个元素使用func
    rdd.foreach(t -> System.out.print(t));


    四:控制操作


    cache():

    persist():保留着RDD的依赖关系

    checkpoint(level:StorageLevel):RDD[T]切断RDD依赖关系

    所谓的控制操作就是持久化
    你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。
    此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。
    Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用RDD.unpersist()方法。
    在实际操作当中我们可以借助第三方进行数据持久化 如:redis

  • 相关阅读:
    MySQL数据模型
    Spring循环依赖
    @Autowired和@Resource区别
    Kafka概念
    阻塞队列
    线程池原理
    Spring AOP
    JVM 史上最最最完整深入解析(12000 字噢)
    Dubbo配置信息
    友情链接
  • 原文地址:https://www.cnblogs.com/diaozhaojian/p/9152530.html
Copyright © 2011-2022 走看看