zoukankan      html  css  js  c++  java
  • spark01

    spark得课程体系

    sparkcore:spark得核心

    sparksql底层使用得是sparkcoresql解析为core阶段得任务,进行执行

    spark-streaming定时执行sparkcore阶段得任务

    spark得安装集群模式

    spark任务得提交  spark-submit提交一个jar得任务  spark-shell交互式命令行

    RDD 弹性分布式数据集(scala得本地集合进行分布式存储和计算)

    RDD得特性,产生得原因,使用得方式,创建得方式,rdd上面的方法

    算子:rdd上面得方法

    spark原理,怎么运行得再源码曾进行分析

    DAG有向无环图

    spark得高级特性 依赖关系(宽窄依赖) 缓存  持久化

    checkpoint持久化 累加器  广播变量  spark on yarn

    spark HA集群

     

    sparksql通过spark建立数据仓库进行sql查询分析

    sparkStreaming:定时得执行spark任务保证实时性

    需要一个消息中间件 kafka

    mongodb  redis

     

     

     

    spark的特点

    搭建spark的集群

    提交spark的任务

    spark的运行机制

    spark wordcount

    spark的官网spark.apache.org

    spark运行速度比较快:因为使用内存

    mr存在昂贵的shuffle

    mr 只有两个算子 (map  reduce)*N

    mr每次计算的时候中间结果落地到磁盘中

    spark 算子比较多

    不需要落地中间结果到磁盘上

    spark支持多种语言

    spark DAG有向无环图 可以先生成图,然后将图进行切分整理,然后按照图进行优化执行

    spark存在容错 mr不存在容错,spark是一次性运行完毕的

    spark的部署模式

    spark部署模式分为四种

    local 本地模式,开箱即用

    standalonespark自带的模式,这个模式是最常用的模式

    yarn:资源管理框架

    mesos:和yarn一样

    spark任务的提交

    spark-submit 提交的是jar

    spark-shell 交互式命令行

    spark-shell中含有spark-submit的内容。其实spark-shell提交任务是使用的spark-submit

    spark-submit的使用:

    spark-submit [options] <app jar | python file> [app arguments]

    --master 提交任务到哪个集群中

    --class 运行jar包中的哪个类

    --name 运行的任务再监控页面中可以看到任务的名字

    spark-submit --master spark://linux01:7077 --class org.apache.spark.examples.SparkPi /root/Downloads/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 10

    spark-submit --master local本地模式的单个cores local[N]本地模式的n个核数  local[*]

    executor执行其

    每个worker默认启动一个executor,每个executor默认使用1g内存所有的cores

    sparks-shell交互式命令行

    spark-shell --master spark://master02:7077 这个命令是再集群中开启一个长应用,但是不会运行任务,交互式命令行

    任务的监控页面是4040端口,任务运行的时候spark集群的上下文文件对象是sc实例,通过使用sc就可以和集群进行沟通会话

    spark版本的wordcount

    spark中读取文件的时候发现文件不存在

    因为spark是集群模式,sc.textFile(“”)读取文件的时候,每台机器都读取自身的文件

    为什么读取的时候不报错?

    读取的数据都是rddrdd上面的操作方法都叫做算子,算子分为两种 转换类的算子  行动类的算子

    将一个rdd转换为另一个rdd的算子就是转换类算子,是懒加载的

    行动类算子,一旦一个应用中遇见了行动类的算子,那么才会真正的执行

    transformation  action

    其实spark任务读取文件的时候都从hdfs中读取

    scala> sc.textFile("hdfs://master:9000/aaa.txt")

    res8: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/aaa.txt MapPartitionsRDD[8] at textFile at <console>:25

    scala> res8.flatMap(_.split(" "))

    res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:27

    scala> res9.map((_,1))

    res10: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:29

    scala> res10.groupBy(_._1)

    res11: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[12] at groupBy at <console>:31

    scala> res11.mapValues(_.size)

    res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at mapValues at <console>:33

    scala> res12.collect

    res13: Array[(String, Int)] = Array((tom,1), (hello,4), (jerry,1), (rose,2), (jack,2))

    一般的算子都是转换类算子

    collect将数据从多个rdd中收集起来

    saveAsTextFile(“”)将数据保存到一个文件介质中

    foreach(println)遍历每一个元素,进行处理

    idea中的wordcount

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setAppName("wc")
      conf.setMaster("local")
      val sc = new SparkContext(conf)
      val rdd1:RDD[String] =sc.textFile("hdfs://master:9000/aaa.txt")
      val rdd2:RDD[String] =rdd1.flatMap(_.split(" "))
      val rdd3:RDD[(String,Int)] = rdd2.map((_,1))
      val rdd4:RDD[(String,Iterable[(String,Int)])] = rdd3.groupBy(_._1)
      val rdd5:RDD[(String,Int)] = rdd4.mapValues(_.size)
      rdd5.saveAsTextFile("hdfs://master:9000/aaares")
    }

    运行内存不足的时候-Xmx512M增加运行内存

    spark-submit --master spark://linux01:7077 --class com.bw.spark.WordCount Spark1807-1.0-SNAPSHOT.jar hdfs://linux01:9000/aa/aa.txt hdfs://linux:9000/aaares1

    提交任务到集群中,如果已经有任务再运行了,那么就不存在足够的资源;需要等待资源分配

    再提交任务的时候查看spark的集群进程

    master02上面多了一个spark-submit的进程,这个进程主要是提交的进程,再什么位置提交任务就会多了一个spark-submit的进程

    CoarseGrainedExecutorBackEnd 简称为executor,每个worker都会生成一个executor的执行器

     

    Driver是一个应用的进程老大,管理所有的应用执行和切分以及分配,再standalone模式中driver就在client端,driver负责初始化工作,初始化sc,初始化很多运行时候需要的组件,再哪里提交的任务,哪里就是客户端,spark on yarn 这个driver就在集群中

    任务的资源分配

    worker的资源分配 一个worker再启动的时候,本台机器上的所有cores all memory-1G

    worker启动一个executor,一个executor占用的资源是多少?

    executor默认使用的cores是所有的核数  memory1G

    再运行spark-shell脚本的时候,运行spark-submit提交的任务,这个任务没有资源分配,所以一直处于等待状态

     

    这就是资源占用的提示

    自定义资源分配

    cores  memory

    all cores  and 1G  executor占用worker的资源分配

    spark-shell  spark-submit

    8cores  1G

    spark-shell  3 executor  4cores  512M

    spark-submit  3 executor  4cores  512M

    以上资源分配就可以形成两个应用并行运行

    --executor-cores executor再启动的时候使用的cores核数

    --executor-memory executor再启动的时候需要的内存

    --total-executor-cores 所有的executor再启动的时候需要的总的cores

    spark-shell --master spark://linux01:7077 --executor-cores 4 --executor-memory 512M

    spark-shell --master spark://linux01:7077 --executor-cores 4 --executor-memory 512M --total-executor-cores 12

  • 相关阅读:
    契约测试SpringCloud Contract入门
    CircuitBreaker 组件 resilience4j
    阿里开源的15个顶级Java项目
    将军令:数据安全平台建设实践
    ResNet
    设计模式
    muduo评测摘要
    muduo 学习
    RAII
    大数据框架
  • 原文地址:https://www.cnblogs.com/wxk161640207382/p/11309011.html
Copyright © 2011-2022 走看看