zoukankan      html  css  js  c++  java
  • 【DataMagic】如何在万亿级别规模的数据量上使用Spark

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

    本文首发在云+社区,未经许可,不得转载。

    作者:张国鹏 | 腾讯 运营开发工程师

    一、前言

    Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思路。文章内容为介绍Spark在DataMagic平台扮演的角色、如何快速掌握Spark以及DataMagic平台是如何使用好Spark的。

    二、Spark在DataMagic平台中的角色

    图 2-1

    整套架构的主要功能为日志接入、查询(实时和离线)、计算。离线计算平台主要负责计算这一部分,系统的存储用的是COS(公司内部存储),而非HDFS。

    下面将主要介绍Spark on Yarn这一架构,抽取出来即图2-2所示,可以看到Spark on yarn的运行流程。

    图2-2
     

    三、如何快速掌握Spark

    对于理解Spark,我觉得掌握下面4个步骤就可以了。

    1.理解Spark术语

    对于入门,学习Spark可以通过其架构图,快速了解其关键术语,掌握了关键术语,对Spark基本上就有认识了,分别是结构术语Shuffle、Patitions、MapReduce、Driver、Application Master、Container、Resource Manager、Node Manager等。API编程术语关键RDD、DataFrame,结构术语用于了解其运行原理,API术语用于使用过程中编写代码,掌握了这些术语以及背后的知识,你就也知道Spark的运行原理和如何编程了。

    2.掌握关键配置

    Spark在运行的时候,很多运行信息是通过配置文件读取的,一般在spark-defaults.conf,要把Spark使用好,需要掌握一些关键配置,例如跟运行内存相关的,spark.yarn.executor.memoryOverhead、spark.executor.memory,跟超时相关的spark.network.timeout等等,Spark很多信息都可以通过配置进行更改,因此对于配置需要有一定的掌握。但是使用配置时,也要根据不同的场景,这个举个例子,例如spark.speculation配置,这个配置主要目的是推测执行,当worker1执行慢的情况下,Spark会启动一个worker2,跟worker1执行相同的任务,谁先执行完就用谁的结果,从而加快计算速度,这个特性在一般计算任务来说是非常好的,但是如果是执行一个出库到Mysql的任务时,同时有两个一样的worker,则会导致Mysql的数据重复。因此我们在使用配置时,一定要理解清楚,直接google spark conf就会列出很多配置了。

    3.使用好Spark的并行

    我们之所以使用Spark进行计算,原因就是因为它计算快,但是它快的原因很大在于它的并行度,掌握Spark是如何提供并行服务的,从而是我们更好的提高并行度。

    对于提高并行度,对于RDD,需要从几个方面入手,1、配置num-executor。2、配置executor-cores。3、配置spark.default.parallelism。三者之间的关系一般为spark.default.parallelism=num-executors*executor-cores的2~3倍较为合适。对于Spark-sql,则设置spark.sql.shuffle.partitions、num-executor和executor-cores。

    4.学会如何修改Spark代码

    新手而言,特别是需要对Spark进行优化或者修改时,感到很迷茫,其实我们可以首先聚焦于局部,而Spark确实也是模块化的,不需要觉得Spark复杂并且难以理解,我将从修改Spark代码的某一角度来进行分析。

    首先,Spark的目录结构如图3-1所示,可以通过文件夹,快速知道sql、graphx等代码所在位置,而Spark的运行环境主要由jar包支撑,如图3-2所示,这里截取部分jar包,实际上远比这多,所有的jar包都可以通过Spark的源代码进行编译,当需要修改某个功能时,仅需要找到相应jar包的代码,修改之后,编译该jar包,然后进行替换就行了。

    图3-1
    图3-2

    而对于编译源代码这块,其实也非常简单,安装好maven、scala等相关依赖,下载源代码进行编译即可,掌握修改源码技巧对于使用好开源项目十分重要。

    四、DataMagic平台中的Spark

    Spark在DataMagic中使用,也是在边使用边探索的过程,在这过程中,列举了其比较重要的特点。

    1.快速部署

    在计算中,计算任务的数量以及数据的量级每天都会发生变化,因此对于Spark平台,需要有快速部署的特性,在实体机上,有一键部署脚本,只要运行一个脚本,则可以马上上线一个拥有128G内存、48cores的实体机,但是实体机通常需要申请报备才能获得,因此还会有docker来支持计算资源。

    2.巧用配置优化计算

    Spark大多数属性都是通过配置来实现的,因此可以通过配置动态修改Spark的运行行为,这里举个例子,例如通过配置自动调整exector的数量。

    2.1 在nodeManager的yarn-site.xml添加配置

        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,spark_shuffle</value>
       </property>
       <property>
          <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
          <value>org.apache.spark.network.yarn.YarnShuffleService</value>
       </property>

    2.2 将spark-2.2.0-yarn-shuffle.jar文件拷贝到hadoop-yarn/lib目录下(即yarn的库目录)

    2.3 在Spark的spark-default.xml添加配置

    spark.dynamicAllocation.minExecutors 1 #最小Executor数
    spark.dynamicAllocation.maxExecutors 100 #最大Executor数

    通过这种配置,可以达到自动调整exector的目的。

    3.合理分配资源

    作为一个平台,其计算任务肯定不是固定的,有的数据量多,有的数据量少,因此需要合理分配资源,例如有些千万、亿级别的数据,分配20核计算资源就足够了。但是有些数据量级达到百亿的,就需要分配更多的计算资源了。参考第三章节的第3点。

    4.贴合业务需求

    计算的目的其实就是为了服务业务,业务的需求也理应是平台的追求,当业务产生合理需求时,平台方也应该尽量去满足。如为了支持业务高并发、高实时性查询的需求下,Spark在数据出库方式上,支持了Cmongo的出库方式。

    sc = SparkContext(conf=conf)
    
    sqlContext = SQLContext(sc)
    
    database = d = dict((l.split('=') for l in dbparameter.split()))
    
    parquetFile = sqlContext.read.parquet(file_name)
    
    parquetFile.registerTempTable(tempTable)
    
    result = sqlContext.sql(sparksql)
    
    url = "mongodb://"+database['user']+":"+database['password']+"@"+database['host']+":"+database['port']    result.write.format("com.mongodb.spark.sql").mode('overwrite').options(uri=url,database=database['dbname'],collection=pg_table_name).save()

    5.适用场景

    Spark作为通用的计算平台,在普通的应用的场景下,一般而言是不需要额外修改的,但是DataMagic平台上,我们需要“在前行中改变”。这里举个简单的场景,在日志分析中,日志的量级达到千亿/日的级别,当底层日志的某些字段出现utf-8编码都解析不了的时候,在Spark任务中进行计算会发生异常,然后失败,然而如果在数据落地之前对乱码数据进行过滤,则有可能会影响数据采集的效率,因此最终决定在Spark计算过程中解决中这个问题,因此在Spark计算时,对数据进行转换的代码处加上异常判断来解决该问题。

    6.Job问题定位

    Spark在计算任务失败时候,需要去定位失败原因,当Job失败是,可以通过yarn logs -applicationId application 来合并任务log,打开log,定位到Traceback,一般可以找到失败原因。一般而言,失败可以分成几类。

    a. 代码问题,写的Sql有语法问题,或者Spark代码有问题。

    b. Spark问题,旧Spark版本处理NULL值等。

    c. 任务长时间Running状态,则可能是数据倾斜问题。

    d. 任务内存越界问题。

    7.集群管理

    Spark集群在日常使用中,也是需要运营维护的,从而运营维护,发现其存在的问题,不断的对集群进行优化,这里从以下几个方面进行介绍,通过运营手段来保障集群的健壮性和稳定性,保证任务顺利执行。

    a. 定时查看是否有lost node和unhealthy node,可以通过脚本来定时设置告警,若存在,则需要进行定位处理。

    b. 定时扫描hdfs的运行log是否满了,需要定时删除过期log。

    c. 定时扫描集群资源是否满足计算任务使用,能够提前部署资源。

    五、总结

    本文主要是通过作者在搭建使用计算平台的过程中,写出对于Spark的理解,并且介绍了Spark在当前的DataMagic是如何使用的,当前平台已经用于架平离线分析,每天计算分析的数据量已经达到千亿~万亿级别。

    问答

    如何解决Apache Spark中的依赖关系问题?

    相关阅读

    技术分享 | Spark RDD详解

    Apache Spark快速入门

    spark streaming知识总结[优化]

     

    此文已由作者授权腾讯云+社区发布,原文链接:https://cloud.tencent.com/developer/article/1092587?fromSource=waitui

  • 相关阅读:
    EBS SQL > Form & Report
    oracle sql 优化分析点
    MRP 物料需求计划
    MRPII 制造资源计划
    Barcode128 应用实务
    Oracle SQL语句优化技术分析
    APPSQLAP10710 Online accounting could not be created. AP Invoice 无法创建会计分录
    Oracle数据完整性和锁机制
    ORACLE Responsibility Menu Reference to Other User
    EBS 常用 SQL
  • 原文地址:https://www.cnblogs.com/qcloud1001/p/8873093.html
Copyright © 2011-2022 走看看