zoukankan      html  css  js  c++  java
  • spark(2)

    1.spark模块

    --------------------------------------

      (1)Spark Core      //核心库

      (2)Spark SQL      //核心库

      (3)Spark Streaming    //准实时计算

      (4)Spark MLlib      //机器学习库

      (5)Spark graph      //图计算

    2.Spark集群的运行

    ------------------------------------------

      1.local          //本地模式

      2.standalone    //独立模式

      3.yarn         //yarn模式

      4.mesos      //mesqs

    3.start-all.sh    //spark集群的启动命令

    ----------------------------------------------

      start-master.sh   //RPC端口  7077

      start-slaves.sh  spark://s201:7077

    4.webui端口

    -------------------------------------------------

      http://s201:8080

      本地模式下:4040

    5.SparkContext:

    ---------------------------------------------------------

      到spark集群的连接。主要入口点都从这个地方来进

    SparkConf conf = new SparkConf();
    conf.setAppName("WordCountJava");
    conf.setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //RDD===>Spark的核心类 R:Resilient D:distributed dataset弹性分布式数据集
    JavaRDD<String> rdd1 = sc.textFile("d:/scala//hello.txt");
    //压扁,按空格进行切割,对这一行进行切割
    val rdd2 = rdd1.flatMap(line=>line.split(" "));
    val rdd3 = rdd2.map(word=>(word,1));
    val rdd4 = rdd3.reduceByKey(_ + _);
    val list = rdd4.collect();
    list.foreach(e=>print(e));    //通过高阶函数来进行循环

    spark

    ----------------------------------------

      基于hadoop的mr,对hadoop模型扩展,高效实用MR。包括交互式查询和流计算,内存型集群计算,提高app处理速度

    spark特点:

    ------------------------------------

      (1)速度,在内存中存储中间结果

      (2)支持多种语言

      (3)内置了80多种高级算子

      (4)高级分析:MR,SQL Streamming / mllib /graph

    spark模块

    -----------------------------------------  

      (1)core    //通用执行引擎,提供内存计算和对外部数据集的引用

      (2)SQL    //构建核心core模块之上,引入新的抽象SchemaRDD,提供了结构化支持和半结构化支持

      (3)Streaming  //小批量流计算。RDD弹性分布式数据集

      (4)MLlib    //机器学习库

    RDD

    -------------------------------------------------

       是spark的基本数据结构,是不可变数据集。在RDD中的每个数据集都被分成逻辑分区,分区之后就可以在集群的不同节点上进行计算,每个分区可以在集群节点上进行计算。可以包含任何java类型、scala类型、python类型以及自定义类型。RDD是只读的分区记录。RDD具有容错机制。

       创建RDD的方式:(1)并行化一个现有的集合。hadoop花费90%的时间用于读写操作。

                 内存处理计算,在job间进行数据共享。内存的IO速度高于网络和disk的10倍到100倍之间。

                  spark使用分布式内存来存储中间结果,然后将这些结果存储在磁盘上

                  RDD内部包含5个主要的属性:

                  -----------------------------------------------------

                    (1)分区列表

                    (2)针对每个切片的计算函数

                    (3)对其他rdd的依赖列表

                    (4)可选,如果是KeyValueRDD的话还可以带一个分区类

                    (5)可选,首选块位置列表(hdfs block location)

        

    RDD变换
    ------------------
    返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

    map() //对每个元素进行变换,应用变换函数
    //(T)=>V


    filter() //过滤器,(T)=>Boolean
    flatMap() //压扁,T => TraversableOnce[U]

    mapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。
    //Iterator<T> => Iterator<U>

    mapPartitionsWithIndex(func) //同上,(Int, Iterator<T>) => Iterator<U>

    sample(withReplacement, fraction, seed) //采样返回采样的RDD子集。
    //withReplacement 元素是否可以多次采样.
    //fraction : 期望采样数量.[0,1]

    union() //类似于mysql union操作。
    //select * from persons where id < 10
    //union select * from id persons where id > 29 ;

    intersection //交集,提取两个rdd中都含有的元素。
    distinct([numTasks])) //去重,去除重复的元素。

    groupByKey() //(K,V) => (K,Iterable<V>)

    reduceByKey(*) //按key聚合。

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    //按照key进行聚合
    key:String U:Int = 0

    sortByKey //排序

    join(otherDataset, [numTasks]) //连接,(K,V).join(K,W) =>(K,(V,W))

    cogroup //协分组
    //(K,V).cogroup(K,W) =>(K,(Iterable<V>,Iterable<!-- <W> -->))
    cartesian(otherDataset) //笛卡尔积,RR[T] RDD[U] => RDD[(T,U)]

    pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
    coalesce(numPartitions) //减少分区
    repartition //可增可减
    repartitionAndSortWithinPartitions(partitioner)
    //再分区并在分区内进行排序


    RDD Action
    ------------------
    collect() //收集rdd元素形成数组.
    count() //统计rdd元素的个数
    reduce() //聚合,返回一个值。
    first //取出第一个元素take(1)
    take //
    takeSample (withReplacement,num, [seed])
    takeOrdered(n, [ordering])

    saveAsTextFile(path) //保存到文件
    saveAsSequenceFile(path) //保存成序列文件

    saveAsObjectFile(path) (Java and Scala)

    countByKey() //按照key,统计每个key下value的个数.

    spark集成hadoop ha
    -------------------------
    1.复制core-site.xml + hdfs-site.xml到spark/conf目录下
    2.分发文件到spark所有work节点
    3.启动spark集群
    4.启动spark-shell,连接spark集群上
    $>spark-shell --master spark://s201:7077
    $scala>sc.textFile("hdfs://mycluster/user/centos/test.txt").collect();

  • 相关阅读:
    python-Lists
    python报错ValueError: invalid x escape
    ssm框架自动生成实体类、Dao、*Map.xml
    echarts3地图实现点击某省市区域跳转到相应页面
    echart3绘制地图
    sts maven项目单元测试报错initializationerror not found
    jQuery datatable后台分页实现
    解决bootstrapValidator校验冲突
    JAVA基础复习.DAY1
    【日记】1.18
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9705397.html
Copyright © 2011-2022 走看看