zoukankan      html  css  js  c++  java
  • 2 pyspark学习----基本操作

    1 spark的python环境部署可以参照上面一篇哟。http://www.cnblogs.com/lanjianhappy/p/8705974.html

    2 pyspark的基本操作。

     1 # coding:utf-8
     2 from pyspark import SparkContext, SparkConf
     3 
     4 sc = SparkContext()#init contet
     5 intRDD = sc.parallelize([3,1,2,5,5])#create RDD
     6 stringRDD = sc.parallelize(['apple','orange','yellow'])
     7 print intRDD.collect()#transfrom to python
     8 print stringRDD.collect()
     9 #每个元素+1
    10 print intRDD.map(lambda x:x+1).collect()#4,2,3,6,6
    11 #输出小于3
    12 print intRDD.filter(lambda x:x<3).collect()#print number<3 in RDD
    13 print stringRDD.filter(lambda x:'ra' in x).collect()#print contain 'ra'
    14 
    15 print intRDD.distinct().collect()
    16 #奇数偶数分开
    17 result = intRDD.groupBy(lambda x:x%2).collect()
    18 print sorted([(x,sorted(y)) for(x,y) in result])
    19 
    20 #多个RDD并集
    21 intRDD1 = sc.parallelize([3,1,2,3,5])
    22 intRDD2 = sc.parallelize([8,2,1,9,5])
    23 intRDD3 = sc.parallelize([7,1,3,4,7])
    24 print intRDD1.union(intRDD2).union(intRDD3).collect()
    25 
    26 #交集
    27 print intRDD1.intersection(intRDD2)
    28 
    29 #差集
    30 print intRDD1.subtract(intRDD2)
    31 
    32 #笛卡尔集
    33 print intRDD1.cartesian(intRDD2).collect()#返回10个元素
    34 
    35 #读取元素
    36 #取第一条数据
    37 print intRDD.first()
    38 #取前两条数据
    39 print intRDD.take(2)
    40 #升序排列,并取前3条数据
    41 print intRDD.takeOrdered(3)
    42 #降序排列,并取前3条数据
    43 print intRDD.takeOrdered(3,lambda x:-x)
    44 
    45 #统计功能 min max stdev count sum mean
    46 print intRDD.stats()
    47 
    48 #转换操作
    49 kvRDDW1 = sc.parallelize([(1,2),(3,4),(5,6),(7,8)])
    50 #分别得到keys values
    51 print kvRDDW1.keys().collect()#1 3 5 7
    52 print kvRDDW1.values().collect()#2 4 6 8
    53 
    54 #筛选元素 筛选小于5的数据 x[0]按照值 x[1]按照键
    55 print kvRDDW1.filter(lambda x:x[0]<5).collect()
    56 print kvRDDW1.filter(lambda x:x[1]<5).collect()
    57 #值运算 mapvalues处理value
    58 print kvRDDW1.mapValues(lambda  x:x**2).collect()
    59 
    60 #按照key排序
    61 print kvRDDW1.sortByKey().collect()
    62 print kvRDDW1.sortByKey(True).collect()
    63 print kvRDDW1.sortByKey(False).collect()#倒序
    64 
    65 #对具有相同key的进行合并
    66 print kvRDDW1.reduceByKey(lambda x,y:x+y).collect()
    67 
    68 #多个RDD相同的key进行内连接
    69 kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
    70 kvRDD2 = sc.parallelize([(3,8)])
    71 print kvRDD1.join(kvRDD2).collect()#[(3, (4, 8)), (3, (6, 8))]
    72 
    73 #key值统计
    74 print kvRDD1.countByKey().collect()
    75 #lookup 根据key查找对应的value
    76 print kvRDD1.lookup(3)

    加油!

  • 相关阅读:
    C# 设计模式
    FutureTask、Fork/Join、 BlockingQueue
    线程的几种创建方式
    行锁、表锁、乐观锁、悲观锁
    J.U.C之AQS
    同步容器并发容器
    线程不安全类
    线程封闭
    不可变对象
    安全发布对象—单例模式
  • 原文地址:https://www.cnblogs.com/lanjianhappy/p/8706035.html
Copyright © 2011-2022 走看看