zoukankan      html  css  js  c++  java
  • 【PySpark】学习记录1

    一. Spark介绍

    Spark是一个分布式计算平台。运算速度远超于HDFS,并且能与python、java更好地交互。
    我的疑问:在数据处理/模型训练的过程中,Spark这个平台是需要我手动写一些代码,例如读取数据啥的,还是我只要在带有pyspark的kernal的平台上运行就可以?kernal是什么??为什么我在NAIE平台上选了pyspark的kernal,接下来就会报错呢?处理方式(数据读写这些)又不一样吗?

    二. 今天的代码

    导入的所需要的包

    import os
    from operator import add
    from pyspark import SparkContext
    

    查看当前文件所在路径。被路径整怕了……

    os.getcwd()
    

    输出:

    '/home/ma-user/work'
    

    查看这个路径下有什么文件:

    os.listdir('/home/ma-user/work')
    

    输出:

    ['naie_platform', '__train.json', 'preprocess.ipynb',  'requirements.txt']
    

    可以看到,我自己建了一个testSpark.txt并没有显示在这里。。

    1. 想要实现的功能:统计txt文件里单词数目

    • 读取文件并分割字符串:
    if len(sys.argv) < 2:
            print("Usage:wordcount <filepath>") # ???
            exit(-1)
        # initialize sparkcontext
    #     sc = SparkContext(appName="Python_Word_Count") # 实际上不需要这个,会报错,因为默认已经有了一个?或者只需要运行1次,最好是与sc.stop()一起用避免错误
        
    # 将文本数据读为一个存放字符串的RDD
    lines = sc.textFile('/home/ma-user/work/preprocess/requirements.txt') # sys.argv[1]是个json文件,但我不懂它是什么,也不知道会不会引起报错
    # lines = sc.textFile('sys.argv[0]') # 这个函数大概不能读取.py 或者.json文件吧 反正会报错
    # 把字符串切分成单词
    words = lines.flatMap(lambda x:x.split(' '))
    words.collect()
    

    输出:

    ['#name',
     '[condition]',
     '[version]',
     '#condition',
     '',
     '',
     '',
     '==,',
     '>=,',
     '<=,',
     '>,',
     '<',
     '#tensorflow==1.8.1',
     'naie']
    
    • 每个单词映射为(x,1)的样子方便统计数目,利用map功能:
    mapWords = words.map(lambda x:(x,1)) # PythonRDD[11] at RDD at PythonRDD.scala:52
    mapWords.collect()
    

    输出:

    [('#name', 1),
     ('[condition]', 1),
     ('[version]', 1),
     ('#condition', 1),
     ('', 1),
     ('', 1),
     ('', 1),
     ('==,', 1),
     ('>=,', 1),
     ('<=,', 1),
     ('>,', 1),
     ('<', 1),
     ('#tensorflow==1.8.1', 1),
     ('naie', 1)]
    
    • 合并相同键值,实现统计单词数目。
      如果没有collect()这个函数,每个函数返回的都是一个PythonRDD,看不出RDD里的值的。
    combine_same_keys = mapWords.reduceByKey(add) # PythonRDD[17] at RDD at PythonRDD.scala:52 
    combine_same_keys.collect()
    

    输出:

    [('[version]', 1),
     ('#condition', 1),
     ('', 3),
     ('==,', 1),
     ('>=,', 1),
     ('naie', 1),
     ('#name', 1),
     ('[condition]', 1),
     ('<=,', 1),
     ('>,', 1),
     ('<', 1),
     ('#tensorflow==1.8.1', 1)]
    
    • 打印统计结果:
    for (keys, counts) in combine_same_keys.collect():
        print(keys, counts)
    

    输出:

    [version] 1
    #condition 1
     3
    ==, 1
    >=, 1
    naie 1
    #name 1
    [condition] 1
    <=, 1
    >, 1
    < 1
    #tensorflow==1.8.1 1
    
    • 关闭RDD
      一开始只要打开一次sc,然后关闭了之后下一次运行就需要再初始化一次textFile
    sc.stop() # 关闭spark, 关闭后就会提示:AttributeError: 'NoneType' object has no attribute 'sc'
    
    • 想试一试其他功能:
      word_add1 = lines.flatMap(lambda x:x.split(' ')) # 对数据格式也有要求,并不会帮你把单词转为什么东西然后+1,你看这个.map(lambda x:x+1)就不行,提示TypeError:TypeError: must be str, not int
      word_add1.collect()

    • 过滤掉重复的:

    filter_same = word_add1.distinct()
    filter_same.collect()
    

    输出:

    ['[version]',
     '#condition',
     '',
     '==,',
     '>=,',
     'naie',
     '#name',
     '[condition]',
     '<=,',
     '>,',
     '<',
     '#tensorflow==1.8.1']
    
    • 筛选,filter是保留符合条件的,也就是将不等于'==,'和''的字符留下:
    # filter_same.filter(lambda x:x!=('' and'==,'and'>=,'and'<'and'>,')).collect() # 这样一个都删不掉.. and/or都一样
    filter_same.filter(lambda x:x!= '==,' and x!='' ).collect() # 这样可以删掉两个
    
    ['[version]',
     '#condition',
     '>=,',
     'naie',
     '#name',
     '[condition]',
     '<=,',
     '>,',
     '<',
     '#tensorflow==1.8.1']
    

    2. 总结:整体流程是先创建一个RDD,然后对它进行操作

    例:对一个数据为{1,2,3,3}的RDD进行基本RDD转化操作

    行动操作:

    三. 报错

    遇见的报错:

    # ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Python_Word_Count, master=local[*]) created by __init__ at <ipython-input-46-e6dabb8e53ad>:1 
    

    出错语句:

    sc = SparkContext(appName="Python_Word_Count")
    

    原因是这个只要打开一次,在没有关闭之前,再次输入这个语句都会提示不能同时运行多个SparkContexts。

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ma-user/work/readData/testSpark.text
    

    找不到该文件,查看该路径下是否有这个文件(不要用眼睛看。。)

    ![](https://img2020.cnblogs.com/blog/2037199/202008/2037199-20200813220715363-996762160.png)
    

    解决方法:给file加上单引号,变成'file'

  • 相关阅读:
    查询记录时rs.previous()的使用
    Cocos2d-x中由sprite来驱动Box2D的body运动(用来制作平台游戏中多变的机关)
    vim经常使用命令总结
    微信公众号:码农的世界
    RHEL5 X86-64上安装Oracle 11gR2演示样例与总结
    JavaScript中获取当前项目的绝对路径
    thinkphp内置标签简单讲解
    function $(id) {}表示什么函数
    表单实例(判断两次密码是否一致)
    thinkphp模板继承
  • 原文地址:https://www.cnblogs.com/sweetsmartrange/p/13492316.html
Copyright © 2011-2022 走看看