zoukankan      html  css  js  c++  java
  • Spark入门

    1. Spark集群安装

    1.1  机器部署

        准备两台以上Linux服务器,安装好JDK

    1.2  下载Spark安装包

        提前到官网下载相应版本并上传spark-安装包到Linux上

        解压安装包到指定位置

    tar -zxvf spark-2.1.0-bin-hadoop2.6.tgz -C /usr/local

    1.3  配置Spark

        进入到Spark安装目录

    cd /usr/local/spark-2.1.0-bin-hadoop2.6

        进入conf目录并重命名并修改spark-env.sh.template文件

    cd conf/
    mv spark-env.sh.template spark-env.sh
    vi spark-env.sh

        在该配置文件中添加如下配置

    export JAVA_HOME=/usr/java/jdk1.8.0_111
    #export SPARK_MASTER_IP=node1.edu360.cn
    #export SPARK_MASTER_PORT=7077

        保存退出

        重命名并修改slaves.template文件

    mv slaves.template slaves
    vi slaves

        在该文件中添加子节点所在的位置(Worker节点)

    node2.edu360.cn
    node3.edu360.cn
    node4.edu360.cn

        保存退出

        将配置好的Spark拷贝到其他节点上

    scp -r spark-2.1.0-bin-hadoop2.6/ node2.edu360.cn:/usr/local/
    scp -r spark-2.1.0-bin-hadoop2.6/ node3.edu360.cn:/usr/local/
    scp -r spark-2.1.0-bin-hadoop2.6/ node4.edu360.cn:/usr/local/

        Spark集群配置完毕,目前是1个Master,3个Work,在node1.edu360.cn上启动Spark集群

    /usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh

        启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进行,登录Spark管理界面查看集群状态(主节点):http://node1.edu360.cn:8080/

        到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:

        Spark集群规划:node1,node2是Master;node3,node4,node5是Worker

        安装配置zk集群,并启动zk集群

        停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP并添加如下配置

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark"

        1.在node1节点上修改slaves配置文件内容指定worker节点

        2.在node1上执行sbin/start-all.sh脚本,然后在node2上执行sbin/start-master.sh启动第二个Master

    2. 执行Spark程序

    2.1 执行第一个spark程序

    /usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node1.edu360.cn:7077 --executor-memory 1G --total-executor-cores 2 /usr/local/spark-2.1.0-bin-hadoop2.6/lib/spark-examples-2.1.0-hadoop2.6.0.jar 100

        该算法是利用蒙特·卡罗算法求PI

    2.2 启动Spark Shell

        spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

    2.2.1 启动spark shell

    /usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-shell --master spark://node1.edu360.cn:7077 --executor-memory 2g --total-executor-cores 2

        参数说明:

    --master spark://node1.edu360.cn:7077 指定Master的地址
    --executor-memory 2g 指定每个worker可用内存为2G
    --total-executor-cores 2 指定整个集群使用的cup核数为2个

        注意:如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可.

    2.2.2 在spark shell中编写WordCount程序

        1.首先启动hdfs

        2.向hdfs上传一个文件到hdfs://node1.edu360.cn:9000/words.txt

        3.在spark shell中用scala语言编写spark程序

    sc.textFile("hdfs://node1.edu360.cn:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.edu360.cn:9000/out")

        4.使用hdfs命令查看结果

    hdfs dfs -ls hdfs://node1.edu360.cn:9000/out/p*

        说明:

        sc是SparkContext对象,该对象时提交spark程序的入口

        textFile(hdfs://node1.edu360.cn:9000/words.txt)是hdfs中读取数据

        flatMap(_.split(" "))先map在压平

        map((_,1))将单词和1构成元组

        reduceByKey(_+_)按照key进行reduce,并将value累加

        saveAsTextFile("hdfs://node1.edu360.cn:9000/out")将结果写入到hdfs中

    1.3. 在IDEA中编写WordCount程序

        spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

        1.创建一个项目

        2.选择Maven项目,然后点击next

        3.填写maven的GAV,然后点击next

        4.填写项目名称,然后点击finish

        5.创建好maven项目后,点击Enable Auto-Import

        6.配置Maven的pom.xml

      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <project xmlns="http://maven.apache.org/POM/4.0.0"
      3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      5     <modelVersion>4.0.0</modelVersion>
      6 
      7     <groupId>learn.spark</groupId>
      8     <artifactId>SparkDemo</artifactId>
      9     <version>1.0-SNAPSHOT</version>
     10 
     11     <properties>
     12         <maven.compiler.source>1.8</maven.compiler.source>
     13         <maven.compiler.target>1.8</maven.compiler.target>
     14         <scala.version>2.11.8</scala.version>
     15         <spark.version>2.2.0</spark.version>
     16         <hadoop.version>2.6.5</hadoop.version>
     17         <encoding>UTF-8</encoding>
     18     </properties>
     19 
     20     <dependencies>
     21         <!-- 导入scala的依赖 -->
     22         <dependency>
     23             <groupId>org.scala-lang</groupId>
     24             <artifactId>scala-library</artifactId>
     25             <version>${scala.version}</version>
     26         </dependency>
     27 
     28         <!-- 导入spark的依赖 -->
     29         <dependency>
     30             <groupId>org.apache.spark</groupId>
     31             <artifactId>spark-core_2.11</artifactId>
     32             <version>${spark.version}</version>
     33         </dependency>
     34 
     35         <!-- 指定hadoop-client API的版本 -->
     36         <dependency>
     37             <groupId>org.apache.hadoop</groupId>
     38             <artifactId>hadoop-client</artifactId>
     39             <version>${hadoop.version}</version>
     40         </dependency>
     41 
     42     </dependencies>
     43 
     44     <build>
     45         <pluginManagement>
     46             <plugins>
     47                 <!-- 编译scala的插件 -->
     48                 <plugin>
     49                     <groupId>net.alchim31.maven</groupId>
     50                     <artifactId>scala-maven-plugin</artifactId>
     51                     <version>3.2.2</version>
     52                 </plugin>
     53                 <!-- 编译java的插件 -->
     54                 <plugin>
     55                     <groupId>org.apache.maven.plugins</groupId>
     56                     <artifactId>maven-compiler-plugin</artifactId>
     57                     <version>3.5.1</version>
     58                 </plugin>
     59             </plugins>
     60         </pluginManagement>
     61         <plugins>
     62             <plugin>
     63                 <groupId>net.alchim31.maven</groupId>
     64                 <artifactId>scala-maven-plugin</artifactId>
     65                 <executions>
     66                     <execution>
     67                         <id>scala-compile-first</id>
     68                         <phase>process-resources</phase>
     69                         <goals>
     70                             <goal>add-source</goal>
     71                             <goal>compile</goal>
     72                         </goals>
     73                     </execution>
     74                     <execution>
     75                         <id>scala-test-compile</id>
     76                         <phase>process-test-resources</phase>
     77                         <goals>
     78                             <goal>testCompile</goal>
     79                         </goals>
     80                     </execution>
     81                 </executions>
     82             </plugin>
     83 
     84             <plugin>
     85                 <groupId>org.apache.maven.plugins</groupId>
     86                 <artifactId>maven-compiler-plugin</artifactId>
     87                 <executions>
     88                     <execution>
     89                         <phase>compile</phase>
     90                         <goals>
     91                             <goal>compile</goal>
     92                         </goals>
     93                     </execution>
     94                 </executions>
     95             </plugin>
     96 
     97 
     98             <!-- 打jar插件 -->
     99             <plugin>
    100                 <groupId>org.apache.maven.plugins</groupId>
    101                 <artifactId>maven-shade-plugin</artifactId>
    102                 <version>2.4.3</version>
    103                 <executions>
    104                     <execution>
    105                         <phase>package</phase>
    106                         <goals>
    107                             <goal>shade</goal>
    108                         </goals>
    109                         <configuration>
    110                             <filters>
    111                                 <filter>
    112                                     <artifact>*:*</artifact>
    113                                     <excludes>
    114                                         <exclude>META-INF/*.SF</exclude>
    115                                         <exclude>META-INF/*.DSA</exclude>
    116                                         <exclude>META-INF/*.RSA</exclude>
    117                                     </excludes>
    118                                 </filter>
    119                             </filters>
    120                         </configuration>
    121                     </execution>
    122                 </executions>
    123             </plugin>
    124         </plugins>
    125     </build>
    126 
    127 </project>
    View Code

        7.新建一个scala class,类型为Object

        8.编写spark程序

     1 package spark.scala
     2 
     3 import org.apache.spark.rdd.RDD
     4 import org.apache.spark.{SparkConf, SparkContext}
     5 
     6 object ScalaWordCount {
     7 
     8     def main(args: Array[String]): Unit = {
     9         // 创建spark配置,设置应用程序名字
    10         //val conf = new SparkConf().setAppName("ScalaWordCount")
    11         val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]")
    12         // 创建spark执行入口
    13         val sc = new SparkContext(conf)
    14         // 指定以后从哪儿读取数据创建RDD(弹性分布式数据集)
    15         // sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1))
    16 
    17         val line: RDD[String] = sc.textFile(args(0))
    18         // 切分压平
    19         val words: RDD[String] = line.flatMap(_.split(" "))
    20         // 将单词和1组合
    21         val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    22         // 按照key进行聚合
    23         val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    24         // 排序
    25         val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    26         // 将结果保存到HDFS中
    27         sorted.saveAsTextFile(args(1))
    28         // 释放资源
    29         sc.stop()
    30     }
    31 }
    View Code

        9.使用Maven打包:首先修改pom.xml中的main class

        点击idea右侧的Maven Project选项;点击Lifecycle,选择clean和package,然后点击Run Maven Build

        10.选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上

        11.首先启动hdfs和Spark集群

        启动hdfs

    /usr/local/hadoop-2.6.5/sbin/start-dfs.sh

        启动spark

    /usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh

        12.使用spark-submit命令提交Spark应用(注意参数的顺序)

    /usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class cn.itcast.spark.WordCount --master spark://node1.edu360.cn:7077 --executor-memory 2G --total-executor-cores 4 /root/spark-mvn-1.0-SNAPSHOT.jar hdfs://node1.edu360.cn:9000/words.txt 
     hdfs://node1.edu360.cn:9000/out

        查看程序执行结果

    hdfs dfs -cat hdfs://node1.edu360.cn:9000/out/part-00000
  • 相关阅读:
    solr 5.3.1安装配置
    STS 设置代码注释模板
    visual studio 设置代码注释模板
    JAXBContext处理CDATA
    用STS和Maven的方式创建一个JavaWeb项目
    .NET跨平台实践:用C#开发Linux守护进程-Daemon
    不装mono,你的.NET程序照样可以在Linux上运行!
    Tomcat关闭日志输出
    使用git pull文件时和本地文件冲突怎么办?
    Linux命令-进程后台执行:nohup(就是不挂起的意思)
  • 原文地址:https://www.cnblogs.com/zhangchao162/p/10105898.html
Copyright © 2011-2022 走看看