Spark的核心是RDD(弹性分布式数据集),是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用。Spark的主要优势来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统数据,例如HDFS、HBase或者其他Hadoop数据源。
RDD的3种基本运算:
1)“转换“运算 Transformation:RDD执行”转换“运算的结果,会产生另外一个RDD,RDD具有lazy特性,所以”转换“运算并不会立刻实际执行,等到执行”动作“运算才会实际执行。
2)“动作”运算:RDD执行“动作”运算后不会产生另外一个RDD,而是会产生数值、数组或写入文件系统。RDD执行“动作”运算时立刻实际执行,并且连同之前“转换”运算一起执行。
3)“持久化”:对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能。
举例子:
一、基本RDD“转换”运算
1)创建intRDD
intRDD=sc.parallelize([3,1,2,5,5])
intRDD.collect()
2)创建stringRDD
stringRDD=sc.parallelize(["Apple","Orange","Banana","Grape","Apple"])
stringRDD.collect()
3)map运算介绍(具名函数或匿名函数)
以“加一”为例
具名函数
def addOne(x):
return (x+1)
intRDD.map(addOne).collect()
匿名函数
intRDD.map(lambda x : x+1).collect()
(针对stringRDD)stringRDD.map(lambda x : "fruit:"+x).collect()
4)filter数字运算
intRDD.filter(lambda x : 1 < x and x < 5).collect()
(stringRDD.filter(lambda x : "ra" in x).collect()
5)distinct运算(删除重复的元素)
intRDD.distinct().collect()
6)randomSplit运算(可以将整个集合元素以随机数的方式按照比例分为多个RDD)
sRDD=intRDD.randomSplit([0.4,0.6])
sRDD[0].collect()
sRDD[1].collect()
7)groupBy运算(groupBy可以按照传入的匿名函数规则将数据分为多个List)
gRDD=intRDD.groupBy(
lambda x : "even" if (x % 2 == 0)
else "odd").collect()
8)读个RDD”转换“运算
8-1)union并集运算
intRDD1.union(intRDD2).union(intRDD3).collect()
8-2)intersection交集运算
intRDD1.intersection(intRDD2).count()
8-3)subtract差集运算
intRDD1.subtract(intRDD2).collect()
8-4)cartesian笛卡尔乘积运算
intRDD1.cartesian(intRDD2).collect()
9)RDD Key-Value基本“转换”运算
9-1)使用filter筛选key运算
kvRDD1.filter(lambda keyValue : keyValue[0] < 5).collect()
9-2)使用filter筛选value运算
kvRDD1.filter(lambda keyValue : keyValue[1] < 5).collect()
9-3)mapValues运算
将Value的每一个值进行平方运算
kvRDD1.mapValues(lambda x : x*x).collect()
9-4)sortByKey从小到大按照key排序
kvRDD1.sortByKey(ascending=True).collect()(如果是False则是从大到小)
9-5)reduceByKey
KvRDD1.reduceByKey(lambda x,y : x+y).collect()
10)多个RDD-Key-Value“转换”运算
10-1)Key-Value RDD join运算
kvRDD1.join(kvRDD2).collect()
10-2)Key-Value leftOuterJoin运算
kvRDD1.leftOuterJoin(kvRDD2).collect()
10-3)Key-Value RDD rightOuterJoin运算
kvRDD1.rightOuterJoin(kvRDD2).collect()
10-4)Key-Value subtractByKey运算(删除相同key值的数据)
kvRDD1.subtractByKey(kvRDD2).collect()
二、基本”动作“运算
1)基础操作
intRDD.first() 取出第1项数据
intRDD.take(2) 取出前2项数据
intRDD.takeOrdered(3) 从小到大取出前3项
intRDD.takeOrdered(3,key=lambda x : -x) 从大到小排序,取出前3项
intRDD.stats() 显示统计的特性
2)Key-Value“动作”运算
kvRDD1.first()
kvRDD1.take(2)
kvFirst=kvRDD1.first()
kvFirst[0](key)
kvFirst[1](value)
kvRDD1.countByKey()
创建Key-Value的字典
KV=kvRDD1.collectAsMap()
KV[3](输入的是key)
Key-Value lookup运算,输入key值来查找value值:
kvRDD1.lookup(3)
三、RDD Persistence持久化
RDD.persist(存储等级)(可选:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY、MEMORY_ONLY_2,MEMORY_AND_DISK_2,etc)
RDD.unpersist()