zoukankan      html  css  js  c++  java
  • spark core (二)

    一、Spark-Shell交互式工具

         1、Spark-Shell交互式工具

            Spark-Shell提供了一种学习API的简单方式, 以及一个能够交互式分析数据的强大工具。

            Scala语言环境下或Python语言环境下均可使用。

            启动Spark-Shell
                    ./bin/spark-shell      本地模式,线程数为1(1个CPU)
                    ./bin/spark-shell --master
                    ./bin/spark-shell --master local[2] //使用2CPU核运行
                    ./bin/spark-shell --master local[2] --jar testcode.jar //指定Jar包路径

                       ./bin/spark-shell --master  spark://liuwei3:7077 

            Spark-Shell中已经创建了一个名为scSparkContext对象

            --master用来设置context将要连接并使用的资源主节点, master的值可以是Standalone模式 的Spark集群地址、 MesosYarn集群的URL, 或者是一个local地址。 

           使用--jar可以添加Jar包的路径, 使用逗号分隔可以添加多个包

           Spark-Shell的本质是在后台调用了spark-submit脚本来启动应用程序。

         2、Spark-Shell操作案例

             收集到数据后, 需要对数据进行各种数据处理,  包括数据的抽取-转换-装载( Extract-Transform-Load, ETL) 、 数据统计、 数据挖掘, 以 及为后续的数据呈现和为决策而提供的数据持久化。 

             本案例数据处理过程, 包含了对外部文件的加载, 对文件数据的转换、 过滤、 各种数据统计, 以及处理结果的存储。 

             加载文件 :

                      Spark创建sc之后, 可以加载本地文件创建RDD, 返回一个MapPartitionsRDD
                                val textFile=sc.textFile(“file:///*****/README.md”)
                       加载HDFS文件和本地文件都是使用textFile( ), 区别是添加前缀进行标识, hdfs://file:///
                       从本地读取的文件返回MapPartitionsRDD
                       HDFS读取的文件先转成HadoopRDD, 然后隐式转换成MapPartitonsRDD
                       MapPartitionsRDDHadoopRDD都是基于Spark的弹性分布式数据集RDD

           执行Transformation操作返回新RDD:
                       texfFile.first() //获取RDD文件的第一行
                       textFile.count() //获取RDD文件的行数
                       val textRDD=textFile.filter(line=>line.contains(“Spark”)) //过滤出包含Spark的行, 并返回新RDD
                       textFile.filter(line=>line.contains(“Spark”)).count() //链接多个TransformationAction操作

           找出文本中每行最多的单词数
                       textFile.map(line=>line.split(“ ”).size).reduce((a,b)=>if(a>b) a else b)
                       首先将textFile的每一行文本使用split(“ ”)进行分词, 并统计分词后的单词数, 然后执行reduce操作, 使 用(a,b)=>if(a>b) a else b 进行比较, 返回最大值 

           词频统计
                       val wordCount=textFile.flatMap(line=>line.split(“ ”)).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
                       wordCount.collect()
                       结合flatMapMapreduceByKey来计算文件中每个单词的词频, 并返回(String,Int)类型的 键值对ShuffleRDD, 最后使用collect()聚合单词计数结果。

                      如果想让代码更简洁, 可以使用占位符”_”
                       当每个参数在函数文本中最多出现一次的时候, 可以使用下划线_+_扩展成带两个参数的函数文本。
                       多个下划线指代多个参数, 而不是单个参数的重复使用。 第一个下划线代表第一个参数, 第二个下划线 代表第二个参数。
                       val wordCount=textFile.flatMap(_.split(“ ”)).map((_,1)).reduceByKey(_+_)
                       Spark默认是不进行排序的, 可以使用sortByKey按照Key进行排序, false为降序, true为升序。
                       val wordCount=textFile.flatMap(_.split(“ ”)).map((_,1)).reduceByKey(_+_).map(m=>(m._2,m._1)).sortByKey(false).map(m=>(m._2,m._1))
                       其中, m=>(m._2,m._1)实现keyvalue互换。
                       wordCount.saveAsTextFile(“hdfs://tgmaster:9000/out/wordcount”) //保存文件

          RDD缓存(内存持久化)
                       Spark支持将数据集存进内存缓存中, 当数据被反复访问时, 是非常有用的。
                       textFile.cache() //通过cache缓存数据可用于非常大的数据集, 支持跨越几十或几百个节点。    或者textFile.persist()     清除缓存:textFile.unpersist()
                       textFile.count()

        .collect()作用:将计算结果从集群中获取到本地内存来显示,容易发生OOM(内存溢出)所以collect只适合结果数据量较小的情况,如果计算结果数据量很大,此时要用foreach()输出

    2、操作案例
    //加载文件
    val lines=sc.textFile("file:///home/tg/datas/ws")
    val lines2=sc.textFile("hdfs://tgmaster:9000/in/ws")
    //统计文件的行数
    lines.count
    //查询文件第一行的数据
    lines.first
    //过滤出包含"spark"的行
    val result1=lines.filter(line=>line.contains("spark"))
    //包含"spark"关键词的行数
    result1.count
    //统计出每行最多的单词数
    lines.map(line=>line.split(" ").size)
    .reduce((x,y)=> if (x>y) x else y)
    
    //单词统计计数(WordCount),根据次数进行降序排列
    vla result=lines.flatMap(x=>x.split(" "))  //对单词进行分隔
    .map(x=>(x,1)) //将分隔后的单词进行统计,形成键值对(word,1)
    .reduceByKey((x,y)=>x+y) //对key进行分组,然后统计Value值
    ====方法二=======
    .sortBy(_._2,false)
    
    ====方法一=====
    .map(x=>(x._2,x._1)) //(次数,单词)
    .sortByKey(false)  //默认true升序,false降序
    .map(x=>(x._2,x._1))  //(单词,次数)
    
    //将计算结果从集群中获取到本地内存来显示,
    //容易发生OOM(内存溢出),所以collect只适合结果数据量较小的情况,
    //如果计算结果数据量很大,此时要用foreach()输出
    .collect  
    //将最终的单词统计结果保存到HDFS上
    result.saveAsTextFile("hdfs://tgmaster:9000/out/wsresult")
    
    3、RDD缓存(内存持久化)
    result.cache() //缓存到内存
    result.persist()//同上
    result.unpersist()//清除缓存

    二、SparkContext与部署模式

            1、 SparkContext
               SparkContextSpark上下文
               独立应用程序需要初始化一个SparkContext作为程序的一部分, 然后将一个包含应 用程序信息的SparkConf对象传递给SparkContext构造函数。
               创建SparkContext对象:
       

    scala方式:                

    def main(args: Array[String]): Unit = {
        //第一步:创建SparkConf对象
        val conf=new SparkConf().setAppName("WordCount")
                    .setMaster("local")
        //第二步:创建SparkContext对象
        val sc=new SparkContext(conf)
        //第三步:可以从linux或者HDFS中获取数据
        val lines=sc.textFile("hdfs://tgmaster:9000/in/resws")
        //第四大步:进行单词统计计数
        val result=lines.flatMap(_.split(" "))
                      .map((_,1)).reduceByKey(_+_)
                      .sortBy(_._2,false)
        //第五步:将计算结果保存到HDFS中
    //    result.saveAsTextFile("hdfs://liuwei1:9000/out/res2")
    
    //    result.collect()
        //通过foreach算子输出单词与次数
        result.foreach(x=>println(x._1+" "+x._2))
      }
    

    JAVA方式:

    setMaster()用于设置运行的master上的URL,如果程序在本地运行,需要设置为local或local[N](N>2),如果程序需要打包运行在集群中,那么在代码中就不需要设置setMaster()

    SparkContext,即为Spark上下文,它包含在Driver驱动程序中,Spark程序的运行离不开SparkContext

    Scala开发spark对应的是sparkcontext;Java开发spark对应的是Javasparkcontext

    创建sparkcontext对象,需要sparkConf对象作为参数 

    package day1;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    /**
     * 用Java开发Spark的WordCount
     * @author Administrator
     *
     */
    public class WordCount {
    	public static void main(String[] args) {
    		//第一步:创建SparkConf对象
    		SparkConf conf=new SparkConf().setAppName("WordCount")
    				.setMaster("local");
    		//第二步:创建JavaSparkContext对象
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		//第三步:从HDFS中加载数据,生成JavaRDD<String>    
    从HDFS等外部数据源创建程序中第一个RDD,即为初始RDD。
    创建初始RDD有两种方式:通过HDFS等外部数据源中创建;通过并行集合的方式创建(例:val ) JavaRDD<String> lines = sc.textFile("hdfs://tgmaster:9000/in/resws"); //第四步:分隔单词
    flatmap:先map,后flatten
    flatMap算子中,通常有一个String类型的参数,同时具有集合Iterable<T>类型的返回值 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { /** * */ private static final long serialVersionUID = 1L; public Iterable<String> call(String line) throws Exception { // TODO Auto-generated method stub return Arrays.asList(line.split(" ")); } }); //第五步:将分隔后的每个单词出现次数记录为1,形成键值对<单词,1>
    用JAVA开发Spark,如果需要通过映射的方式产生键值对,此时要用到mapToPair算子,这一点与scala开发spark不一样,在scala中只有map算子,没有maoToPair算子。
    mapToPair 算子中有一个String类型 JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word, 1); } }); //第六步:通过reduceByKey()算子统计每个单词出现的次数
    reduceBykey 这个算子很重要,它有两个Integer类型的参数,返回值是Integer
    它的运行过程是,先在本地按照key值进行聚合,然后再全局范围再按照key值进行聚合,他的性能比groupBykey强很多 JavaPairRDD<String, Integer> result = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer num1, Integer num2) throws Exception { // TODO Auto-generated method stub return num1+num2; } }); //第七步:通过foreach()算子输出最终结果 result.foreach(new VoidFunction<Tuple2<String,Integer>>() { /** * */ private static final long serialVersionUID = 1L; public void call(Tuple2<String, Integer> info) throws Exception { // TODO Auto-generated method stub System.out.println(info._1+"出现了"+info._2); } }); } }

       2、Spark部署模式

               查看本地电脑cpu是几核,通过任务管理器,性能,CPU使用记录有几个框就是几核

              Local本地模式: 运行于本地
                                 spark-shell --master local[2]
                                 local[2]是说, 执行Application需要用到CPU2个核

              Standalone独立模式: Spark自带的一种集群模式
                                 Spark自己管理集群资源, 此时只需要将HadoopHDFS启动,(不需要启动yarn)
                                 Master节点有master,Slave节点上有worker
                                 启动 ./bin/spark-shell --master spark://master:7077

              YARN模式
                                 Spark自己不管理资源, 向YARN申请资源, 所以必须保证YARN是启动起来的
                                   操作步骤:
                                     启动Hadoop:Hadoop目录执行start-all.sh
                                     Spark目录, ./bin/spark-shell --master yarn-clientyarn-cluster

                                                           yarn-client运行在测试环境   yarn-cluster运行在真实环境

             以上三种都是集群模式

             IDEA中生成Jar包, 使用IDEA编译class文件, 同时将class打包成Jar文件。
                      File——Project Structure, 弹性“Project Structure”的设置对话框
                      选择左边的Artifacts, 点击上方的“+”按钮
                      在弹出的对话框中选择“Jar”——“from moduls with dependencies”
                      选择要启动的类, 然后确定   (删除SparkApps.jar下的所有jar包)
                      应用之后选择菜单“Build”——“Build Artifacts”, 选择“Build”“Rebuild”即可生成

                           如果重新打包的话,删除src下的META-INF
             Spark提交Job:

                       ./spark-submit

                       --class newScala.Test

                      --master yarn-cluster  

                      /home/hadoop/Test.jar      (剩下的可以写,也可以不写)

     

          3、Spark部署模式—Master URL格式及说明  

               

                yarn-client运行在客户端本地,yarn-cluster运行在ApplicationMaster所在的集群节点中

    三、RDD弹性分布式数据集

            1、RDD弹性分布式数据集    (RDD既可以放在内存中,也可以放在磁盘中,优先放在内存中,内存放不下,再考虑放在磁盘中;分区分布在很多节点上;partition真正存放数据的地方,partition在内存中)(默认情况下,文件在HDFS上有几个block块,初始RDD就有几个分区)

                RDDSpark提供的核心抽象, 全称为Resillient Distributed Dataset, 即弹性分布式数 据集。

                RDD在抽象上来说是一种元素集合, 包含了数据。 它是被分区的, 分为多个分区。 每个分区分布在集群中的不同节点上, 从而让RDD中的数据可以被并行操作。 ( 分布 式数据集) 

                RDD通常通过Hadoop上的文件, 即HDFS文件或者Hive表, 来进行创建, 也可以通过 应用程序中的集合来创建。 

                RDD最重要的特性就是, 提供了容错性, 可以自动从节点失败中恢复过来。 如果某个 节点上的RDD partition, 因为节点故障, 导致数据丢了, 那么RDD会自动通过 数据来源重新计算该partition

                           Spark的容错机制:1、lineage(血统) 2、checkpoint(检查点)
                RDD的数据默认情况下存放在内存中的, 但是在内存资源不足时, Spark会自动将 RDD数据写入磁盘。 ( 弹性) 

             2、RDD的定义
                    一个RDD对象, 包含如下5个核心属性:
                      一个分区列表, 每个分区里是RDD的一部分数据( 或称数据块) ;(每个分区保持相对均衡,不是绝地均衡)
                      一个依赖列表, 存储依赖的其他RDD;(RDDA经过算子产生RDDB,RDDB依赖RDDA)
                      一个名为computer的计算函数, 用于计算RDD各分区的值;
                      分区器( 可选) , 用于键/值类型的RDD, 比如某个RDD是按散列来分区;
                      计算各分区时优先的位置列表( 可选) , 比如从HDFS上的文件生成RDD时, RDD分区的位置优先选择数据所在的节点, 这样可以避免数据移动带来的开销。 
                   
     前三个是必须 ,后两个是可选的

     四、创建RDD

            1、创建RDD

                 进行Spark核心编程时, 首先要做的第一件事, 就是创建一个初始的RDD
                       该RDD中, 通常包含了Spark应用程序的输入源数据。
                       在创建了初始的RDD之后, 才可以通过Spark Core提供的transformation算子, 对该RDD进 行转换获取其他的RDD。 

                  Spark Core提供了三种创建RDD的方式, 包括:
                           使用程序中的集合创建RDD
                           使用本地文件创建RDD或使用HDFS文件创建RDD
                           RDD通过算子形成新的RDD

                           (前两种创建初始RDD)

                  经验总结:
                         使用程序中的集合创建RDD, 主要用于进行测试, 可以在实际部署到集群运行之前, 自 己使用集合构造测试数据, 来测试后面的Spark应用的流程。
                         使用本地文件创建RDD, 主要用于临时性地处理一些存储了大量数据的文件。
                         使用HDFS文件创建RDD, 应该是最常用的生产环境处理方式, 主要可以针对HDFS上 存储的大数据, 进行离线批处理操作。 

             2、并行化集合创建RDD

                 如果要通过并行化集合来创建RDD, 需要针对程序中的集合, 调用SparkContextparallelize()方法。 Spark会将集合中的数据拷贝到集群上去, 形成一个分布式的数据集合, 也就是一个RDD
                 相当于集合中的一部分数据会到一个节点上, 而另一部分数据会到其他节点上, 然后就可以用并行的方 式来操作这个分布式数据集合, 即RDD。 

                        案例: 110累加求和
                               val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                               val rdd = sc.parallelize(arr)                
                               val sum = rdd.reduce(_ + _)

                 调用parallelize()时, 有一个重要的参数可以指定, 就是要将集合切分成多少个partition。 Spark会为每一个partition运行一个task来进行处理。
                 Spark默认会根据集群的情况来设置partition的数量, 但是也可以在调用parallelize()方法时, 传入第二个 参数, 来设置RDDpartition数量。 比如parallelize(arr, 10) 

              3、使用本地文件和HDFS创建RDD

                  Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的, 比如说 HDFS、 Cassandra、 HBase以及本地文件。 

                  通过调用SparkContexttextFile()方法, 可以针对本地文件或HDFS文件创建 RDD。 
                  注意:
                            SparktextFile()方法支持针对目录、 压缩文件以及通配符进行RDD创建。
                            Spark默认会为hdfs文件的每一个block创建一个partition, 但是也可以通过textFile() 的第二个参数手动设置分区数量, 只能比block数量多, 不能比block数量少。 

                  案例: 文件字数统计
                             val rdd = sc.textFile("data.txt")
                             val wordCount = rdd.map(line => line.length).reduce(_ + _)

                  SparktextFile()除了可以针对上述几种普通的文件创建RDD之外, 还有一些特列 的方法来创建RDD: 

                         SparkContext.wholeTextFiles()方法, 可以针对一个目录中的大量小文件, 返 回<filename, fileContent>组成的pair, 作为一个PairRDD, 而不是普通的RDD
                              普通的textFile()返回的RDD中, 每个元素就是文件中的一行文本。
                          SparkContext.sequenceFile[K, V]()方法, 可以针对SequenceFile创建RDD, KV泛型类型就是SequenceFilekeyvalue的类型。 KV要求必须是
                             Hadoop的序列化类型, 比如IntWritableText等。
                          SparkContext.hadoopRDD()方法, 对于Hadoop的自定义输入类型, 可以创建 RDD。 该方法接收JobConf、 InputFormatClass、 KeyValueClass
                          SparkContext.objectFile()方法, 可以针对之前调用RDD.saveAsObjectFile() 创建的对象序列化的文件, 反序列化文件中的数据, 并创建一个RDD。 
                 

    1、并行化集合创建RDD
    val array=Array(1,2,3,4,5,6)
    //没有指定rdd的分区,此时默认为程序所分配的资源的CPU核数
    val rdd=sc.parallelize(array)
    //指定rdd2中有3个分区
    val rdd2=sc.parallelize(array,3)
    2、从外部数据源创建RDD
    //此时没有指定分区,
    //那么分区数就等于文件在HDFS上存储的Block数。
    val lines=sc.textFile("hdfs路径")
    //指定了lines中有2个分区
    //设置的分区数,只能比Block多,不能比它少。
    val lines=sc.textFile("hdfs路径",2)
    3、通过Transformation(转换)类型的算子产生新的RDD
    val result=lines.flatMap(_.split(" "))
    					.map((_,1))
    					.reduceByKey(_+_)
    但是下面两个算子是Action(行动)类型。
    result.collect
    result.foreach(println) 

    五、RDD操作

              RDD操作:创建操作、 转换操作(transformation)、 行动操作 (Action)、 控制操作(缓存到内存,也称内存持久化)

              在spark中transformation类型的算子,系统只是记录下了这个操作行为,但这个行为并没有被执行。当程序遇到一个action类型的算子时,会触发Job的提交运行, 此时Action算子之前所有

    transformation类型的算子就会被执行。  (saveAsTextFile、reduce是Action,reduceByKey是transformation,之前的textfile map sortBy等等都是transformation)

               1、transformation和action介绍

                     Spark支持两种RDD操作: transformationaction
                     transformation操作会针对已有的RDD创建一个新的RDD

                     action则主要是对RDD进行最后的操作, 比如遍历、 reduce、 保存到文件等, 并可以返回结果给Driver程序。

                     例如:
                          map就是一种transformation操作, 它用于将已有RDD的每个元素传入一个自定义的函数, 并获取一个新的元素, 然后将所有的新元素组成一个新的RDD
                          reduce就是一种action操作, 它用于对RDD中的所有元素进行聚合操作, 并获取一个最终的结果, 然后返回给Driver 程序。 

                     transformation的特点就是lazy特性。

                     lazy特性指的是, 如果一个spark应用中只定义了transformation操作, transformation是不会触发Spark程序的执行的,它们只是记录了对RDD所做的操作, 但是不会自发的执行。 

                     只有当执行了一个action操作之后, 所有的transformation才会执行。
                     Spark通过这种lazy特性来进行底层的Spark应用执行的优化, 避免产生过多中间结果。
                     action操作执行, 会触发一个spark job的运行, 从而触发这个action之前所有的transformation的执行。
                     这是action的特性。

               2、案例:统计文件字数

                        // 这里通过textFile()方法, 针对外部文件创建了一个RDD lines, 但是实际上, 程序执行到这里为止, spark.txt文件的数据是不会加载到内存中的。 lines, 只是代表了一个指向

    spark.txt文件的引用。   val lines = sc.textFile("spark.txt") 

                         // 这里对lines RDD进行了map算子, 获取了一个转换后的lineLengths RDD, 但是这里连数据都没有, 当然
    也不会做任何操作。 lineLengths RDD也只是一个概念上的东西而已。

                            val lineLengths = lines.map(line => line.length)

                         // 之后, 执行了一个action操作, reduce。 此时就会触发之前所有transformation操作的执行, Spark会将操 作拆分成多个task到多个机器上并行执行, 每个task会在本地执行map操作,

    并且进行本地的reduce聚合。 最后会进行一个全局的reduce聚合, 然后将结果返回给Driver程序。 val totalLength = lineLengths.reduce(_ + _) 

                3、常用transformation介绍

                        

                       reduceByKey:现在本地进行聚合,再全局聚合,性能比 groupBykey好

                       groupByKey:全局聚合           

                  4、常用action介绍

                      

    Spark运行原理(初级版):
    DAG有向无环图——DAGScheduler,Stage阶段(stage划分算法)
    ——TaskSet——TaskScheduler,
    把task任务(task分配算法)发送到Work节点中的Executor去执行
    ---Executor接收到task之后,会从线程池中取出相应的线程去执行接收到的task任务

      

    补充:

             1、hadoop 1.x:

                    HDFS:namenode(1个) 、datanode(多个)

                    MapReduce:

                              JobTracker(1个):与用户通信,接收用户提交的Application;将Application划分为很多的任务(Task),分配多个TaskTracker去执行;管理调度集群中的资源

                              TaskTracker(多个):执行JobTracker分配的任务

                   hadoop1.x缺陷:单点故障、

                   hadoop 2.x:

                         HDFS:提出HA高可靠概念 (两个namenode,一个负责读,一个负责写)

                         MapReduce

                         Yarn:管理调度集群中的资源

  • 相关阅读:
    UVa-10317
    UVa-1595
    UVa-10391
    UVa-10763
    UVa-10935
    UVa-1594
    UVa-1593
    从CSDN搬过来了
    memset会显著增加时间和空间的消耗吗
    memset对数组的初始化
  • 原文地址:https://www.cnblogs.com/liuwei6/p/6590276.html
Copyright © 2011-2022 走看看