1 spark的python环境部署可以参照上面一篇哟。http://www.cnblogs.com/lanjianhappy/p/8705974.html
2 pyspark的基本操作。
1 # coding:utf-8 2 from pyspark import SparkContext, SparkConf 3 4 sc = SparkContext()#init contet 5 intRDD = sc.parallelize([3,1,2,5,5])#create RDD 6 stringRDD = sc.parallelize(['apple','orange','yellow']) 7 print intRDD.collect()#transfrom to python 8 print stringRDD.collect() 9 #每个元素+1 10 print intRDD.map(lambda x:x+1).collect()#4,2,3,6,6 11 #输出小于3 12 print intRDD.filter(lambda x:x<3).collect()#print number<3 in RDD 13 print stringRDD.filter(lambda x:'ra' in x).collect()#print contain 'ra' 14 15 print intRDD.distinct().collect() 16 #奇数偶数分开 17 result = intRDD.groupBy(lambda x:x%2).collect() 18 print sorted([(x,sorted(y)) for(x,y) in result]) 19 20 #多个RDD并集 21 intRDD1 = sc.parallelize([3,1,2,3,5]) 22 intRDD2 = sc.parallelize([8,2,1,9,5]) 23 intRDD3 = sc.parallelize([7,1,3,4,7]) 24 print intRDD1.union(intRDD2).union(intRDD3).collect() 25 26 #交集 27 print intRDD1.intersection(intRDD2) 28 29 #差集 30 print intRDD1.subtract(intRDD2) 31 32 #笛卡尔集 33 print intRDD1.cartesian(intRDD2).collect()#返回10个元素 34 35 #读取元素 36 #取第一条数据 37 print intRDD.first() 38 #取前两条数据 39 print intRDD.take(2) 40 #升序排列,并取前3条数据 41 print intRDD.takeOrdered(3) 42 #降序排列,并取前3条数据 43 print intRDD.takeOrdered(3,lambda x:-x) 44 45 #统计功能 min max stdev count sum mean 46 print intRDD.stats() 47 48 #转换操作 49 kvRDDW1 = sc.parallelize([(1,2),(3,4),(5,6),(7,8)]) 50 #分别得到keys values 51 print kvRDDW1.keys().collect()#1 3 5 7 52 print kvRDDW1.values().collect()#2 4 6 8 53 54 #筛选元素 筛选小于5的数据 x[0]按照值 x[1]按照键 55 print kvRDDW1.filter(lambda x:x[0]<5).collect() 56 print kvRDDW1.filter(lambda x:x[1]<5).collect() 57 #值运算 mapvalues处理value 58 print kvRDDW1.mapValues(lambda x:x**2).collect() 59 60 #按照key排序 61 print kvRDDW1.sortByKey().collect() 62 print kvRDDW1.sortByKey(True).collect() 63 print kvRDDW1.sortByKey(False).collect()#倒序 64 65 #对具有相同key的进行合并 66 print kvRDDW1.reduceByKey(lambda x,y:x+y).collect() 67 68 #多个RDD相同的key进行内连接 69 kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) 70 kvRDD2 = sc.parallelize([(3,8)]) 71 print kvRDD1.join(kvRDD2).collect()#[(3, (4, 8)), (3, (6, 8))] 72 73 #key值统计 74 print kvRDD1.countByKey().collect() 75 #lookup 根据key查找对应的value 76 print kvRDD1.lookup(3)
加油!