zoukankan      html  css  js  c++  java
  • Spark 案例实操

    案例实操

      Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE
    中编制程序,然后打成 jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用
    Maven 来管理 jar 包的依赖。
     
     

    1 编写 WordCount 程序

    1)创建一个 Maven 项目 WordCount 并导入依赖
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.lxl</groupId>
        <artifactId>spark02</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
        <modules>
            <module>sparkCore</module>
        </modules>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    2)编写代码
    package com.lxl
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
    
        /*
        1.创建配置信息
         */
        val conf = new SparkConf().setAppName("wc")
    
    
        /*
        2.创建sparkcontext
         */
        val sc = new SparkContext(conf)
    
    
    
        /*
        3.处理
         */
        //读取数据
        val lines = sc.textFile(args(0)) //传入路径
    
        //压平 flatMap
        val words = lines.flatMap(_.split(" "))
    
        //map(word,1)
        val k2v = words.map((_, 1))
    
        //resuceBykey(word, x)
        val result = k2v.reduceByKey(_ + _)
    
        //输出,展示
        //result.collect()
        
        //保存数据到文件
        result.saveAsTextFile(args(1)) //传入的保存文件的目录
    
    
        //关闭连接
        sc.stop()
    
    
    
      }
    
    }
     
     
    3)打包插件
     
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>spark01</artifactId>
            <groupId>com.atlxl</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkCore</artifactId>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>WordCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
     
     
    4)打包到集群测试
     
    先将 jar 包拷贝到 spark 的家目录下,改名为wordcount。
    [lxl@hadoop102 spark]$ mv sparkCore-1.0-SNAPSHOT.jar wordcount.jar
    [lxl@hadoop102 spark]$ bin/spark-submit 
    --class com.lxl.WordCount 
    --master spark://hadoop102:7077 
    --executor-memory 1G 
    --total-executor-cores 2 
    ./wordcount.jar 
    hdfs://hadoop102:9000/fruit.tsv 
    hdfs://hadoop102:9000/out
     
     
     
     
     
     

    2 本地调试

      本地 Spark 程序调试需要使用 local 提交模式,即将本机当做运行环
    境,Master 和 Worker 都为本机。运行时直接加断点调试即可。如下:
    创建 SparkConf 的时候设置额外属性,表明本地执行:
    val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
     
     
    完整代码:(只需将 WordCount 第二步的代码稍作修改即可)
    package com.lxl
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        /*
        1.创建配置信息
         */
        val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    
    
        /*
        2.创建sparkcontext
         */
        val sc = new SparkContext(conf)
    
    
    
        /*
        3.处理
         */
        //读取数据
        val lines = sc.textFile("C:\Users\67001\Desktop\word.txt") //本地路径
    
        //压平 flatMap
        val words = lines.flatMap(_.split(" "))
    
        //map(word,1)
        val k2v = words.map((_, 1))
    
        //resuceBykey(word, x)
        val result = k2v.reduceByKey(_ + _)
    
        //输出,展示
    //    result.collect()
    
        //保存数据
    //    result.saveAsTextFile(args(1))
    
        //打印到控制台
        result.foreach(println)
    
    
        //关闭连接
        sc.stop()
      }
    
    }

    3 远程调试

      通过 IDEA 进行远程调试,主要是将 IDEA 作为 Driver 来提交应用程序,

    配置过程如下:

      修改 sparkConf,添加最终运行的 Jar 包、Driver 程序的地址,

    并设置 Master 的提交地址:

    val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
    .setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar"))
     完整代码:
    package com.lxl
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        /*
        1.创建配置信息
         */
        val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
          .setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar"))
    
    
        /*
        2.创建sparkcontext
         */
        val sc = new SparkContext(conf)
    
    
    
        /*
        3.处理
         */
        //读取数据
        val lines = sc.textFile("hdfs://hadoop102:9000/fruit.tsv") //HDFS路径
    
        //压平 flatMap
        val words = lines.flatMap(_.split(" "))
    
        //map(word,1)
        val k2v = words.map((_, 1))
    
        //resuceBykey(word, x)
        val result = k2v.reduceByKey(_ + _)
    
        //输出,展示
    //    result.collect()
    
        //保存数据
        result.saveAsTextFile("hdfs://hadoop102:9000/out1") //保存到HDFS的路径
    
        //打印到控制台
    //    result.foreach(println)
    
    
        //关闭连接
        sc.stop()
      }
    
    }
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    ACL的基本访问列表与高级访问列表
    ACL配置
    OSPF与ACL综合应用
    RSTP基础配置
    基于接口地址池和基于全局配置的DHCP
    在ensp上通过FTP进行文件操作
    在ensp上配置通过Stelnet登录系统
    在ensp上配置通过Telent登录系统
    在eNSP上简单的模拟企业网络场景(不同网段互连)
    虚拟机中使用Samba实现文件共享
  • 原文地址:https://www.cnblogs.com/LXL616/p/11139436.html
Copyright © 2011-2022 走看看