  • Spark Programming Fundamentals (Python Edition)

    Spark Tutorial for Beginners (Python Edition)

    Textbook official site

    http://dblab.xmu.edu.cn/post/spark-python/ 

    Online textbook:

    http://dblab.xmu.edu.cn/blog/1709-2/  

    Lecture videos

    https://study.163.com/course/introduction/1209408816.htm

    Software downloads

    Link: https://pan.baidu.com/s/1dzf4RdWBmdnIiOGwjpOuow (access code: r5b2)

    Python tutorial for beginners

    http://dblab.xmu.edu.cn/blog/python/

     

    I. Installing and Using Spark

    http://dblab.xmu.edu.cn/blog/1307-2/ 

     

    Before installing, make sure Java is available (java -version) and HDFS is running (start-dfs.sh, then check with jps), and download spark-2.4.0-bin-without-hadoop.tgz into /home/hadoop/下载 (the ~/下载 Downloads directory).

      

     Commands:

    1. Download, extract, and set ownership

    cd 下载
    ls
    sudo tar -zxf spark-2.4.0-bin-without-hadoop.tgz -C /usr/local

    cd /usr/local

    ls

    sudo mv spark-2.4.0-bin-without-hadoop/ ./spark

    ls -l 

    sudo chown -R hadoop ./spark 

    2. Configuration file

    cd spark/ 

    cp ./conf/spark-env.sh.template ./conf/spark-env.sh

    gedit ./conf/spark-env.sh

    # Lets this "without-hadoop" Spark build pick up Hadoop's jars
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
    

    3. Configure environment variables and apply them

    gedit ~/.bashrc 

    export SPARK_HOME=/usr/local/spark
    export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
    export PYSPARK_PYTHON=python3
    export PATH=$PATH:$SPARK_HOME/bin
    

     Entries already added by earlier setups:

    export FLUME_HOME=/usr/local/flume                   
    export FLUME_CONF_DIR=$FLUME_HOME/conf
    
    export JAVA_HOME=/usr/lib/jvm/default-java
    export HADOOP_HOME=/usr/local/hadoop
    export HBASE_HOME=/usr/local/hbase
    export HIVE_HOME=/usr/local/hive
    export PATH=$PATH:$HIVE_HOME/bin
    export PATH=$PATH:/usr/local/hbase/bin
    export PATH=$PATH:$FLUME_HOME/bin:$HADOOP_HOME:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
    
    export STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar
    
    export SQOOP_HOME=/usr/local/sqoop
    export PATH=$PATH:$SBT_HOME/bin:$SQOOP_HOME/bin
    export CLASSPATH=$CLASSPATH:$SQOOP_HOME/lib
    

    source ~/.bashrc

    echo $SPARK_HOME    # quick check that the variable is now set

    4. Run a test

     ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

    Pi is roughly 3.1359356796783984 

    5. The interactive shell

    pyspark

    >>>5+9*2
    23
    >>> '201806120001'+'xiaoming'
    '201806120001xiaoming'
    >>> a='xiaoming'
    >>> a
    'xiaoming'
    >>> b='{} 2018001260 {}'.format(a,a) 
    >>> b
    'xiaoming 2018001260 xiaoming'
    >>> b.split()
    ['xiaoming', '2018001260', 'xiaoming']
    >>> exit()
    

     

    SparkContext

    >>> sc
    <pyspark.context.SparkContext object at 0x7f2bce403dd8>

     

    WordCount in one line of code

    >>> sc.textFile("file:///home/hadoop/my.txt").flatMap(lambda line: line.split(" ")).map(lambda word : (word,1)).reduceByKey(lambda x,y : x+y).saveAsTextFile("file:///home/hadoop/myout")
    
    

    sc.textFile(in_url).flatMap(lambda line: line.split(" ")).map(lambda word : (word.lower(),1)).reduceByKey(lambda a,b : a+b) 
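
    saveAsTextFile writes a directory of part-XXXXX files rather than a single file. A minimal sketch for checking the result from the same shell, assuming the file:///home/hadoop/myout output path used above:

    >>> # textFile reads every part file in the output directory back in
    >>> for pair in sc.textFile("file:///home/hadoop/myout").take(5):
    ...     print(pair)    # each line is the string form of a (word, count) tuple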

    A standalone program example

    # Save as /usr/local/spark/mycode/python/WordCount.py, then run it as shown below.
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster("local").setAppName("My App")
    sc = SparkContext(conf = conf)
    logFile = "file:///usr/local/spark/README.md"
    logData = sc.textFile(logFile, 2).cache()   # 2 partitions; cached because it is used twice
    numAs = logData.filter(lambda line: 'a' in line).count()
    numBs = logData.filter(lambda line: 'b' in line).count()
    print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))
    $ cd /usr/local/spark/mycode/python
    $ python3 WordCount.py

     6. Basic Python syntax

     http://dblab.xmu.edu.cn/blog/python/

    https://www.runoob.com/python3/python3-tutorial.html

     

    path='/home/hadoop/wc/f1.txt'
    with open(path) as f:
        text=f.read()
    words = text.split()
    wc={}
    for word in words:
        wc[word]=wc.get(word,0)+1
    wclist=list(wc.items())
    wclist.sort(key=lambda x:x[1],reverse=True)
    print(wclist)
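
    To show only the most frequent words (the Spark version later uses take(5) for the same purpose), slice the sorted list; a minimal sketch:

    # print the five most frequent words from wclist, which is already sorted by count
    for word, count in wclist[:5]:
        print(word, count)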

    7. Preparatory labs

    • Installing a Linux system

    http://dblab.xmu.edu.cn/blog/285/

    • Installing Ubuntu in VirtualBox on Windows

    http://dblab.xmu.edu.cn/blog/337-2/

    • Common Linux commands

    http://dblab.xmu.edu.cn/blog/1624-2/

    • Uploading files from Windows to Ubuntu with FTP software

    http://dblab.xmu.edu.cn/blog/1608-2/

    • Downloading and extracting installation files on Linux

    http://dblab.xmu.edu.cn/blog/1606-2/

    • Installing and using the vim editor on Linux

    http://dblab.xmu.edu.cn/blog/1607-2/

    • Installing and using Hadoop

    http://dblab.xmu.edu.cn/blog/install-hadoop/

     

    8. Developing Spark applications with PyCharm

    http://dblab.xmu.edu.cn/blog/2295/

     II. Spark RDD Programming

    Loading data from a local file

    pyspark
    
    >>> url="file:///home/hadoop/hive_hql25.txt"
    >>> lines=sc.textFile(url)
    >>> lines
    file:///home/hadoop/hive_hql25.txt MapPartitionsRDD[19] at textFile at NativeMethodAccessorImpl.java:0
    
    
    >>> lines.count()
    8
    >>> lines.first()
    'select count(*) from bigdata_user;'
    >>> lines.foreach(print)
    

      

     Loading data from HDFS


    hadoop@dblab-VirtualBox:~$ $JAVA_HOME
    bash: /usr/lib/jvm/default-java: Is a directory
    hadoop@dblab-VirtualBox:~$ java -version
    openjdk version "1.8.0_275"


    start-dfs.sh
    jps
    hdfs dfs -ls input

    >>> url = '/user/hadoop/input/1342-0.txt'
    >>> lines=sc.textFile(url)
    >>> lines.first()
    ''
    >>> lines.count()
    14594
    stop-dfs.sh

     RDD operation: filter

    # RDD built from the local file data

    >>> 'sel' in 'select count(*)'
    True
    >>> a=[1,2,3]
    >>> lambda i:a[i]*2
    <function <lambda> at 0x7f438e962620>
    >>> b=lambda i:a[i]*2
    >>> b
    <function <lambda> at 0x7f438c09e950>
    >>> b(2)
    6
    >>> lineSelect=lines.filter(lambda line:'select' in line)
    >>> lineSelect.count()
    4
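
    filter is also the usual way to drop blank lines before further processing; a small sketch on the same lines RDD:

    >>> # keep only non-empty lines
    >>> nonEmpty=lines.filter(lambda line: len(line.strip())>0)
    >>> nonEmpty.count()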

     

    RDD operation: map

    # build an RDD from a Python list and transform it with map
    >>> data=[19,20,21]
    >>> rdd1=sc.parallelize(data)
    >>> rdd1
    ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:475
    >>> rdd1.count()
    3
    >>> rdd1.foreach(print)
    19
    20
    21
    >>> rdd2=rdd1.map(lambda i:i+2000)
    >>> rdd2
    PythonRDD[4] at RDD at PythonRDD.scala:48
    >>> rdd2.foreach(print)
    2019
    2020
    2021


    RDD operation: reduceByKey

    >>> a=['a','b','a']
    >>> b=[1,1,1]
    >>> c=zip(a,b)
    >>> c
    <zip object at 0x7fdbdd7034c8>
    >>> d=sc.parallelize(c)
    >>> d.foreach(print)
    ('a', 1)
    ('b', 1)
    ('a', 1)
    >>> e=d.reduceByKey(lambda a,b:a+b)
    >>> e.foreach(print)
    ('a', 2)
    ('b', 1)
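
    For comparison, groupByKey followed by a per-key sum produces the same (key, total) pairs, though reduceByKey is normally preferred because it combines values inside each partition before shuffling; a sketch reusing d from above:

    >>> f=d.groupByKey().mapValues(sum)   # gather all values per key, then sum them
    >>> f.foreach(print)                  # same pairs as above: ('a', 2) and ('b', 1)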
    

     Word frequency count

    >>> in_url = 'file:///home/hadoop/my.txt'
    >>> # or read from HDFS instead: in_url='hdfs://localhost:9000/user/hadoop/my.txt'
    >>> lines=sc.textFile(in_url)
    >>> lines
    file:///home/hadoop/my.txt MapPartitionsRDD[50] at textFile at NativeMethodAccessorImpl.java:0
    >>> lines.first()
    'export SPARK_HOME=/usr/local/spark'
    >>> lines.count()
    >>> lines.collect()
    >>> words=lines.flatMap(lambda line:line.split()) 
    >>> words.collect()
    ['export', 'SPARK_HOME=/usr/local/spark', 'export', 'PYTHONPATH=$SPARK_HOME/python', 'export', 'PYSPARK_PYTHON=python3', 'export', 'PATH=$PATH:$SPARK_HOME/bin']
    >>> words=words.flatMap(lambda line:line.split('='))
    >>> words.collect()
    ['export', 'SPARK_HOME', '/usr/local/spark', 'export', 'PYTHONPATH', '$SPARK_HOME/python', 'export', 'PYSPARK_PYTHON', 'python3', 'export', 'PATH', '$PATH:$SPARK_HOME/bin']
    >>> words=words.flatMap(lambda line:line.split('/'))
    >>> words.collect()
    ['export', 'SPARK_HOME', '', 'usr', 'local', 'spark', 'export', 'PYTHONPATH', '$SPARK_HOME', 'python', 'export', 'PYSPARK_PYTHON', 'python3', 'export', 'PATH', '$PATH:$SPARK_HOME', 'bin']
    >>> word=words.map(lambda word:(word,1))
    >>> word.collect()
    [('export', 1), ('SPARK_HOME', 1), ('', 1), ('usr', 1), ('local', 1), ('spark', 1), ('export', 1), ('PYTHONPATH', 1), ('$SPARK_HOME', 1), ('python', 1), ('export', 1), ('PYSPARK_PYTHON', 1), ('python3', 1), ('export', 1), ('PATH', 1), ('$PATH:$SPARK_HOME', 1), ('bin', 1)]

    >>> wc=word.reduceByKey(lambda a,b:a+b)
    >>> wc.collect()
    [('', 1), ('python', 1), ('usr', 1), ('python3', 1), ('PATH', 1), ('PYTHONPATH', 1), ('bin', 1), ('export', 4), ('local', 1), ('spark', 1), ('SPARK_HOME', 1), ('$PATH:$SPARK_HOME', 1), ('PYSPARK_PYTHON', 1), ('$SPARK_HOME', 1)]

    >>> out_url='file:///home/hadoop/myout/0316'
    >>> wc.saveAsTextFile(out_url)

    >>>

    >>> out_url='myout'
    >>> wcsort.saveAsTextFile(out_url)
    >>> exit()

    hdfs dfs -ls

    hdfs dfs -ls myout

    hdfs dfs -cat myout/part-00000 | head -5

    >>> wc=sc.textFile(in_url).flatMap(lambda line: line.split(" ")).map(lambda word : (word.lower(),1)).reduceByKey(lambda a,b : a+b)

    >>> wc.count()

    >>> wc.collect()
    >>> wc.cache()


    >>> wcsort=wc.sortByKey()
    >>> wcsort.collect()


    >>> wcsort=wc.sortBy(lambda x:x[1],False)

    >>> wcsort.take(5)


    Student course scores example

    count

    url='file:///home/hadoop/chapter4-data01.txt'
    lines=sc.textFile(url)
    lines.take(3)
    
    name=lines.map(lambda line:line.split(',')).map(lambda line:(line[0],(line[1],line[2])))
    name.take(3)
    
    name.countByKey()   # records per key, i.e. courses per student; requires a (key, value) RDD
    
    name.countByValue()
    
    lines.map(lambda line:line.split(',')).take(5)
    lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).take(5)
    lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).countByValue()
    
    lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).take(5)
    lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).distinct().take(5)
    lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).distinct().count()
    

      

    groupByKey

    groupByName=lines.map(lambda line:line.split(',')).map(lambda line:(line[0],(line[1],line[2]))).groupByKey()
    groupByName
    
    groupByName.collect()[10]
    for i in groupByName.collect()[10][1]:
        print(i)
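
    With the (course, score) pairs grouped per student, a per-student average follows from mapValues; a minimal sketch reusing groupByName, assuming the scores in chapter4-data01.txt are integers (as the int() conversions elsewhere in these notes suggest):

    # average score per student; each value is an iterable of (course, score) pairs
    avgByName=groupByName.mapValues(lambda pairs: sum(int(s) for c,s in pairs)/len(pairs))
    avgByName.take(3)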


    reduceByKey
    course=lines.map(lambda line:line.split(',')).map(lambda line:(line[1],1))
    course.take(3)
    
    course.reduceByKey(lambda a,b:a+b).collect()
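
    To list the most popular courses first, sort the (course, count) pairs by the count; a small sketch:

    # courses ordered by enrollment, largest first
    course.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).take(3)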
    

    Tom

    tomRDD=lines.filter(lambda line: 'Tom' in line).map(lambda line: line.split(','))
    tomRDD.collect()

    tomRDD.sortBy(lambda x:int(x[2]),False).collect()   # convert the score to int so the sort is numeric, not lexicographic

    from numpy import mean
    tomList=lines.map(lambda line: line.split(',')).map(lambda line:(line[0],line[2])).lookup('Tom')
    mean([int(x) for x in tomList])
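
    numpy is only used here for mean(); plain Python gives the same number, as a quick sketch:

    # average of Tom's scores without numpy
    scores=[int(x) for x in tomList]
    print(sum(scores)/len(scores))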

     

    combineByKey: course, enrollment, average score

    course=lines.map(lambda line:line.split(',')).map(lambda line:(line[1],line[2]))
    course.first()

    courseC=course.combineByKey(
        lambda v: (int(v),1),                       # createCombiner: first score of a course -> (sum, count)
        lambda c,v: (c[0]+int(v), c[1]+1),          # mergeValue: fold another score into (sum, count)
        lambda c1,c2: (c1[0]+c2[0], c1[1]+c2[1]))   # mergeCombiners: merge partial (sum, count) pairs
    courseC.first()

    courseC.map(lambda x: (x[0], x[1][1], x[1][0]/x[1][1])).collect() 
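
    The visualization code below refers to an RDD named cs that these notes never define; a reasonable reading (an assumption) is the (course, enrollment, average) triples computed here, cached because they are collected several times:

    # assumed definition of cs used by the bar-chart examples below
    cs=courseC.map(lambda x: (x[0], x[1][1], x[1][0]/x[1][1])).cache()
    cs.take(3)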

     

    Visualization

    # list of (word, count) pairs
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster("local").setAppName("SparkReadme")
    sc = SparkContext(conf = conf)
    url='input/1342-0.txt'
    with open('/home/hadoop/stopwords.txt') as f:
        stops=f.read().split()
    wc = sc.textFile(url).flatMap(lambda line: line.lower().replace(',','').split()) \
           .filter(lambda word: word not in stops) \
           .filter(lambda word: len(word)>2) \
           .map(lambda word: (word,1)) \
           .reduceByKey(lambda a,b: a+b) \
           .sortBy(lambda x:x[1],False) \
           .take(100)

    # Word cloud:
    from pyecharts.charts import WordCloud
    mywordcloud = WordCloud()
    mywordcloud.add('', wc, shape='circle')
    mywordcloud.render()

    Bar chart

    from pyecharts.charts import Bar
    bar = Bar()
    bar.add_xaxis(cs.keys().collect())
    bar.add_yaxis('avg',cs.map(lambda x:x[2]).collect())
    bar.render()
    
    

    Bar chart with options

    from pyecharts.charts import Bar
    from pyecharts import options as opts
    from pyecharts.globals import ThemeType
    
    bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.PURPLE_PASSION))
    bar.add_xaxis(cs.keys().collect())
    bar.add_yaxis('rs',cs.map(lambda x:x[1]).collect())
    bar.add_yaxis('avg',cs.map(lambda x:x[2]).collect())
    
    bar.set_global_opts(title_opts=opts.TitleOpts(title="Courses", subtitle="Enrollment and average score"),
                        xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-15)),
                        yaxis_opts=opts.AxisOpts(max_=150))
    bar.render("cs_rs_avg.html")

    Options

    https://pyecharts.org/#/zh-cn/global_options

    Themes

    https://pyecharts.org/#/zh-cn/themes

    Gallery examples

    https://gallery.pyecharts.org/#/Bar/README

    III. Spark SQL

    IV. Comprehensive Practice

  • Original post: https://www.cnblogs.com/MissDu/p/14435081.html