zoukankan      html  css  js  c++  java
  • Spark 基础操作

    1. Spark 基础

    2. Spark Core

    3. Spark SQL

    4. Spark Streaming

    5. Spark 内核机制

    6. Spark 性能调优

    1. Spark 基础

    1.1 Spark 中的相应组件

    1.2 Standalone 模式安装

    // 1. 准备安装包(见下方参考资料): spark-2.1.3-bin-hadoop2.7.tgz
    
    // 2. 修改配置文件
    // 2.1 spark-env.sh.template
    mv spark-env.sh.template spark-env.sh
    SPARK_MASTER_HOST=IP地址
    SPARK_MASTER_PORT=7077
    
    // 3. 启动
    sbin/start-all.sh
    
    // 4. 浏览器访问
    http://IP地址:8080
    
    // 5. 测试官方案例
    bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://IP地址:7077 --executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.3.jar 100
    
    // 6. 使用 Spark Shell 测试 WordCount
    bin/spark-shell --master spark://10.110.147.193:7077
    
    sc.textFile("./RELEASE").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    

    1.2.1 提交应用程序概述

    1. bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://IP地址:7077 --executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.3.jar 100
      • --class: 应用程序的启动类,例如,org.apache.spark.examples.SparkPi;
      • --master: 集群的master URL;
      • deploy-mode: 是否发布你的驱动到worker节点(cluster)或者作为一个本地客户端(client);
      • --conf: 任意的Spark配置属性,格式:key=value,如果值包含空格,可以加引号"key=value";
      • application-jar:打包好的应用 jar,包含依赖,这个URL在集群中全局可见。比如hdfs://共享存储系统,如果是file://path,那么所有节点的path都包含同样的jar;
      • application-arguments: 传给main()方法的参数;

    1.3 JobHistoryServer 配置

    1. 修改spark-defaults.conf.template名称:mv spark-defaults.conf.template spark-defaults.conf;
    2. 修改spark-defaults.conf文件,开启 Log:
      • spark.eventLog.enabled true;
      • spark.eventLog.dir hdfs://IP地址:9000/directory;
      • 注意:HDFS 上的目录需要提前存在;
    3. 修改 spark-env.sh 文件,添加如下配置:
      • export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://IP地址:9000/directory";
    4. 开启历史服务:sbin/start-history-server.sh;
    5. 执行上面的程序:org.apache.spark.examples.SparkPi;
    6. 访问:http//IP地址:4000;

    1.4 Spark HA 配置

    1. zookeeper 正常安装并启动;
    2. 修改 spark-env.sh 文件,添加如下配置:
      • 注释掉:
        • SPARK_MASTER_HOST=IP地址;
        • SPARK_MASTER_PORT=7077
      • export SPARK_DEAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=IP地址1, IP地址2, IP地址3 -Dspark.deploy.zookeeper.dir=/spark"

    1.5 Yarn 模式安装

    1. 修改 hadoop 配置文件yarn-site.xml,添加如下内容:
    <!--是否启动一个线程,检查每个任务正在使用的物理内容量,如果任务超出分配值,则直接将其杀掉,默认为true-->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <!--是否启动一个线程,检查每个任务正在使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    
    1. 修改 spark-env.sh,添加如下配置:
      • YARN_CONF_DIR=/opt/module/hadoop-2.8.5/etc/hadoop
      • HADOOP_CONF_DIR=/opt/module/hadoop-2.8.5/etc/hadoop

    1.6 Spark Shell WordCount 流程

    • sc.textFile("文件地址").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).saveAsTextFile.("输出结果")

    1.6 Eclipse 编写 WordCount 程序

    // ===== 1. 创建 Maven Project
    // ===== 2. 导入依赖 pom.xml
    <dependencies>
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-core_2.11</artifactId>
              <version>2.1.3</version>
          </dependency>
      </dependencies>
      <build>
      		<finalName>WordCount</finalName>
            <plugins>
            <!--java 的编译版本 1.8-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                 <plugin>
    	            <groupId>org.apache.maven.plugins</groupId>
    	            <artifactId>maven-surefire-plugin</artifactId>
    	            <version>2.17</version>
    	        </plugin>
    	        <plugin>
    	        	<groupId>net.alchim31.maven</groupId>
    	        	<artifactId>scala-maven-plugin</artifactId>
    	        	<version>3.2.2</version>
    	        	<executions>
    	        	    <execution>
    	        	        <goals>
    	        	        	<goal>compile</goal>
    	        	        	<goal>test-compile</goal>
    	        	        </goals>
    	        	    </execution>
    	        	</executions>
    	        </plugin>
    	        <plugin>
    	        	<groupId>org.apache.maven.plugins</groupId>
    	        	<artifactId>maven-assembly-plugin</artifactId>
    	        	<version>3.0.0</version>
    	        	<configuration>
    	        		<archive>
    	        		    <manifest>
    	        		        <mainClass>com.noodles.WordCount</mainClass>
    	        		    </manifest>
    	        		</archive>
    	        		<descriptorRefs>jar-with-dependencies</descriptorRefs>
    	        	</configuration>
    	        	<executions>
    	        	    <execution>
    	        	        <id>make-assembly</id>
    	        	        <phase>package</phase>
    	        	        <goals>
    	        	            <goal>single</goal>
    	        	        </goals>
    	        	    </execution>
    	        	</executions>
    	        </plugin>
            </plugins>
        </build>
    
    
    // ===== 3. 创建 Maven Module
    // ===== 4. 创建 Scala Class: WordCount
    object WordCount {
    
        def main(args:Array[String]): Unit = {
    
            // 1. 创建配置信息
            val conf = new SparkConf().setAppName("wc")
    
            // 2. 创建 sparkcontext
            val sc = new SparkContext(conf)
    
            // 3. 处理逻辑
    
            // 读取数据
            val lines = sc.textFile(args(0))
            val words = lines.flatMap(_.split(" "))
            val k2v = words.map((_, 1))
            val result = k2v.reduceByKey(_+_)
    
            // 输出
            // result.collect()
            // 将输出结果保存到文件
            result.saveAsTextFile(args(1))
    
            // 4. 关闭连接
            sc.stop()
        }
    }
    

    **参考资料:** - [spark-2.1.3-bin-hadoop2.7.tgz下载](https://archive.apache.org/dist/spark/spark-2.1.3/) - [eclipse的maven、Scala环境搭建](https://blog.csdn.net/u014353787/article/details/50166789)
  • 相关阅读:
    解决使用vim-go插件时候保存go代码导致设置好的折叠消失的问题
    golang中从一个日期开始往后增加一段时间
    linux kernal oom killer 学习
    awk学习笔记
    FlexMonkey实战
    如何阅读一本书-读书笔记
    Oracle性能问题sql调优脚本集
    了解ORACLE培训OCA-OCP-OCM课程表
    Sublime Text 3设置笔记
    【总结】AngularJs学习总结
  • 原文地址:https://www.cnblogs.com/linkworld/p/11070946.html
Copyright © 2011-2022 走看看