zoukankan      html  css  js  c++  java
  • Pair RDD编程

    键值对RDD通常用来进行聚合计算。通过ETL将数据转化为键值对形式。

    PairRDD是很多程序的构成要素,提供了并行操作各个键或跨节点重新进行数据分组的操作接口。

    pair RDD提供reduceByKey()可以分别规约每个键对应的数据,join()把两个RDD中键相同的元素组合到一起。

    Pair RDD创建

    很多存储键值对的数据格式会在读取时直接返回由其键值对数据组成的pari RDD。此外,通过map()可以实现把一个普通RDD转化为pair RDD。

    Pair RDD的转化操作

    • reduceByKey()合并具有相同键的值
    • groupByKey()对具有相同键的值进行分组
    • combineByKey()
    • mapVaulues(func)对每个值应用func
    • flatMapValues(func)
    • keys()返回包含键的RDD
    • values()返回包含值的RDD
    • sortByKey()根据键排序的pair RDD

    聚合操作

    Pair RDD的聚合操作返回RDD,因此是转化操作。不同于RDD的聚合操作是行动操作。

    • combineByKey(),数据流示意图学习
    • reduceByKey()
    • foldBykey()

    每个RDD都有固定数目的分区,分区数决定了RDD执行操作的并行度。

    对于有键的数据,一个常见的用例是将数据根据键进行分组,一个键一个组。对于一个类型K的键和类型V的值组成的RDD得到的结果是[K,Iterable[V]],可以用groupByKey.

    例如,rdd.reduceByKey()和rdd.groupByKey().mapValues()等价,但是前者高效,后者有为每个键存放值的步骤。

    连接

    连接方式多种多样:右外连接、左外连接、交叉连接以及内连接。

    • join表示内连接,两个pair rdd都存在的键才会输出。
    • leftOuterJoin(other)和rightOuterJoin(other)都会根据键连接两个RDD,允许其中一个缺失键。

    行动操作

    • countByKey()对每个键对应的元素分别计数
    • collectAsMap()
    • lookup(key)返回给定键对应的所有值

    数据分区

    Spark程序可以通过控制RDD分区方式来减少通信开销。所有的键值对RDD都可以分区,根据一个针对键的函数对元素进行分区。

    一个简单的scala应用

    //初始化代码,从HDFS读取用户信息
    //userData中的元素根据被读取时的来源,即HDFS块所在的节点来分布
    //Spark此时无法获知某个特定UserID对应的记录位于哪个节点上
    val sc = new SparkContext(...)
    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
    
    //周期性调用函数来处理过去5分钟产生的日志
    //假设这是一个包含(UserID, LinkInfo)对的SequenceFile
    def processNewLogs(logFileName : String) {
        val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
        val joined = userData.join(events)//RDD of (UserID, (UserInfo,LinkInfo))
        val offTopicVisits = joined.filter {
            case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
        }.count()
        println("number of visit to non-subscribed topics :" + offtopicvisits)
    }
    

    这段代码不够高效,UserID,UserInfo是大表,UserID,LinkInfo是5分钟的日志小文件,调用processNewLogs()都会调用join操作连接他们。默认情况,连接操作会把两个数据集中的所有键的哈希值求出来,将该哈希值相同的记录通过网络传输到同一台机器上,然后再那台机器上对键相同的记录进行连接操作,而一个大表,一个小文件,所以会浪费很多时间,都要对userData表进行跨节点混洗。

    解决方法:在程序开始时,对userData表进行partitionBy()转化操作,把表转为哈希分区,调用join()时,Spark只会对events进行shuffle,将UserID的记录发送到UserData对应分区的节点上,这样网络开销就大大减少。

    减少shuffle操作是提升性能的一个办法,shuffle操作对网络开销很大

    从数据分区中获益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()

    分区方式

    • 可自定义Partitioner对象
    • HashPartitioner
    • RangePartitioner
  • 相关阅读:
    如何开始DDD(续)
    如何开始DDD
    ThinkNet终于见面了
    [Umbraco] umbraco中如何分页
    ETL 工具下载全集 包括 Informatica Datastage Cognos( 持续更新)
    js时间对比-转化为几天前,几小时前,几分钟前
    原生JS实现返回顶部和滚动锚点
    JSONP原理及简单实现 可做简单插件使用
    CSS3 transition效果 360度旋转 旋转放大 放大 移动
    js获取url的常用方法
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12403092.html
Copyright © 2011-2022 走看看