  • Submitting a job to Spark (using wordcount as an example)

    1. First, set up a working Hadoop + Spark environment and make sure the services are healthy. This article uses wordcount as the example job.

    2. Create the source file, i.e. the input. The file hello.txt contains:

    tom jerry
    henry jim
    suse lusy

    Note: the words are separated by single spaces.

    3. Then run the following commands:

      hadoop fs -mkdir -p /Hadoop/Input     (create the directory on HDFS)

      hadoop fs -put hello.txt /Hadoop/Input     (upload hello.txt to HDFS)

      hadoop fs -ls /Hadoop/Input     (list the uploaded file)

      hadoop fs -text /Hadoop/Input/hello.txt     (print the file contents)
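
    If the upload worked, the last command simply prints the file back verbatim:

      tom jerry
      henry jim
      suse lusy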

    4. First test the wordcount job in spark-shell.

    (1) Start spark-shell. It would normally be run from Spark's bin directory, but here it is on the PATH because I configured the environment variables.

    (2) Then enter the Scala statements directly:

      val file=sc.textFile("hdfs://hacluster/Hadoop/Input/hello.txt")

      val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

      rdd.collect()

      rdd.foreach(println)

    OK, the test passes.
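
    Since each word in the three-line input occurs exactly once, collect() should return something like the following (the ordering of the pairs can vary between runs):

      (tom,1)
      (jerry,1)
      (henry,1)
      (jim,1)
      (suse,1)
      (lusy,1)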

    5. Word count in Scala

    package com.example.spark

    /**
     * User: hadoop
     * Date: 2017/8/17
     * Time: 10:20
     */
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    /**
     * Counts how many times each word occurs.
     */
    object ScalaWordCount {
      def main(args: Array[String]) {
        if (args.length < 1) {
          System.err.println("Usage: <file>")
          System.exit(1)
        }

        // Master and app name are supplied by spark-submit, so an empty conf suffices.
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val line = sc.textFile(args(0))

        // Split each line on spaces, pair every word with 1, sum per key, then print.
        line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)

        sc.stop()
      }
    }

    6. Word count in Java

    package com.example.spark;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.regex.Pattern;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    public final class WordCount {
        private static final Pattern SPACE = Pattern.compile(" ");
    
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                System.err.println("Usage: JavaWordCount <file>");
                System.exit(1);
            }
            SparkConf conf = new SparkConf().setAppName("JavaWordCount");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile(args[0],1);
            // With the Spark 2.x API declared in the pom below, FlatMapFunction.call
            // returns an Iterator; the Iterable signature was the old 1.x API.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<String> call(String s) {
                    return Arrays.asList(SPACE.split(s)).iterator();
                }
            });
    
            JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
    
            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<?, ?> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
    
            sc.stop();
        }
    }
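
    For comparison, the same pipeline is much shorter with Java 8 lambdas. A minimal sketch against the Spark 2.x API (the class name JavaWordCountLambda is mine, not from the original project):

    package com.example.spark;

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public final class JavaWordCountLambda {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("JavaWordCountLambda");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Same steps as above: split on spaces, pair each word with 1, sum per key.
            JavaPairRDD<String, Integer> counts = sc.textFile(args[0])
                    .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                    .mapToPair(s -> new Tuple2<>(s, 1))
                    .reduceByKey((a, b) -> a + b);

            for (Tuple2<String, Integer> tuple : counts.collect()) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
            sc.stop();
        }
    }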

    7. Package the jar in IDEA.

    (1) File ---> Project Structure (the original post illustrated the artifact setup with screenshots)

    Click OK. With the artifact configured, choose Build -> Build Artifacts... from the menu and run Build. When packaging finishes, the status bar shows "Compilation completed successfully...", and the jar can be found under the configured output path.

    Copy wordcount.jar to a cluster node, e.g. scp wordcount.jar root@10.57.22.244:/opt/ and enter the VM's root password when prompted.

    8. Run the jar.

    This article runs the jar in Spark-on-YARN mode.

    Run the Java wordcount: spark-submit --master yarn-client --class com.example.spark.WordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt

    Run the Scala wordcount: spark-submit --master yarn-client --class com.example.spark.ScalaWordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt

    (Note: on Spark 2.x, --master yarn-client still works but is deprecated in favor of --master yarn --deploy-mode client; also, --total-executor-cores applies only to standalone/Mesos masters, while YARN uses --num-executors and --executor-cores.)

    The Java wordcount is used as the demonstration here (the original post showed a screenshot of the run).

    The above submits the job directly via spark-submit; the following describes submitting it from a Java web application instead.

    9. Submit the job to Spark from a Java web application.

    The web layer is built with Spring Boot; the implementation is as follows:

      (1) Create a new Maven project named spark-submit.

      (2) pom.xml contents. Note that the Spark dependency artifacts must match your installed Scala version: in spark-core_2.11, the trailing 2.11 is the Scala version.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.4.1.RELEASE</version>
        </parent>
        <groupId>wordcount</groupId>
        <artifactId>spark-submit</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <start-class>com.example.spark.SparkSubmitApplication</start-class>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <commons.version>3.4</commons.version>
            <org.apache.spark-version>2.1.0</org.apache.spark-version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-jasper</artifactId>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.jayway.jsonpath</groupId>
                <artifactId>json-path</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <exclusions>
                    <exclusion>
                        <artifactId>spring-boot-starter-tomcat</artifactId>
                        <groupId>org.springframework.boot</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jetty</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.eclipse.jetty.websocket</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jstl</artifactId>
            </dependency>
            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>apache-jsp</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-solr</artifactId>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.11</artifactId>
                <version>1.6.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${org.apache.spark-version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.6.5</version>
            </dependency>
    
        </dependencies>
        <packaging>war</packaging>
    
        <repositories>
            <repository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>maven2</id>
            <url>https://repo1.maven.org/maven2/</url>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </pluginRepository>
            <pluginRepository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <configuration>
                        <warSourceDirectory>src/main/webapp</warSourceDirectory>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.mortbay.jetty</groupId>
                    <artifactId>jetty-maven-plugin</artifactId>
                    <configuration>
                        <systemProperties>
                            <systemProperty>
                                <name>spring.profiles.active</name>
                                <value>development</value>
                            </systemProperty>
                            <systemProperty>
                                <name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
                            <!-- a value of -1 means no limit -->
                                <value>600000</value>
                            </systemProperty>
                        </systemProperties>
                        <useTestClasspath>true</useTestClasspath>
                        <webAppConfig>
                            <contextPath>/</contextPath>
                        </webAppConfig>
                        <connectors>
                            <connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
                                <port>7080</port>
                            </connector>
                        </connectors>
                    </configuration>
                </plugin>
            </plugins>
    
        </build>
    
    </project>
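
    The pom's <start-class> points at com.example.spark.SparkSubmitApplication, which the post never shows. A minimal reconstruction of what that class presumably looks like for Spring Boot 1.4 with war packaging (this sketch is my assumption, not the original code):

    package com.example.spark;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.boot.web.support.SpringBootServletInitializer;

    // Hypothetical reconstruction: enables component scanning for the controller
    // below and lets the app boot both standalone and inside an external container.
    @SpringBootApplication
    public class SparkSubmitApplication extends SpringBootServletInitializer {

        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
            return builder.sources(SparkSubmitApplication.class);
        }

        public static void main(String[] args) {
            SpringApplication.run(SparkSubmitApplication.class, args);
        }
    }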

    (3)SubmitJobToSpark.java

    package com.example.spark;
    
    import org.apache.spark.deploy.SparkSubmit;
    
    /**
     * @author kevin
     *
     */
    public class SubmitJobToSpark {
    
        public static void submitJob() {
            String[] args = new String[] { "--master", "yarn-client", "--name", "test java submit job to spark", "--class", "com.example.spark.WordCount", "/opt/wordcount.jar", "hdfs://hacluster/aa/hello.txt" };
            SparkSubmit.main(args);
        }
    }
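
    Two caveats worth noting: SparkSubmit.main runs inside the web container's own JVM, so HADOOP_CONF_DIR/YARN_CONF_DIR must be visible to that process for yarn-client mode to find the cluster. The hard-coded input path is also easy to parameterize; a hedged sketch (the submitJob(String) overload is mine, not from the original post):

        public static void submitJob(String inputPath) {
            // Same arguments as above, but the HDFS input path is supplied by the caller.
            String[] args = new String[] {
                    "--master", "yarn-client",
                    "--name", "test java submit job to spark",
                    "--class", "com.example.spark.WordCount",
                    "/opt/wordcount.jar",
                    inputPath };
            SparkSubmit.main(args);
        }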

    (4)SparkController.java

    package com.example.spark.web.controller;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import com.example.spark.SubmitJobToSpark;
    
    @Controller
    @RequestMapping("spark")
    public class SparkController {
        private Logger logger = LoggerFactory.getLogger(SparkController.class);
    
        @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
        @ResponseBody
        public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
            logger.info("start submit spark task...");
            SubmitJobToSpark.submitJob();
            return "hello";
        }
    
    }
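
    If the parameterized submitJob overload sketched above is adopted, the handler can forward a request parameter to it. A hedged variant of the handler method (the "path" parameter name and its default are my assumptions; it also needs an import of org.springframework.web.bind.annotation.RequestParam):

        @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
        @ResponseBody
        public String sparkSubmit(@RequestParam(value = "path", defaultValue = "hdfs://hacluster/aa/hello.txt") String path) {
            logger.info("start submit spark task for {}", path);
            // Delegates to the overload that accepts the HDFS input path.
            SubmitJobToSpark.submitJob(path);
            return "hello";
        }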

    (5) Package the spark-submit project as a war, deploy it to Tomcat on the Master node, and send the following request:

      http://10.57.22.244:9090/spark/sparkSubmit

      The computed result can be seen in Tomcat's log.
