zoukankan      html  css  js  c++  java
  • PySpark笔记

    spark源码位置:https://github.com/apache/spark

    Spark Core核心RDD及编程

    什么是RDD:
    1.是一个抽象类不能直接使用,在子类中实现抽象方法
    2.带泛型的,可以支持多种类型:例如可以传入string,person,user
    3.弹性分布式数据集,一个不可变的,可并行操作的元素分区集合

    RDD都有五个主要特性:
    1.-分区列表:一个RDD由多个分区(partition)构成
    2.-计算每个分区(partition)的函数
    3.-依赖于其他rdd的列表
    4.-可选的,键值RDDs的分区器(例如,RDD是哈希分区的)
    5.-可选的,计算每个分区的最佳位置列表(例如,位置块是一个HDFS文件)

    面试常考:hashset和hashmap怎么实现的?涉及hashcode方法

    Spark程序必须做的第一件事是创建一个SparkContext对象,
    它告诉Spark如何访问集群。要创建SparkContext前需要先创建一个SparkConf对象。
    Spark功能的主要入口点。SparkContext表示与Spark群集的连接,通过SparkContext来创建RDD和广播变量到该集群上

    RDD常用算子编程详解
    RDD有3类操作:Transformations算子,Action算子,Cache操作(缓存操作)

    Spark中的所有转换都是惰性(lazy)的,不会立即计算结果。只记住应用于基本数据集(例如文件)的转换。只有当这个操作需要将结果返回到驱动程序时才会计算转换。这种设计使Spark能够更有效地运行。例如,我们可以意识到通过map创建的数据集将在reduce中使用,并且只将reduce的结果返回给驱动程序,而不是更大的映射数据集

    指定PySpark使用的python版本:在环境变量中添加python版本,例如:export PYSPARK_PYTHON=python3.6

    Spark运行模式

    在Spark中支持4中运行模式:
    1.local模式:开发时使用,运行基本功能,然后提交到YARN生产上。
    2.standalone模式:是Spark自带的,如果一个集群是standalone模式,需要在多台机器上同时部署Spark环境(不经常用)
    3.YARN模式:建议生产使用YARN模式,统一使用YARN进行整个集群作业(MR/Spark)的资源调度
    4.Mesos模式:国内用的不多

    不管用什么模式,Spark应用代码是一样的,只需提交时通过--master参数来指定运行模式

    启动Spark:在$SPARK_HOME/bin/目录下执行./spark-shell --master local[2] '因为spark需要访问hive驱动'

    local运行模式:

    $SPARK_HOME/bin目录中的Spark -submit脚本用于在集群上启动应用程序。它可以通过统一的接口使用Spark支持的所有集群管理器,因此您不必专门为每个集群管理器配置应用程序

    local模式: 开发时经常使用
    --master
    --name
    --py-files

    执行方式:在$SPARK_HOME/bin/目录下./spark-submit --master local[2] --name spark-local /home/hadoop/script/spark0402.py(代码文件位置)

    file:///home/hadoop/data/hello.txt(输入) file:///home/hadoop/wc/output(输出)

    local模式:./spark-submit --master local[2] --name spark-local /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

    yarn模式:./spark-submit --master yarn --name spark-local /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

    standalone运行模式:

    1.环境配置需要在$SPARK_HOME/conf目录下将slaves.template,拷贝为slaves后并添加主机名。

    2.要在$SPARK_HOME/conf目录下的spark-env.sh中添加$JAVA_HOME和$PYTHON_HOME安装路径,否则会报错

    3.启动spark集群:$SPARK_HOME/sbin/./start-all.sh

    4.检测启动spark集群是否成功:输入命令jps:有Master和Worker 2个进程,就说明standalone模式安装成功

    5.Web界面检测spark集群是否启动成功:http://hadoop000:8080,另外:spark://hadoop000:7077是spark在提交作业时指定的端口。

    6.举例使用standalone模式,提交作业:$SPARK_HOME/bin目录下./pyspark --master spark://hadoop000:7077

    7.standalone模式spark-submit提交运行:./spark-submit --master spark://hadoop000:7077 --name spark-standalone /home/hadoop/script/spark_test.py file:///home/hadoop/data/hello.txt file:///home/hadoop/wc/output

    注意:1.如果使用standalone模式。2.节点个数大于1。3.使用本地文件进行测试,必须要保证每个节点上都有本地测试文件,因为是分布式运行会找不到测试文件。

    8.针对于第7点可以使用hdfs进行测试:./spark-submit --master spark://hadoop000:7077 --name spark-standalone /home/hadoop/script/spark_test.py hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/wc/output

    9.yarn模式:mapreduce运行在yarn上,spark on yarn(相同于用spark作为客户端,spark需要做的事情是提交作业到yarn上运行),该模式要启动HDFS和YARN。

    yarn模式和standalone模式有什么区别?

    yarn模式只需要一个节点,然后提交作业即可。不需要spark集群(不需要启动master和worker进程),如果是standalone模式,spark集群上的每个节点都需要部署spark,然后启动spark集群(需要master和worker进程)

    注意:需要指定hadoop_conf_dir或者yarn_conf_dir的配置文件路径。在$SPARK_HOME/conf目录下的spark-env.sh添加$HADOOP_HOME/etc/hadoop配置文件路径。

    例如:./spark-submit --master yarn --name spark-yarn /home/hadoop/script/spark_test.py hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/wc/output

    10.yarn支持client和cluster模式,二者的区别?

    主要是driver运行在什么地方,如果运行在client上,提交作业的进程不能停止否则作业就挂死了,如果运行在cluster上,提交完成作业后提交作业端就可以断开,因为driver运行在AM里面。

    yarn默认运行在client上,如果要运行在cluster上

    运行在client上:./pyspark --master yarn-client

    pyspark/spark-shell是交互式运行程序,交互式运行程序启动以后可以输入代码,只能运行在client里面

    如何查看已经运行完的yarn的日志信息?

    $SPARK_HOME/bin目录下执行yarn logs -applicationId <applicationID>日志名称,日志位置在界面hadoop:8088中的applicationID

    spark核心概念:https://spark.apache.org/docs/latest/cluster-overview.html-----》Glossary
    词汇表

    应用:建立在Spark上的用户程序。由集群上的驱动程序和执行程序组成
    应用jar:包含用户的Spark应用程序的jar。在某些情况下,用户希望创建一个“uber jar”,其中包含他们的应用程序及其依赖项。用户的jar不包含Hadoop或Spark库,但是,这些库将在运行时添加
    驱动程序:运行应用程序的main()函数并创建SparkContext的进程
    集群管理器:用于获取集群上的资源的外部服务(例如,独立管理器、Mesos、YARN)
    部署模式:区分驱动程序进程在何处运行。在“集群”模式下,框架在集群内部启动驱动程序。在“客户端”模式下,提交者在集群外部启动驱动程序
    工作节点:可以在集群中运行应用程序代码的任何节点
    执行者:为工作节点上的应用程序启动的进程,它运行任务并将数据保存在它们之间的内存或磁盘存储器中。每个应用程序都有自己的执行器
    任务:将被发送给一个执行的工作单元
    工作:由多个任务组成的并行计算,这些任务在Spark操作时生成(例如保存、收集);在驱动程序日志中看到这个术语
    阶段:每个作业被划分为更小的任务子集,这些任务子集彼此依赖(类似于MapReduce中的map和reduce阶段);在驱动程序日志中看到这个术语

    Spark和Hadoop重要概念区分:

    Hadoop概念
    1.一个MR程序=一个Job
    2.一个Job=1个或N个Task(map/reduce)
    3.1个task对应一个进程
    4.task运行时开启进程,task执行完毕后销毁进程,对应多个task来说开销比较大的(即使你能通过JVM共享)
    spark概念
    1.一个应用=Driver(main方法中创建SparkContext)+Executors
    2.一个应用=0个到多个Job
    3.一个Job=一个Acton
    4.一个Job=1到N个Stage
    5.一个Stage=1到N个task
    6.一个task对应一个线程,多个task可以以并行的方式运行在一个Excutors进程中

    Spark Cache(缓存):
    cache是一个lazy,遇到action才会执行
    cache好处是:如果一个RDD在后续的计算中可能被使用到建议使用cache。缓存是通过BlockManager来完成

    persist()或cache()区别:
    cache底层调用的是persist方法,传入的参数是:StorageLevel.MEMORY_ONLY,所以,cache=persist,cache没有参数

    Spark窄依赖和宽依赖:窄依赖定义:一个父RDD的分区(partition)最多被子RDD的某个分区(partition)使用一次,流水线作业。包含的算子有:filter map flatmap mapPartitions
    宽依赖定义:一个父RDD的分区(partition)会被子RDD的分区(partition)使用多次,有shuffle。包含的算子有:reduceByKey grupByKey combineByKey

    historyserver:
    $SPARK_HOME/conf/spark-default.conf
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://namenode:端口号

    Flume是一个分布式日志采集传输系统:
    event(事件)是Flume的基本数据单位由消息头和消息体组成。Flume运行的核心是Agent

    Spark SQL,PySpark不能操作DataSet

    Hive:on MapReducce
    SQL翻译成MapReducce提交到Hadoop Cluster运行

    metastores相当于一个表的元数据信息

    spark核心版本出现核心功能,例如:spark1.6出现Dataset,1.3出现DataFrames,Dateset不支持python原因是python是动态语言
    DataFrames:是以列(列名,列类型,列值)的形式构成分布式数据集

    面试题:RDD与DataFrame的区别:

    Spark Streaming:
    是spark streaming是spark API的扩展,streaming是什么?IO操作有输入有输出。输入:数据源头(例如:Kafka、Flume、Kinesis或TCP套接字等多个源获取);输出:数据的流向(例如:文件系统、数据库)。

    问:安装完spark后能否直接使用spark streaming?答:不需要搭建完spark后可直接使用

    常用实时流处理框架

    spark streaming:不是真正的实时流处理框架,而是一个mini bach操作,使用spark栈一站式批处理解决问题。底层以批处理为主,以流处理为辅。不需要搭建集群
    storm:真正的实时流处理 Tuple 用java写的需要搭建集群
    flink:与spark streaming相反
    kafka storm:国内用的很少

    SparkStreaming执行原理:Spark流接收实时输入数据流,并将数据流拆分为很小的数据批次,然后数据批次由Spark引擎处理,生成最终的批次结果流。

    sparkCore的核心抽象是RDD,对应的5大特性,对应源码中的5个方法是什么?
    sparkstreaming的核心抽象是DStream,DStream是连续的数据流

    面试点:sparkcore的存储策略和sparkstreaming的存储策略区别?

    Azkaban(工作流调度器):

    官网地址:https://azkaban.readthedocs.io/en/latest/getStarted.html#getting-started-with-the-solo-server
    Spark SQL/Hadoop用于做离线统计处理
    有一个典型的ETL操作:该操作步骤
    1.数据抽取:可以使用Sqoop把RDBMS关系型数据库中的数据抽取到Hadoop,如果数据是在其他平台以文本的方式存储可以使用Flume进行日志,文本数据的采集到Hadoop
    2.数据处理,可以采用Hive/MapReducce/Spark/...不同的框架实现
    3.统计结果入库:
    a)数据存放到HDFS(Hive/Spark SQL表/文件)上,启动一个server:在Hive里面叫Server2,在Spark里面叫ThriftServer,通过JDBC方式操作统计结果。
    b)使用sqoop框架把结果导出RDBMS中

    简单的任务调度:直接使用linux的crontab来定义,crontab+shell,优点是简单,易用。缺点是维护繁琐,依赖关系强

    Azkaban架构包括3个核心组件:
    关系型数据库(MySQL),AzkabanWeb服务(server),Azkaban执行服务(ExccutorServer)

    Azkaban运行模式:

    有2种模式:solo-server模式,分布式多执行器模式

    Azkaban源码编译:
    1.去github上下载源码包
    2../gradlew build installDist
    3.先下载gradle-4.6-all.zip,然后整合到azkaban源码中,避免在编译过程中去网络下载,将该文件拷贝到/azkaban-3.57.0/gradle/wrapper目录下,然后修改gradle-wrapper.properties配置文件distributionUrl=gradle-4.6-all.zip即可。

    4.编译成功后,去对应的目录下找对应模式的安装包即可

    5.Azkaban solo server环境部署及快速入门:

    启动azkaban-solo-server在bin目录下执行start-solo.sh,通过jps查看有AzkabanSingleServer

    HDFS作业在Azkaban中的使用

    使用hadoop fs -mkdir /azkban创建文件夹

    创建一个 vi hdfs.job

    内容:

    type=command

    command=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop fs -ls /

    打成zip包:zip -r hdfs.zip hdfs.job,然后在azkaban进行提交

    Hive作业在Azkaban中的使用

    启动hive后创建表

    加载数据到hive表:load data local inpath '/home/hadoop/data/emp.txt' overwrite into table emp

    创建一个 vi hive.job

    内容:

    type=command

    command=/home/hadoop/app/hive-1.1.0-cdh5.7.0/bin/hive -f ‘test.sql’

    Spark Local模式搭建 :在bin目录下执行spark-shell --master local[2],后会有scala命令提示符
    Spark Standalone模式环境搭建:1个maste+n个worker在spark-env.sh的配置
    1.SPARK_MASTER_HOST=主机名
    2.SPARK_WORKER_CORES=2,一个从节点或计算节点分2个core给spark使用    ##WorkNode分出几核给spark使用
    3.SPARK_WORKER_MEMORY=2g,一共2个core和2个内存   ##WorkNode分出多少内存给spark使用
    4.SPARK_WORKER_INSTANCES=1,worker当中启动几个实例,启动个实例     ##WorkNode使用几个spark实例,一般一个就行了


    spark SQL
    thriftserver&beeline的使用
    1.启动thriftserver后在主机名+端口号,hadoop000:10000 可以看到多出来SQL好JDBC/ODBC Server,默认端口是10000可以修改
    2.启动beeline,通过beeline -u jdbc:hive2://localhost:10000 -n(是当前机器的用户名)
    3.修改thriftserver端口
    ./start-thriftserver.sh --master local[2] --jar ~/software/mysql-connector-java-5.1.27-bin.jar --hiveconf .server2.thrift.port=14000

    spark-shell

    在hive中的数据表如何通过sparksql进行访问?
    需要启动spark-shell进行访问
    spark-shell master local[2]
    启动后在scala命令提示符输入:spark.sql("show tables").show,没有进行配置不会存在数据,如果要访问hive里面的数据需要2项配置
    1.hive-site.xml的配置,需要将hive目录下conf文件夹下的hive-site.xml复制到spark安装目录下的conf文件夹下,会出错将msql的jar包传到classpath就可以
    2../spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar
    会出现版本检测在hive-site.xml添加验证为false

    spark-sql执行方式:./spark-sql --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar,语法和sql一样,访问hadoop000:4040和spark-shell一样

    thriftserver和sparksql,spark-shell有什么区别
    1.spark-shell或sparksql都是一个spark application,每次都需要重新申请资源
    2.thriftserver不管启动多少个客户端(beeline/core)只要是连到一个server上永远是一个spark application。同时解决 数据共享数据的问题。

    通过JDBC的方式访问
    1.Driver
    2.getConnection
    3.执行sql语句

    注意事项:在啊使用jdbc开发是一定要先启动thriftserver 否则会报连接异常的错

    DataFrame在SparkCore里面是以RDD方式进行编程的,在SparkSQL里面是以DataFrame或DataSet方式进行编程

    schema是什么?数据表的结构信息

    A Dataset is a distributed collection of data:数据集是数据的分布式集合
    A DataFrame is a Dataset organized into named columns:DataFrame以列的形式构成的分布式数据集,按照列赋予不同的名称,在概念上等价于关系数据库中的表或R/Python中的数据帧


    DataFrame和RDD对比
    RDD如果用java或scala语言开发底层运行在jvm上,如果是python上有python的运行环境
    DataFrame无论是用java/scala/python开发都会转成logic plan

    DataFrame与RDD互操作
    1.使用反射推断模式:

    前提是事先知道的字段,字段类型等信息

    2.以编程方式指定模式:

    如果反射不能满足编程要求(事先不知道列等信息)

    sparkSQL的外部数据源API如何操作hive表里面的数据
    1.读取数据spark.table(tableName)
    2.将数据写回去df.write.saveAsTable(tablName)

  • 相关阅读:
    Zend_Controller架构
    PHP构造函数的执行顺序
    MySQL性能优化的最佳21条经验
    MySQL触发器学习总结
    使用Zend_Auth和Zend_Acl进行登录认证及根据用户角色进行权限控制
    手动释放你的资源(Please release resources manually)
    InfoPath/SharePoint/WebParts项目组章程 无为而为
    解决错误:sql_variant is incompatible with xml (ASP.NET 2.0 / XML数据类型 ) 无为而为
    使用ISA2004发布SharePoint网站到外部网,需要使用链接转换 无为而为
    InfoPath/SharePoint/WebParts项目组 下一步的工作和团队未来的规划给队员的公开信 无为而为
  • 原文地址:https://www.cnblogs.com/fenghuoliancheng/p/10430246.html
Copyright © 2011-2022 走看看