zoukankan      html  css  js  c++  java
  • 提交任务到Spark

    1.场景

      在搭建好Hadoop+Spark环境后,现准备在此环境上提交简单的任务到Spark进行计算并输出结果。搭建过程:http://www.cnblogs.com/zengxiaoliang/p/6478859.html 

      本人比较熟悉Java语言,现以Java的WordCount为例讲解这整个过程,要实现计算出给定文本中每个单词出现的次数。

    2.环境测试

      在讲解例子之前,我想先测试一下之前搭建好的环境。

      2.1测试Hadoop环境

      首先创建一个文件wordcount.txt 内容如下:

    Hello hadoop
    hello spark
    hello bigdata
    yellow banana
    red apple

      然后执行如下命令:

      hadoop fs -mkdir -p /Hadoop/Input(在HDFS创建目录)

      hadoop fs -put wordcount.txt /Hadoop/Input(将wordcount.txt文件上传到HDFS)

      hadoop fs -ls /Hadoop/Input (查看上传的文件)

      hadoop fs -text /Hadoop/Input/wordcount.txt (查看文件内容)

       2.2Spark环境测试

      我使用spark-shell,做一个简单的WordCount的测试。我就用上面Hadoop测试上传到HDFS的文件wordcount.txt。

      首先启动spark-shell命令:

      spark-shell

      

      然后直接输入scala语句:

      val file=sc.textFile("hdfs://Master:9000/Hadoop/Input/wordcount.txt")

      val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

      rdd.collect()

      rdd.foreach(println)

      

      退出使用如下命令:

      :quit

      这样环境测试就结束了。

     3.Java实现单词计数

    package com.example.spark;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.regex.Pattern;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    public final class WordCount {
        private static final Pattern SPACE = Pattern.compile(" ");
    
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("kevin's first spark app");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile(args[0]).cache();
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterator<String> call(String s) {
                    return Arrays.asList(SPACE.split(s)).iterator();
                }
            });
    
            JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
    
            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<?, ?> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
    
            sc.close();
        }
    }

    4.任务提交实现

      将上面Java实现的单词计数打成jar包spark-example-0.0.1-SNAPSHOT.jar,并且将jar包上传到Master节点,我是将jar包上传到/opt目录下,本文将以两种方式提交任务到spark,第一种是以spark-submit命令的方式提交任务,第二种是以java web的方式提交任务。

      4.1以spark-submit命令的方式提交任务

      spark-submit --master spark://114.55.246.88:7077 --class com.example.spark.WordCount /opt/spark-example-0.0.1-SNAPSHOT.jar hdfs://Master:9000/Hadoop/Input/wordcount.txt

      4.2以java web的方式提交任务

      我是用spring boot搭建的java web框架,实现代码如下:

      1)新建maven项目spark-submit

      2)pom.xml文件内容,这里要注意spark的依赖jar包要与scala的版本相对应,如spark-core_2.11,这后面2.11就是你安装的scala的版本

    <?xml version="1.0"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.4.1.RELEASE</version>
        </parent>
        <artifactId>spark-submit</artifactId>
        <description>spark-submit</description>
        <properties>
            <start-class>com.example.spark.SparkSubmitApplication</start-class>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <commons.version>3.4</commons.version>
            <org.apache.spark-version>2.1.0</org.apache.spark-version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-jasper</artifactId>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.jayway.jsonpath</groupId>
                <artifactId>json-path</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <exclusions>
                    <exclusion>
                        <artifactId>spring-boot-starter-tomcat</artifactId>
                        <groupId>org.springframework.boot</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jetty</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.eclipse.jetty.websocket</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jetty</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jstl</artifactId>
            </dependency>
            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>apache-jsp</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-solr</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jstl</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.11</artifactId>
                <version>1.6.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.6.5</version>
            </dependency>
    
        </dependencies>
        <packaging>war</packaging>
    
        <repositories>
            <repository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>maven2</id>
                <url>http://repo1.maven.org/maven2/</url>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </pluginRepository>
            <pluginRepository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <configuration>
                        <warSourceDirectory>src/main/webapp</warSourceDirectory>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.mortbay.jetty</groupId>
                    <artifactId>jetty-maven-plugin</artifactId>
                    <configuration>
                        <systemProperties>
                            <systemProperty>
                                <name>spring.profiles.active</name>
                                <value>development</value>
                            </systemProperty>
                            <systemProperty>
                                <name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
                                <!-- -1代表不作限制 -->
                                <value>600000</value>
                            </systemProperty>
                        </systemProperties>
                        <useTestClasspath>true</useTestClasspath>
                        <webAppConfig>
                            <contextPath>/</contextPath>
                        </webAppConfig>
                        <connectors>
                            <connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
                                <port>7080</port>
                            </connector>
                        </connectors>
                    </configuration>
                </plugin>
            </plugins>
    
        </build>
    </project>

      3)SubmitJobToSpark.java

    package com.example.spark;
    
    import org.apache.spark.deploy.SparkSubmit;
    
    /**
     * @author kevin
     *
     */
    public class SubmitJobToSpark {
    
        public static void submitJob() {
            String[] args = new String[] { "--master", "spark://114.55.246.88:7077", "--name", "test java submit job to spark", "--class", "com.example.spark.WordCount", "/opt/spark-example-0.0.1-SNAPSHOT.jar", "hdfs://Master:9000/Hadoop/Input/wordcount.txt" };
            SparkSubmit.main(args);
        }
    }

      4)SparkController.java

    package com.example.spark.web.controller;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import com.example.spark.SubmitJobToSpark;
    
    @Controller
    @RequestMapping("spark")
    public class SparkController {
        private Logger logger = LoggerFactory.getLogger(SparkController.class);
    
        @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
        @ResponseBody
        public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
            logger.info("start submit spark tast...");
            SubmitJobToSpark.submitJob();
            return "hello";
        }
    
    }

      5)将项目spark-submit打成war包部署到Master节点tomcat上,访问如下请求:

      http://114.55.246.88:9090/spark-submit/spark/sparkSubmit

      在tomcat的log中能看到计算的结果。

  • 相关阅读:
    最接近的三数之和(给定一个包括 n 个整数的数组 nums 和 一个目标值 target。找出 nums 中的三个整数, 使得它们的和与 target 最接近。返回这三个数的和)
    在排序数组中查找元素的第一个和最后一个位置(给定一个按照升序排列的整数数组 nums,和一个目标值 target。找出给定目标值在数组中的开始位置和结束位置。)
    查找常用字符(给定仅有小写字母组成的字符串数组 A,返回列表中的每个字符串中都显示的全部字符(包括重复字符)组成的列表。例如,如果一个字符在每个字符串中出现 3 次,但不是 4 次,则需要在最终答案中包含该字符 3 次。)
    三数之和等于0问题优化
    线性表之数组实现
    LeetCode竞赛题:笨阶乘(我们设计了一个笨阶乘 clumsy:在整数的递减序列中,我们以一个固定顺序的操作符序列来依次替换原有的乘法操作符:乘法(*),除法(/),加法(+)和减法(-)。)
    LeetCode竞赛题:K 次取反后最大化的数组和(给定一个整数数组 A,我们只能用以下方法修改该数组:我们选择某个个索引 i 并将 A[i] 替换为 -A[i],然后总共重复这个过程 K 次。)
    最短路径(给定一个包含非负整数的 m x n 网格,请找出一条从左上角到右下角的路径,使得路径上的数字总和为最小。 说明:每次只能向下或者向右移动一步。)
    不同路径II(一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为“Start” )。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为“Finish”)。 现在考虑网格中有障碍物。那么从左上角到右下角将会有多少条不同的路径?网格中的障碍物和空位置分别用 1 和 0 来表示。)
    不同路径(一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为“Start” )。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为“Finish”)。 问总共有多少条不同的路径?)
  • 原文地址:https://www.cnblogs.com/zengxiaoliang/p/6508330.html
Copyright © 2011-2022 走看看