zoukankan      html  css  js  c++  java
  • Spark2.0学习(二)--------RDD详解

    添加针对scala文件的编译插件

    ------------------------------

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.it18zhang</groupId>
    <artifactId>SparkDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    </configuration>
    </plugin>
    <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.2</version>
    <configuration>
    <recompileMode>incremental</recompileMode>
    </configuration>
    <executions>
    <execution>
    <goals>
    <goal>compile</goal>
    <goal>testCompile</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>

    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>
    </dependencies>
    </project>

    RDD:----------------
    是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点
    进行计算。可以包含任何java,scala,python和自定义类型。

    RDD是只读的记录分区集合。RDD具有容错机制。

    创建RDD方式,一、并行化一个现有集合。

    hadoop 花费90%时间用户rw。、

    内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。

    内部包含5个主要属性
    -----------------------
    1.分区列表
    2.针对每个split的计算函数。
    3.对其他rdd的依赖列表
    4.可选,如果是KeyValueRDD的话,可以带分区类。
    5.可选,首选块位置列表(hdfs block location);

    //默认并发度
    local.backend.defaultParallelism() = scheduler.conf.getInt("spark.default.parallelism", totalCores)
    taskScheduler.defaultParallelism = backend.defaultParallelism()
    sc.defaultParallelism =...; taskScheduler.defaultParallelism
    defaultMinPartitions = math.min(defaultParallelism, 2)
    sc.textFile(path,defaultMinPartitions) //1,2

    RDD变换

    ------------------
    返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

    map() //对每个元素进行变换,应用变换函数
    //(T)=>V


    filter() //过滤器,(T)=>Boolean
    flatMap() //压扁,T => TraversableOnce[U]

    mapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。
    //Iterator<T> => Iterator<U>

    mapPartitionsWithIndex(func) //同上,(Int, Iterator<T>) => Iterator<U>

    sample(withReplacement, fraction, seed) //采样返回采样的RDD子集。
    //withReplacement 元素是否可以多次采样.
    //fraction : 期望采样数量.[0,1]

    union() //类似于mysql union操作。
    //select * from persons where id < 10
    //union select * from id persons where id > 29 ;

    intersection //交集,提取两个rdd中都含有的元素。
    distinct([numTasks])) //去重,去除重复的元素。

    groupByKey() //(K,V) => (K,Iterable<V>)

    reduceByKey(*) //按key聚合。

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    //按照key进行聚合
    key:String U:Int = 0

    sortByKey //排序

    join(otherDataset, [numTasks]) //连接,(K,V).join(K,W) =>(K,(V,W))

    cogroup //协分组
    //(K,V).cogroup(K,W) =>(K,(Iterable<V>,Iterable<!-- <W> -->))
    cartesian(otherDataset) //笛卡尔积,RR[T] RDD[U] => RDD[(T,U)]

    pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
    coalesce(numPartitions) //减少分区
    repartition //可增可减
    repartitionAndSortWithinPartitions(partitioner)
    //再分区并在分区内进行排序


    RDD Action
    ------------------
    collect() //收集rdd元素形成数组.
    count() //统计rdd元素的个数
    reduce() //聚合,返回一个值。
    first //取出第一个元素take(1)
    take //
    takeSample (withReplacement,num, [seed])
    takeOrdered(n, [ordering])

    saveAsTextFile(path) //保存到文件
    saveAsSequenceFile(path) //保存成序列文件

    saveAsObjectFile(path) (Java and Scala)

    countByKey() //按照key,统计每个key下value的个数.

    spark集成hadoop ha
    -------------------------
    1.复制core-site.xml + hdfs-site.xml到spark/conf目录下
    2.分发文件到spark所有work节点
    3.启动spark集群
    4.启动spark-shell,连接spark集群上
    $>spark-shell --master spark://s201:7077
    $scala>sc.textFile("hdfs://mycluster/user/centos/test.txt").collect();

  • 相关阅读:
    Python——str(字符串)内部功能介绍
    Python网络编程——设定并获取默认的套接字超时时间
    Python网络编程——主机字节序和网络字节序之间的相互转换
    Python网络编程——通过指定的端口和协议找到服务名
    python网络编程——将IPv4地址转换成不同的格式
    Python网络编程——获取远程设备的IP地址
    Python网络编程——设备名和IPv4地址
    Python+Flask+MysqL的web建设技术过程
    管理信息系统 第三部分 作业
    模型分离(选做)
  • 原文地址:https://www.cnblogs.com/tree1123/p/10103505.html
Copyright © 2011-2022 走看看