zoukankan      html  css  js  c++  java
  • spark 基础

    scala版 ,基本名词概念及 rdd的基本创建及使用

    var conf = new SparkConf()

    var sc: SparkContext = new SparkContext(conf)

    val rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3) 

               

    # sc.parallelize(,3)  将数据并行加载到三台机器上

    var tmpRDDA1 = rawRDDA.flatMap(line=>line.split(" "))

    var tmpRDDA2 = tmpRDDA1.filter(allWord=>{allWord.contains("aa") || allWord.contains("bb")})

    var tmpRDDA3 = tmpRDDA2.map(word=>(word,1))

    import org.apache.spark.HashPartitioner

    var tmpRDDA4 = tmpRDDA.partitionBy(new HashPartitioner(2)).groupByKey()

    #partitionBy(new HashPartitioner(2)).groupByKey  将之前的3台机器Shuffle成两台机器

    var tmpResultRDDA = tmpRDDA4.map((P:(String,Iterable[Int]))=>(P._1,P._2.sum))

    #对相同的key的value进行求和

    Partition :某机上一个固定数据块 , 一系列相关Partition组合为一个RDD  。

                    如tmpRDDA2拥有3个Partition ,而 tmpResultRDDA拥有两个Partition

    RDD :数据统一操作所在地, 代码中任意一个操作(如faltMap,filter,map), RDD内的所有Partition都会执行

              如在rawRDDA->tmpRDDA1时 ,执行flatMap(line=>line.split(" ")),则rawRDD 的三个Partition (分别为 cslave0上的“!! bb ## cc”,

              cslave1上的“-- cc bb $$”和cslave2上的“cc ^^ ++ aa”都要执行flatMap操作)

    RDD 是数据并行化所在地 ,隶属于某RDD的所有Partition都要执行相同操作,当这些Partition存在于不同机器,就会由不同机器同时执 

            行,也就是并行执行

    RDD并行化范式主要有Map和Shuffle

            Map 范式 :只对本Partition上的数据进行操作, 操作的数据对象不跨越多个Partition,即不跨越网络 。

            Shuffle范式 : 对不同Partition上的数据进行重组,其操作的数据对象跨越多个甚至是所有Partition ,即跨越网络

    场景 :多输入源

    两个原始文件rawFile1 和 rawFile2,要求将rawFile1的内容均匀加载到cslave3,cslave4上,接着对rawFile1进行数据去重,

    要求将rawFile2加载到cslave5,然后将rawFile1的处理结果中 去掉rawFile2中所含的条目 

    var conf = new SparkConf()

    var sc: SparkContext = new SparkContext(conf)

    var rawRDDB = sc.parallelize(List(("xx",99),("yy",88),("xx",99),("zz",99)),2)

    var rawRDDC = sc.parallelize(List(("yy",88)),1)

    var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)

    subtract()就是两个RDD相减,而这两个RDD来自不同的输入文件

           

    场景:复杂情况

    初始化多个rdd,相互取并集或差集

    多输入源,去重,装换,再合并

    var conf = new SparkConf()

    var sc:SparkContext = new SparkContext(conf)

    var rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3)

    var rawRDDB = sc.paralleliz(List(("xx,99),("yy",88),("xx",99),("zz",99)),2)

    var rawRDDC = sc.parallelize(List(("yy",88)),1)

    import org.apache.spark.HashPartitioner

    var tmpResultRDDA = rawRDDA.flatMap(line=>line.split(" ")).filter(allWord=>{allWord.contains("aa")||allWord.contains("bb")}).map(word=>(word,1)).partitionBy(new HashPartitioner(2)).groupByKey().map((P:String,Iterable[Int]))=>(P._1,P._2.sum))

    var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)

    var resultRDDABC = tmpResultRDDA.union(tmpResultRDDBC)

    resultRDDABC.saveAsTextFile("HDFS路径")

    map范式作用于RDD时,不会改变前后两个RDD内Partition数量, 当partitionBy,union作用于RDD时,会改变前后两个RDD内Partition数量

    RDD持久化到HDFS时,RDD对应一个文件夹,属于该RDD的每个Partition对应一个独立文件

    RDD之间的中间数据不存入本地磁盘或HDFS

    RDD的多个操作可以用点‘.’连接,如 RDD1.map().filter().groupBy()

    RDD可以对指定的某个Partition进行操作,而不更改其他Partition

    Spark-app执行流程:
    1.用户调用RDD API接口,编写rdd转换应用代码

    2.使用spark提交job到Master

    3.Master收到job,通知各个Worker启动Executor

    4.各个Executor向Driver注册 (用户编写的代码和提交任务的客户端统一称Driver)

    5.RDD Graph将用户的RDD串组织成DAG-RDD

    6.DAGSchedule 以Shuffle为原则(即遇Shuffle就拆分)将DAG-RDD拆分成一系列StageDAG-RDD(StageDAG-RDD0->StageDAG-RDD1->StageDAG-rdd2->...)

    7.RDD通过访问NameNode,将DataNode上的数据块装入RDD的Partition

    8.TaskSchedule将StageDAG-RDD0发往隶属于本RDD的所有Partition执行,在Partition执行过程中,Partition上的Executor优先执行本Partition.

    9.TaskSchedule将StageDAG-RDD1发往隶属于本RDD(已改变)的所有Partition执行

    10.重复上面8,9步的步骤,直至执行完所有Stage-DAG-RDD

    资源隔离性

    每个执行的Spark-APP都有自己一系列的Executor进程(分布在不同的机器上或内核上),这些Executor会协作完成该任务。

    单个Executor会以多线程复用方式运行该Spark-APP分配来的所有Task .

    一个Executor只属于一个Spark-APP,一个Spark-APP可以有多个Executor

    这与MapReduce不同。  比如某个由Map->Reduce1->Reduce2构成的ML-App,有十个Slave同时执行该任务,从某一个slave机器上来看,

    MapReduce框架执行时会启动Map进程,Reduce1进程,Reduce2进程,三个进程顺序执行该任务

    而Spark则使用一个Executor进程完成这四个操作。

    spark-APP本身感知不到集群的存在

  • 相关阅读:
    elasticsearch ——id字段说明,内部是_uid
    企业安全建设之搭建开源SIEM平台(上)
    江西鹰潭、江西移动与华为战略合作:共推物联网——物联网的世界要到来了
    Luke 5—— 可视化 Lucene 索引查看工具,可以查看ES的索引
    Apache Flink vs Apache Spark——感觉二者是互相抄袭啊 看谁的好就抄过来 Flink支持在runtime中的有环数据流,这样表示机器学习算法更有效而且更有效率
    转:shell比较两个字符串是否相等
    UNIX 缩写风格
    转:.Net程序员学习Linux最简单的方法
    asp.net插入sql server 中文乱码问题解决方案
    asp.net将object或string转为int
  • 原文地址:https://www.cnblogs.com/Ting-light/p/11103989.html
Copyright © 2011-2022 走看看