zoukankan      html  css  js  c++  java
  • Spark 学习(一) 初识

    一,Spark概述

      1.1 什么是Spark

      1.2 Spark的产生背景

      1.3 Spark的特点

    二,Spark集群安装

      2.1 集群部署

      2.2 Spark下载

      2.3 Spark配置

      2.4 启动

    三,执行Spark程序

      3.1 执行第一个Spark程序

      3.2 启动Spark shell

      3.3 在IDEA中进行Spark程序编写WordCount程序

     

    正文

    一,Spark概述

      1.1 什么是Spark 

      Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。

      1.2 Spark的产生背景

      在分布式计算中的中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的,考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果。下图是Hadoop和Spark的对比:

      

      Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

      1.3 Spark的特点

      :与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。

      易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

      通用:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

        兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工。 

    二,Spark集群安装

      2.1 集群部署

      注意:根据Sparker安装JDK,一般情况是安装JDK1.8以上,我这里安装的是JDK1.8

    // 准备3台机器,如下角色分配
    
    hd1---->Master Worker
    hd2---->Worker
    hd3---->Worker

      2.2 Spark下载

      

      下面直接提供一个下载地址,点击下载即可。

      点击进行下载下载spark

      下载完毕,上传到服务器,解压到指定目录即可,如下:

      

      2.3 Spark配置

      下载完毕后启动前,需要进行配置文件的配置。

      有两个文件需要我们修改,如下所示:

      

      将这两个文件拷贝一份,名称分别分别为slaves和spark-env.sh。如下图:

      

      配置spark-env.sh:

      添加如下配置:

    # jdk安装目录
    export JAVA_HOME=/usr/local/hadoop/jdk/jdk1.8.0_201
    # master运行host
    export SPARK_MASTER_IP=hd1
    # master运行端口
    export SPARK_MASTER_PORT=7077

      如下实例:

      

      配置slaves:添加那些主机运行worker

      

      然后将整个安装目录,复制到其他主机。

    scp -r spark-2.3.3 hd2:$PWD
    scp -r spark-2.3.3 hd3:$PWD

      2.4 启动

      进入spark的sbin目录,运气start-all.sh启动spark,如下:

      

      hd1上有如下进程:

      

      hd2,hd3上有如下进程:

      

      上述进程就启动成功。当然可以通过web页面访问,访问hd1的8080端口即可,如下所示:

      

    三,执行Spark程序

      3.1 执行第一个Spark程序

      spark内有一个内部的测试程序,可以用来测试,如下:

      测试命令:

    ./spark-submit   # spark 的提交命令,在spark安装的bin目录下面
    --class org.apache.spark.examples.SparkPi # 启动类
    --master spark://hd1:7077  # 指定spark
    /usr/local/hadoop/spark/spark-2.3.3/examples/jars/spark-examples_2.11-2.3.3.jar 1000  # 启动jar包和执行次数

      执行过程中在Worker节点会多一个进程:如下所示

      

      而提及任务的还会多一个SparkSubmit进程:

      

      在Web中查看运行状态:

      

      3.2 启动Spark shell

      spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

      启动命令:

    ./spark-shell --master spark://hd1:7077 --executor-memory 1g --total-executor-cores 2
    
    参数说明:
    --master spark://node1.edu360.cn:7077 指定Master的地址
    --executor-memory 2g 指定每个worker可用内存为2G
    --total-executor-cores 2 指定整个集群使用的cup核数为2个

      启动后进入如下界面:

      

      注意:

      如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

      Spark shell中编写WordCount程序:

      Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

      使用前,先启动hdfs,进行文件读取。

      命令:

    sc.textFile("hdfs://hd1:9000/wordcount/input/").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, false).collect

      如下实例:

      

      可以将输出的文件保存到hdfs:

      命令:

     sc.textFile("hdfs://hd1:9000/wordcount/input/").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hd1:9000/wordcount/sparkOut")

      命令说明:

    说明:
    sc是SparkContext对象,该对象时提交spark程序的入口
    textFile(hdfs://hd1:9000/wordcount/input/)是hdfs中读取数据
    flatMap(_.split(" "))先map在压平
    map((_,1))将单词和1构成元组
    reduceByKey(_+_)按照key进行reduce,并将value累加
    saveAsTextFile("hdfs://hd1:9000/wordcount/sparkOut")将结果写入到hdfs中

      3.3 在IDEA中进行Spark程序编写WordCount程序

       maven工程依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.edu360.spark</groupId>
        <artifactId>SparkTest</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.2.0</spark.version>
            <hadoop.version>2.7.3</hadoop.version>
            <encoding>UTF-8</encoding>
        </properties>
    
        <dependencies>
            <!-- 导入scala的依赖 -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <!-- 导入spark的依赖 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!-- 指定hadoop-client API的版本 -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!-- 编译scala的插件 -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <!-- 编译java的插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.5.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
    
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

      用scala编写:

    package cn.edu360.spark
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ScalaWordCount {
    
      def main(args: Array[String]): Unit = {
        //创建spark配置,设置应用程序名字
        val conf = new SparkConf().setAppName("ScalaWordCount")
        // 下面是本地调试模式,无需打jar包
        // val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[4]")
        //创建spark执行的入口
        val sc = new SparkContext(conf)
        //指定以后从哪里读取数据创建RDD(弹性分布式数据集)
        //sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1))
    
        val lines: RDD[String] = sc.textFile(args(0))
        //切分压平
        val words: RDD[String] = lines.flatMap(_.split(" "))
        //将单词和一组合
        val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
        //按key进行聚合
        val reduced:RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
        //排序
        val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
        //将结果保存到HDFS中
        sorted.saveAsTextFile(args(1))
        //释放资源
        sc.stop()
      }
    }

      java语言实现方式一,不用lamada方式:

    package cn.edu360.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class JavaWordCount {
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("JavaWordCount");
            //创建sparkContext
            JavaSparkContext jsc = new JavaSparkContext(conf);
            //指定以后从哪里读取数据
            JavaRDD<String> lines = jsc.textFile(args[0]);
            //切分压平
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            //将单词和一组合在一起
            JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
            //聚合
            JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            //调换顺序
            JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                    return tp.swap();
                }
            });
    
            //排序
            JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
    
            //调整顺序
            JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                    return tp.swap();
                }
            });
    
            //将数据保存到hdfs
            result.saveAsTextFile(args[1]);
    
            //释放资源
            jsc.stop();
        }
    }

      Java用lamada方式:

    package cn.edu360.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;
    public class JavaLambdaWordCount {
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("JavaWordCount");
            //创建sparkContext
            JavaSparkContext jsc = new JavaSparkContext(conf);
            //指定以后从哪里读取数据
            JavaRDD<String> lines = jsc.textFile(args[0]);
            //切分压平
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            //将单词和一组合
            JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> new Tuple2<>(w, 1));
            //聚合
            JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((m, n) -> m + n);
            //调整顺序
            JavaPairRDD<Integer, String> swaped = reduced.mapToPair(tp -> tp.swap());
            //排序
            JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
            //调整顺序
            JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
            //将结果保存到hdfs
            result.saveAsTextFile(args[1]);
            //释放资源
            jsc.stop();
            
        }
    }

       

  • 相关阅读:
    web通信浅析(上B/S交互)转载
    tomcat内部运行原理浅析转载
    oracle集合运算
    Oracle 游标使用全解
    oracle 一些基本概念
    1.搭建项目环境之TestDirector 8.0
    修改Win7远程桌面端口
    从第二份工作开始
    2.搭建项目环境之源代码管理SVN
    How to Get IIS Web Sites Information Programmatically
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10969520.html
Copyright © 2011-2022 走看看