zoukankan      html  css  js  c++  java
  • PyCharm 开发pyspark 应用程序

    创建新的空项目:
    在这里插入图片描述
    测试一下环境是否ok
    在这里插入图片描述

    同时,也是为了配置一下spark环境
    在这里插入图片描述
    在这里插入图片描述
    添加如下两个环境变量:
    在这里插入图片描述
    接下来:
    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述
    达到这样,就ok
    在这里插入图片描述

    IDE开发环境就配置ok了,开始Coding…

    from pyspark import SparkConf,SparkContext
    
    
    if __name__ == '__main__':
    
        def my_map():
            conf = SparkConf().setMaster("local[2]").setAppName("spark-demo0401")
            sc = SparkContext(conf=conf)
    
            data = [1, 2, 3, 4, 5]
            inputRDD = sc.parallelize(data)
    
            mapRDD = inputRDD.map(lambda x:x*2)
    
            print(mapRDD.collect())
            sc.stop()
    
    
        def my_filter():
            conf = SparkConf()
            sc = SparkContext(conf=conf)
    
            data = [1,2,3,4,5]
            inputRDD = sc.parallelize(data)
            output = inputRDD.map(lambda x:x*2).filter(lambda x:x>4)
            print(output.collect())
    
    
        def my_flatMap():
            conf= SparkConf()
            sc = SparkContext(conf=conf)
    
            data = ["hello,spark","hello,world","hello,pyspark"]
            inputRDD = sc.parallelize(data)
            output = inputRDD.flatMap(lambda x:x.split(","))
            print(output.collect())
    
        def my_groupByKey():
            conf = SparkConf()
            sc = SparkContext(conf=conf)
            data = ["hello,spark", "hello,world", "hello,pyspark"]
            inputRDD = sc.parallelize(data)
                .flatMap(lambda x:x.split(","))
                .map(lambda x:(x,1))
            output = inputRDD.groupByKey().collect()
            print(output)
    
    
        def my_reduceByKey():
            conf = SparkConf()
            sc = SparkContext(conf=conf)
            data = ["hello,spark", "hello,world", "hello,spark"]
            inputRDD = sc.parallelize(data) 
                .flatMap(lambda x: x.split(",")) 
                .map(lambda x: (x, 1))
                .reduceByKey(lambda x,y:x+y)
            output = inputRDD.collect()
            print(output)
    
        def my_sortByKey():
            conf = SparkConf()
            sc= SparkContext(conf=conf)
            data = ["hello,spark", "hello,world", "hello,spark"]
            inputRDD = sc.parallelize(data).flatMap(lambda x:x.split(","))
                .map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
                .map(lambda x:(x[1],x[0])).sortByKey(ascending=False).map(lambda x:(x[1],x[0]))
    
            print(inputRDD.collect())
    
        my_sortByKey()
    
    
    
    
    
  • 相关阅读:
    kafka 项目实战
    7.DHCP的相关命令
    3.centos 7执行service iptables save报错问题
    39.NFS(网络文件系统)
    37.Samba 文件共享服务1--配置共享资源
    36.Samba 文件共享服务1--安装及配置参数解释
    35.简单文件传输协议
    34.vsftpd服务程序--虚拟用户模式
    33.vsftpd服务程序--本地用户模式
    32.vsftpd服务程序--匿名开放模式
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614690.html
Copyright © 2011-2022 走看看