zoukankan      html  css  js  c++  java
  • Spark使用记录

    一 ,错误:

    spark报:too many open files 打开文件过多的意思

    ​ ulimit -a 查看打开的连接限制 ulimit -n 4096 扩大最大允许打开的文件数量设置为4096(临时的,重启后会还原)

    修改系统配置文件(重启生效):vim /etc/security//limits.conf
    末尾增加:root soft nofile 4096
    root gard nofile 4096

    二,spark英文解释:

    Scheduler delay : 程序调度延迟

    Executor Computing Time: 执行器计算时间

    Getting Result Time : 获取结果时间

    Task Deserialization Time : 任务反序列化时间

    Shuffle Write Time : 无序写入时间

    Shuffle Read Time : 无序读取时间

    Result Serialization Time : 结果序列化时间

    /opt/module/hadoop-2.7.2/start-balancer.sh : 数据平衡

    Locality Level :

    ​ 这几个值在图中代表task的计算节点和task的输入数据的节点位置关系

    PROCESS_LOCAL :数据在同一个JVM中,既在同一个executor上。

    NODE_LOCAL :数据在同一个节点上。比如数据在同一节点的另一个executor上;或者在HDFS上,恰好有block在同一个节点上,速度比PROCESS_LOCAL稍慢,因为数据需要在不同的进程之间传递或者从文件中读取。

    NO_LOCAL :数据从哪里访问都一样快,不需要位置优先。

    RACK_LOCAL :数据在同一机架的不同节点上,需要通过网络传输数据及文件IO,比NODE_LOCAL慢

    ANY :数据在非同一机架上的网络上,速度最慢

    三.Spark UI的端口号:

    ​ spark的默认端口号是4040,而被占用了就会顺序取+1的端口,当开了多个spark程序后,当端口加到了4045,在chrome浏览器里面就打不开了。会返回一个UNSAFE PORT 的错误信息,其实这是浏览器禁用了你访问的端口,程序实际是正常运行的。

    ​ 所以建议手工指定spark.ui.port=4046

    ​ 在spark-defaults.conf的配置文件中,若是4046被占用了,就从4046开始后+1,跳过4045这个端口,避免看不到spark ui界面的困扰。

    解决办法:在启动这个任务的时候带上 --conf spark.ui.port=4046

    . /etc/profile
    nohup /opt/module/spark/bin/spark-submit 
    --class com.lg.bigdata.streaming.KafkaAndJsonGJTS 
    --queue default 
    --master yarn   
    --deploy-mode client  
    --conf spark.ui.port=4046 
    /opt/module/spark/WordJzw5-jar-with-dependencies.jar > /opt/module/spark/log-KafkaAndJsonGJTS 2>&1

    http://hadoop102:4047/jobs/

    四.缓存:

    如何清除缓存
    1. 如果编写代码思路比较清晰的话,可以很清楚地记得哪个rdd或者dataSet进行了缓存的操作。可以直接调用 unpersist操作一个简单的例子如下:

    //  假设注册了一张teacher表,dataFrame读取了 这张表
    val data:DataFrame = spark.sql("SELECT * FROM teacher")
    // 程序开始进行缓存cache(默认存储到内存当中),然后调用action算子触发程序执行
    data.cache.show()
    // 调用下方代码,可以清除掉刚才得到的缓存
    data.unpersist()
    123456

      如果思路不够清晰,或者程序比较长,写着写着就忘记了哪些数据进行缓存过了,这里提供一个清除所有缓存在spark环境里面的数据的操作:如下所示:(写这篇水文的目的

      val ds: collection.Map[Int, RDD[_]] = spark.sparkContext.getPersistentRDDs
        ds.foreach(x => {
          x._2.unpersist()
        })
    1234

    执行的原理大致如下:spark.sparkContext的下文中通过调用getPersistentRDDs的方法,可以得到缓存区域里面所有的数据缓存信息。然后返回一个集合,通过循环遍历这个集合,调用unpersist的方法,便可以将这个缓冲区域里面的所有数据清空!

    常用命令:

    .getNumPartitions/.partitions.size: 查看分区数

    .partitioner: 查看RDD的分区器

    // 调用下方代码,可以清除掉刚才得到的缓存 data.unpersist()

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    url路由配置及渲染方式
    django类视图介绍与类视图装饰器
    什么是数据类型
    python代码的编写和运行
    python环境搭建
    python教程(目录)
    编程语言概念
    面向对象入门
    编程语言
    Tornado框架实现图形验证码功能
  • 原文地址:https://www.cnblogs.com/KdeS/p/14685108.html
Copyright © 2011-2022 走看看