zoukankan      html  css  js  c++  java
  • 学习笔记--Spark

    参考来源:http://www.yiibai.com/spark/

    概述
    Apache Spark是一个集群计算设计的快速计算。它是建立在Hadoop MapReduce之上,它扩展了 MapReduce 模式,有效地使用更多类型的计算,其中包括交互式查询和流处理。Spark的主要特征是其内存集群计算,增加的应用程序的处理速度。

    三种部署方法:

    • 单机版 − Spark独立部署是指Spark占据在HDFS之上(Hadoop分布式文件系统)并将空间分配给HDFS。在这里,Spark和MapReduce将并列覆盖所有Spark的作业集群。
    • Hadoop Yarn − Hadoop Yarn部署方式,简单地说,spark运行在Yarn没有任何必要预安装或使用root访问权限。它有助于Spark融入Hadoop生态系统和Hadoop堆栈。它允许在其它部件叠上层的顶部上运行。
    • Spark 在MapReduce (SIMR) − Spark在MapReduce的用于启动spark作业,除了独立部署。通过SIMR,用户可以启动Spark和使用Shell,而不需要任何管理权限。

    Spark RDD
    弹性分布式数据集(RDD)是Spark的基本数据结构。它是对象的不可变的分布式集合。在RDD中每个数据集被划分成逻辑分区,这可能是在群集中的不同节点上计算的。RDDS可以包含任何类型,如:Python,Java,或者Scala的对象,包括用户定义的类。

    安装
    按顺序安装Java、Scala、Spark

    Spark核心编程
    创建简单RDD
    Spark容器会自动创建Spark 上下文对象名为sc

    $ spark-shell
    scala> val inputfile = sc.textFile(“input.txt”)
    

    RDD转换

    S.No 转换&含义
    1 map(func) 返回一个新的分布式数据集,传递源的每个元素形成通过一个函数 func
    2 filter(func) 返回由选择在func返回true,源元素组成了一个新的数据集
    3 flatMap(func) 类似映射,但每个输入项目可以被映射到0以上输出项目(所以func应返回seq而不是单一的项目)
    4 mapPartitions(func) 类似映射,只不过是单独的每个分区(块)上运行RDD,因此 func 的类型必须是Iterator ⇒ Iterator 对类型T在RDD上运行时
    5 mapPartitionsWithIndex(func) 类似映射分区,而且还提供func 来表示分区的索引的整数值,因此 func 必须是类型 (Int, Iterator) ⇒ Iterator 当类型T在RDD上运行时
    6 sample(withReplacement, fraction, seed) 采样数据的一小部分,有或没有更换,利用给定的随机数发生器的种子
    7 union(otherDataset) 返回一个新的数据集,其中包含源数据和参数元素的结合
    8 intersection(otherDataset) 返回包含在源数据和参数元素的新RDD交集
    9 distinct([numTasks]) 返回一个新的数据集包含源数据集的不同元素
    10 groupByKey([numTasks]) 当调用(K,V)数据集,返回(K, Iterable) 对数据集
    11 reduceByKey(func, [numTasks])
    12 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    13 sortByKey([ascending], [numTasks])
    14 join(otherDataset, [numTasks])
    15 cogroup(otherDataset, [numTasks])
    16 cartesian(otherDataset) 当上调用类型T和U的数据集,返回(T,U)对数据集(所有元素对)
    17 pipe(command, [envVars]) RDD通过shell命令每个分区,例如:一个Perl或bash脚本。RDD元素被写入到进程的标准输入和线路输出,标准输出形式返回一个字符串RDD
    18 coalesce(numPartitions) 减少RDD到numPartitions分区的数量。过滤大型数据集后,更高效地运行的操作
    19 repartition(numPartitions) 打乱RDD数据随机创造更多或更少的分区,并在它们之间平衡。这总是打乱的所有数据在网络上
    20 repartitionAndSortWithinPartitions(partitioner) 根据给定的分区重新分区RDD及在每个结果分区,排序键记录。这是调用重新分配排序在每个分区内,因为它可以推动分拣向下进入混洗机制效率更高。

    动作

    S.No 操作 & 含义
    1 reduce(func) 合计数据集的元素,使用函数 func (其中有两个参数和返回一行). 该函数应该是可交换和可结合,以便它可以正确地在并行计算。
    2 collect() 返回数据集的所有作为数组在驱动程序的元素。这是一个过滤器或其它操作之后返回数据的一个足够小的子集,通常是有用的
    3 count() 返回该数据集的元素数
    4 first() 返回的数据集的第一个元素(类似于使用(1))
    5 take(n) 返回与该数据集的前n个元素的阵列。
    6 takeSample (withReplacement,num, [seed]) 返回数组的数据集num个元素,有或没有更换随机抽样,预指定的随机数发生器的种子可选
    7 takeOrdered(n, [ordering]) 返回RDD使用或者按其自然顺序或自定义比较的前第n个元素
    8 saveAsTextFile(path) 写入数据集是一个文本文件中的元素(或一组文本文件),在给定的目录的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。Spark调用每个元素的 toString,将其转换为文件中的文本行
    9 saveAsSequenceFile(path) (Java and Scala) 写入数据集,为Hadoop SequenceFile元素在给定的路径写入在本地文件系统,HDFS或任何其他Hadoop支持的文件系统。 这是适用于实现Hadoop可写接口RDDS的键 - 值对。在Scala中,它也可以在属于隐式转换为可写(Spark包括转换为基本类型,如 Int, Double, String 等等)类型。
    10 saveAsObjectFile(path) (Java and Scala) 写入数据集的内容使用Java序列化为一个简单的格式,然后可以使用SparkContext.objectFile()加载。
    11 countByKey() 仅适用于RDDS的类型 (K, V). 返回(K, Int)对与每个键的次数的一个HashMap。
    12 foreach(func) 数据集的每个元素上运行函数func。这通常对于不良反应,例如更新累加器或与外部存储系统进行交互进行。

    示例程序

    //打开Spark-Shell
    $ spark-shell 
    //创建一个RDD
    scala> val inputfile = sc.textFile("input.txt")
    //执行字数转换
    scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
    //当前RDD
    scala> counts.toDebugString
    //缓存转换
    scala> counts.cache()
    //应用动作
    scala> counts.saveAsTextFile("output")
    
    

    Spark部署
    Spark应用程序使用spark-submit(shell命令)来部署在集群中的Spark应用程序
    示例:
    SparkWordCount.scala

    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext._ 
    import org.apache.spark._  
    
    object SparkWordCount { 
      def main(args: Array[String]) { 
    
          val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
            
          /* local = master URL; Word Count = application name; */  
          /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
          /* Map = variables to work nodes */ 
          /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
          val input = sc.textFile("in.txt") 
          /* Transform the inputRDD into countRDD */ 
            
          valcount = input.flatMap(line ⇒ line.split(" ")) 
          .map(word ⇒ (word, 1)) 
          .reduceByKey(_ + _) 
          
          /* saveAsTextFile method is an action that effects on the RDD */  
          count.saveAsTextFile("outfile") 
          System.out.println("OK"); 
      } 
    }  
    

    步骤:
    1、下载Spark Ja
    下载spark-core_2.10-1.3.0.jar
    2、编译程序

    $ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala 
    

    3、创建JAR

    jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
    

    4、提交spark应用

    spark-submit --class SparkWordCount --master local wordcount.jar 
    
  • 相关阅读:
    ios xib或storyBoard的那些小方法
    ios pod库更新到1.0或1.0.1之正确修改podfile文件
    ios UILabel在storyBoard或xib中如何在每行文字不显示完就换行
    ios NSThred多线程简单使用
    Xcode升级插件失效,与添加插件不小心点击Skip Bundle解决办法
    ios app打ipa包
    极光推送碰到的问题
    ios 更新约束
    ios 缺少合规证明
    Path Sum II
  • 原文地址:https://www.cnblogs.com/kioluo/p/8824788.html
Copyright © 2011-2022 走看看