  • Spark cluster setup and a small demo of operating Spark from Java

    Spark

    Installation

    tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz
    rm spark-2.4.0-bin-hadoop2.7.tgz
    mv spark-2.4.0-bin-hadoop2.7 spark
    
    sudo vim /etc/profile
        export SPARK_HOME=/usr/local/spark/spark
        export PATH=$PATH:$SPARK_HOME/bin
    
    source /etc/profile
    
    Prepare four machines: master, worker1, worker2, and worker3.
    
    First make sure your Hadoop cluster runs normally, with worker1, worker2, and worker3 as DataNodes and master as the NameNode.
    For the detailed setup, see my post at https://www.cnblogs.com/ye-hcj/p/10192857.html
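    Before layering Spark on top, it can help to confirm that HDFS is reachable from code. A minimal sketch (assuming hadoop-client is on the classpath and the NameNode listens at hdfs://master:9000 as in the demo below; HdfsCheck is just a hypothetical helper name):

        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class HdfsCheck {
            public static void main(String[] args) throws Exception {
                // Connect to the NameNode and list the root directory;
                // a failure here means the Hadoop cluster is not ready yet
                FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), new Configuration());
                for (FileStatus st : fs.listStatus(new Path("/"))) {
                    System.out.println(st.getPath());
                }
                fs.close();
            }
        }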
    

    Configuration

    1. spark-env.sh

      In Spark's conf directory, run cp spark-env.sh.template spark-env.sh
      
      sudo vim spark-env.sh
          Add the following settings (adjust the paths to your own layout)
          export JAVA_HOME=/usr/local/jdk/jdk-11.0.1
          export SCALA_HOME=/usr/local/scala/scala
          export HADOOP_HOME=/usr/local/hadoop/hadoop-3.1.1
          export SPARK_HOME=/usr/local/spark/spark
          export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-3.1.1/etc/hadoop
          export SPARK_MASTER_HOST=master
          export SPARK_WORKER_MEMORY=1g
          export SPARK_WORKER_CORES=1
      
    2. slaves

      In Spark's conf directory, run cp slaves.template slaves
      
      sudo vim slaves
          Add the following hosts
          master 
          worker1 
          worker2 
          worker3

      Note that start-all.sh starts a worker over SSH on every host listed here, so each machine needs the same Spark installation at the same path; copy the directory over first, e.g. scp -r /usr/local/spark/spark worker1:/usr/local/spark/ (and likewise for worker2 and worker3).
      
    3. Startup

      On master, just run sbin/start-all.sh
      
      Visit http://master:8080/ to see the Spark UI
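      The standalone master also serves the same status as JSON at http://master:8080/json, which is handy for a scripted health check; a small sketch with plain HttpURLConnection (MasterStatus is a hypothetical name; adjust the host if yours differs):

      import java.io.BufferedReader;
      import java.io.InputStreamReader;
      import java.net.HttpURLConnection;
      import java.net.URL;

      public class MasterStatus {
          public static void main(String[] args) throws Exception {
              // Fetch the master's JSON status page and print it verbatim;
              // it lists the workers, cores, memory, and whether the master is ALIVE
              HttpURLConnection conn = (HttpURLConnection) new URL("http://master:8080/json").openConnection();
              try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                  String line;
                  while ((line = in.readLine()) != null) {
                      System.out.println(line);
                  }
              }
          }
      }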
      

    Operating Spark from Java

    Let's write a small demo that counts the men and women among 100,000 records; each line of data has the form "id age gender", e.g. 42 23 M

    1. Java code to generate the mock data

      // Generate the mock data: among 100,000 people, count young men and
      // young women to see whether the gender ratio is balanced.
      // (Wrapped in a minimal class here so it compiles standalone.)
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.util.concurrent.ThreadLocalRandom;

      public class GenData {
          public static void main(String[] args) {
              FileOutputStream f = null;
              ThreadLocalRandom random = ThreadLocalRandom.current();
              String str = "";
              int count = 0;
              try {
                  // Note the escaped backslashes in the Windows path
                  f = new FileOutputStream("C:\\Users\\26401\\Desktop\\data.txt", true);
                  for (; count < 100000; count++) {
                      // Age is uniform in [18, 28); gender is 'M' or 'F' with equal probability
                      str = count + " " + random.nextInt(18, 28) + " " + (random.nextBoolean() ? 'M' : 'F');
                      f.write((str + "\n").getBytes());
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      if (f != null) f.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
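      Before involving Spark at all, a quick local sanity check of the file is possible with plain java.nio (a sketch; LocalCount is a hypothetical name and the path matches the generator above):

      import java.nio.file.Files;
      import java.nio.file.Paths;
      import java.util.stream.Stream;

      public class LocalCount {
          public static void main(String[] args) throws Exception {
              // Lines look like "42 23 M", so the gender is the last character
              try (Stream<String> lines = Files.lines(Paths.get("C:\\Users\\26401\\Desktop\\data.txt"))) {
                  long male = lines.filter(l -> l.endsWith("M")).count();
                  System.out.println("male=" + male + " of 100000");
              }
          }
      }

      The cluster job below should report the same number.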
      
    2. Dependencies

      Note: the prebuilt spark-2.4.0-bin-hadoop2.7 package ships with Scala 2.11, so spark-core_2.11 may be the safer artifact to match the cluster; Scala 2.12 support in Spark 2.4 was still experimental.

      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>test</groupId>
          <artifactId>test</artifactId>
          <version>1.0.0</version>
          <name>test</name>
          <description>Java Spark demo</description>
          <packaging>jar</packaging>
      
          <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
              <java.version>1.8</java.version>
              <maven.compiler.source>1.8</maven.compiler.source>
              <maven.compiler.target>1.8</maven.compiler.target>
          </properties> 
      
      
          <dependencies>
              
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.12</artifactId>
                  <version>2.4.0</version>
              </dependency>
      
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
                  <version>1.7.25</version>
              </dependency>
              
              <dependency>
                  <groupId>junit</groupId>
                  <artifactId>junit</artifactId>
                  <version>3.8.1</version>
              </dependency>
      
              
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-jar-plugin</artifactId>
                      <configuration>
                          <archive>
                              <manifest>
                                  <addClasspath>true</addClasspath>
                                  <useUniqueVersions>false</useUniqueVersions>
                                  <classpathPrefix>lib/</classpathPrefix>
                              </manifest>
                          </archive>
                      </configuration>
                  </plugin>
              </plugins>
          </build>
      </project>
      
    3. Java code

      package test;
      
      import java.io.Serializable;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      
      public class App implements Serializable
      {
          private static final long serialVersionUID = -7114915627898482737L;

          // Keep the logger static: the anonymous Function below captures it,
          // and Spark serializes that closure when shipping it to the executors;
          // a non-static local Logger may not be serializable.
          private static final Logger logger = LoggerFactory.getLogger(App.class);

          public static void main(String[] args) throws Exception {
      
              SparkConf sparkConf = new SparkConf();
      
              sparkConf.setMaster("spark://master:7077");
              sparkConf.set("spark.submit.deployMode", "cluster");
              sparkConf.setAppName("FirstTest");
              
              JavaSparkContext sc = new JavaSparkContext(sparkConf);
              JavaRDD<String> file = sc.textFile("hdfs://master:9000/data.txt");
              
              JavaRDD<String> male = file.filter(new Function<String, Boolean>() {
                  private static final long serialVersionUID = 1L;
      
                  @Override
                  public Boolean call(String s) throws Exception {
                      logger.info(s);
                      return s.contains("M");
                  }
              });
              logger.info("**************************************");
              logger.info(male.count()+""); // 49991
              logger.info("**************************************");
              sc.close();
      
              // Look up the other APIs yourself; they are simple, and you can just experiment if you don't feel like reading docs
          }
      }
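      With Java 8 lambdas the filter is much shorter, and counting both genders gives the male/female ratio the demo is actually after. A sketch of what the body of main could look like instead (same cluster settings; reuses sc and logger from above):

          JavaRDD<String> file = sc.textFile("hdfs://master:9000/data.txt");
          // Each line is "id age gender"; match on the trailing gender character
          long male = file.filter(s -> s.endsWith("M")).count();
          long female = file.filter(s -> s.endsWith("F")).count();
          logger.info("male=" + male + " female=" + female + " ratio=" + ((double) male / female));

      Spark's Function is a single-method interface, so a plain lambda works; these lambdas capture nothing, which sidesteps the closure-serialization concern entirely.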
      
    4. Running it

      1. Upload the generated test data data.txt to HDFS, e.g. hdfs dfs -put data.txt /data.txt (a programmatic alternative is sketched below)
      2. Build the jar (mvn package) and upload it to the master machine
      3. Run bin/spark-submit --master spark://master:7077 --class test.App test-1.0.0.jar
      4. The messages printed by the job can then be seen clearly in the Spark UI
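      If you would rather push the file to HDFS from code, the Hadoop FileSystem API can do the upload. A minimal sketch (assuming hadoop-client on the classpath; Upload is a hypothetical name and the local path matches the generator above):

      import java.net.URI;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class Upload {
          public static void main(String[] args) throws Exception {
              // Copy the local file to /data.txt on the cluster,
              // the path the Spark job reads from
              FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), new Configuration());
              fs.copyFromLocalFile(new Path("C:\\Users\\26401\\Desktop\\data.txt"), new Path("/data.txt"));
              fs.close();
          }
      }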
      
  • Original post: https://www.cnblogs.com/ye-hcj/p/10280114.html