zoukankan      html  css  js  c++  java
  • Spark RDD API扩展开发

    原文链接:

    Spark RDD API扩展开发(1)

    Spark RDD API扩展开发(2):自定义RDD

    我们都知道,Apache Spark内置了很多操作数据的API。但是很多时候,当我们在现实中开发应用程序的时候,我们需要解决现实中遇到的问题,而这些问题可能在Spark中没有相应的API提供,这时候,我们就需要通过扩展Spark API来实现我们自己的方法。
    我们可以通过两种方法来扩展Spark API,(1)、其中一种就是在现有的RDD中添加自定义的方法;(2)、第二种就是创建属于我们自己的RDD。在这篇文章中,我将对这两种方法进行阐述,并赋予代码 。下面我就开始介绍第一种方法。

      假如我们中有一些商品的销售数据,数据的格式是CSV的。为了简单起见,假如每行数据都是由id, customerId, itemId 以及itemValue四个字段组成,我们用SalesRecord来表示:

    1 class SalesRecord(val id: String,
    2                   val customerId: String,
    3                   val itemId: String,
    4                   val itemValue: Double) extends Comparable[SalesRecord]
    5 with Serializable

      所以我们可以将商品的销售数据进行解析,并存储到RDD[SalesRecord]中:

    01 /**
    02  * User: 过往记忆
    03  * Date: 15-03-31
    04  * Time: 上午00:24
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1298
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10  
    11 val sc = new SparkContext(args(0), "iteblogRDDExtending")
    12 val dataRDD = sc.textFile("file:///www/iteblog.csv")
    13 val salesRecordRDD = dataRDD.map(row => {
    14     val colValues = row.split(",")
    15     new SalesRecord(colValues(0),colValues(1),
    16     colValues(2),colValues(3).toDouble)
    17 })

      如果我们想计算出这些商品的总销售额,我们会这么来写:

    1 salesRecordRDD.map(_.itemValue).sum

      虽然这看起来很简洁,但是理解起来却有点困难。但是如果我们可以这么来写,可能会很好理解:

    1 salesRecordRDD.totalSales

      在上面的代码片段中,totalSales方法让我们感觉就是Spark内置的操作一样,但是Spark是不提供这个方法的,我们需要在现有的RDD中实现我们自定义的操作。

      下面我就来介绍一些如何在现有的RDD中添加我们自定义的方法。

      一、定义一个工具类,来存放我们所有自定义的操作

      当然,你完全没必要自定义一个类类添加我们自定义的方法,但是为了管理,还是建议你这么做。下面我们来定义IteblogCustomFunctions类,它存储所有我们自定义的方法。它是专门用来处理RDD[SalesRecord],所以这个类中提供的操作全部是用来处理销售数据的:

    1 class IteblogCustomFunctions(rdd:RDD[SalesRecord]) {
    2   def totalSales =rdd.map(_.itemValue).sum 
    3 }

      二、隐形转换来实现在RDD中添加方法

      我们定义了隐形的addIteblogCustomFunctions函数,这可以将所有操作销售数据的方法作用于RDD[SalesRecord]上:

    01 /**
    02  * User: 过往记忆
    03  * Date: 15-03-31
    04  * Time: 上午00:24
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1298
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10  
    11 object IteblogCustomFunctions {
    12   implicit def addIteblogCustomFunctions(rdd: RDD[SalesRecord]) = new
    13   IteblogCustomFunctions(rdd)
    14 }

      三、使用自定义的方法

      下面方法通过导入IteblogCustomFunctions 中的相应方法来实现使用我们自定义的方法:

    1 import IteblogCustomFunctions._
    2 println(salesRecordRDD.totalSales)

      通过上面三步我们就可以在现有的RDD中添加我们自定义的方法。

    自定义一个RDD类

    在上文中我介绍了如何在现有的RDD中添加自定义的函数。本文将介绍如何自定义一个RDD类,假如我们想对没见商品进行打折,我们想用Action操作来实现这个操作,下面我将定义IteblogDiscountRDD类来计算商品的打折,步骤如下:

      一、创建IteblogDiscountRDD类

      自定义RDD类需要继承Spark中的RDD类,并实现其中的方法:

    01 /**
    02  * User: 过往记忆
    03  * Date: 15-04-01
    04  * Time: 上午00:59
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1299
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10 classIteblogDiscountRDD(prev:RDD[SalesRecord],xxxxx:Double)
    11     extends RDD[SalesRecord](prev){
    12  
    13 //继承compute方法
    14 override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] =  {
    15   firstParent[SalesRecord].iterator(split, context).map(salesRecord => {
    16       val discount = salesRecord.itemValue*discountPercentage
    17       new SalesRecord(salesRecord.id,
    18       salesRecord.customerId,salesRecord.itemId,discount)
    19 })}
    20  
    21 //继承getPartitions方法
    22 override protected def getPartitions: Array[Partition] =
    23       firstParent[SalesRecord].partitions
    24 }

      上面代码中,我创建了一个IteblogDiscountRDD类,这个RDD只操纵销售数据,当我们继承RDD类时,我们必须重载两个方法:
      compute

      这个函数是用来计算RDD中每个的分区的数据,在我代码中,我们输入了销售数据,并对其中的数据计算打折计算。

      getPartitions
      
      getPartitions函数允许开发者为RDD定义新的分区,在我们的代码中,并没有改变RDD的分区,重用了父RDD的分区。

      定义IteblogDiscountRDD的时候将类型写死了(SalesRecord),它只能用来处理SalesRecord数据。如果我们想定义一个通用的RDD,只需要类似下面写即可
    01 classIteblogRDD(prev:RDD[T],XXXX:C)
    02     extends RDD[T](prev){
    03  
    04 //继承compute方法
    05 override def compute(split: Partition, context: TaskContext): Iterator[T] =  {
    06   ................................
    07 }
    08  
    09 //继承getPartitions方法
    10 override protected def getPartitions: Array[Partition] =
    11       ................................
    12 }

      二、自定义discount函数

      我们自定义discount函数,该函数可以创建一个IteblogDiscountRDD:

    1 def discount(discountPercentage:Double) = new IteblogDiscountRDD(rdd,discountPercentage)

      三、使用IteblogDiscountRDD

      使用IteblogDiscountRDD也是非常简单的,我们可以像使用内置的RDD一样来使用:

    1 import IteblogCustomFunctions._
    2  
    3 val discountRDD = salesRecordRDD.discount(0.1)
    4 println(discountRDD.collect().toList)

      自此,我们已经学会了如何在现有的RDD中定义方法和自定义自己的RDD。

  • 相关阅读:
    Google maps not working IE11
    谷歌地图插件
    杨辉三角python的最佳实现方式,牛的不能再牛了
    今日头条推荐算法原理全文详解
    项目启动大会要点
    学金融应该看的书籍
    【从0到1】经典语录
    appium学习链接记录
    Axure-计算输入字数
    如何提升个人专业能力
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4962477.html
Copyright © 2011-2022 走看看