  • Submitting jobs written on Windows to the cluster

    1. Copying and deleting files in HDFS

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    /**
      * Created by Administrator on 2017/7/14.
      */
    object test {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("spark://xxxx:7077").setAppName("test") // cluster master URL: spark://xxxx:7077
        val sc = new SparkContext(conf)
        val output = new Path("hdfs://xxxx:8020/usr")
        // backslashes in the Windows path must be escaped
        val input = new Path("E:\\Program\\datas\\test.csv")
        val hdfs = FileSystem.get(new URI("hdfs://cdh.codsway.com:8020"), new Configuration())
        // delete the target if it already exists, then copy the local file up to HDFS
        if (hdfs.exists(output)) {
          hdfs.delete(output, true)
        }
        hdfs.copyFromLocalFile(false, input, output)
        sc.stop()
      }
    }

    Exception 1: WARN TaskSchedulerImpl: Initial job has not accepted any resources; IDEA shows no error at all, but the details can be seen on the master web UI (port 8080).

    1. Add the Windows machine's hostname and IP to the hosts file on every node in the cluster.

    2. Turn off the firewall.

    3. Set the driver host in code with .set("spark.driver.host", "<the Windows machine's IP>"), as in the sketch below this list.
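    A minimal sketch of the third fix (the master URL and the 192.168.x.x address are placeholders for your own cluster and Windows IP):

    import org.apache.spark.{SparkConf, SparkContext}

    // Advertise the Windows host's IP explicitly so the executors can connect back to the driver.
    val conf = new SparkConf()
      .setMaster("spark://xxxx:7077")
      .setAppName("test")
      .set("spark.driver.host", "192.168.x.x") // placeholder: the Windows machine's real IP
    val sc = new SparkContext(conf)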

    Exception 2: java.net.SocketTimeoutException: connect timed out — connections kept going to an IP address different from the host's real one.

    Inspection showed that the address belonged to the VMware VMnet8 virtual adapter; disable the VMnet1 and VMnet8 adapters.

    Exception 3: java.lang.ClassNotFoundException: SparkPi$$anonfun$1

    This error usually has one of two causes: the jar file was never shipped to the cluster, or a jar on the Build Path conflicts with the Spark runtime environment.

    First case: val conf = new SparkConf().setAppName("Spark Pi").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SparkExample\\SparkExample.jar"))
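    An alternative sketch for the first case: instead of setJars on the SparkConf, the jar can be registered on the SparkContext after it is created (same placeholder path and conf as above):

    // Ship the application jar to the executors after the context is created;
    // equivalent in effect to SparkConf.setJars for a single application jar.
    val sc = new SparkContext(conf)
    sc.addJar("E:\\Intellij\\Projects\\SparkExample\\SparkExample.jar")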

    Second case: remove those jars from the Build Path. The Spark runtime already provides them, so there is no need to package them again; removing them both shrinks the packaged file and avoids conflicts with the jars in the Spark runtime.

    Exception: java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

    Jar conflict: the clash is between spark-assembly-1.5.2-hadoop2.6.0.jar and the following Maven dependency:

    <!--<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
    </dependency>-->

    Solution: remove (comment out) the Maven dependency shown above.

    Exception 4: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1 (hit while accessing HDFS in standalone mode; to be retried under HA)

    Add the cluster's core-site.xml to the project and change it as follows:

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://xxxx:8020</value>
    </property>

    Exception 5: java.io.IOException: Cannot run program "/etc/hadoop/conf.cloudera.yarn/topology.py"

    Edit the core-site.xml in the project: find the net.topology.script.file.name property and comment out its value.
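    If you prefer not to edit the copied XML, the same setting can be cleared from code before touching HDFS; a sketch (assuming sc is the SparkContext, and that the topology script is only consulted on the client side):

    // The cluster-side rack-topology script does not exist on Windows, so drop the setting
    // from the client's Hadoop configuration before any HDFS access.
    sc.hadoopConfiguration.unset("net.topology.script.file.name")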

    Exception 6: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    Solution: edit core-site.xml and hdfs-site.xml so that the following holds (the cluster is in HA mode and the job is submitted in YARN mode):

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://nameservice1</value>
    </property>
    <property>
      <name>dfs.nameservices</name>
      <value>nameservice1</value>
    </property>
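    With those two properties in place, the client addresses HDFS through the logical nameservice instead of a single NameNode host; a small sketch (assuming sc is the SparkContext and the cluster's core-site.xml/hdfs-site.xml are on the classpath):

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}

    // nameservice1 is resolved to the active NameNode by the HA client configuration.
    val fs = FileSystem.get(new URI("hdfs://nameservice1"), sc.hadoopConfiguration)
    println(fs.exists(new Path("/usr")))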

    Submitting a job from Windows to the cluster in standalone mode:

    Prerequisites: the Windows machine's IP has been added to the cluster's hosts files, the firewall is off, and JAVA_HOME, SCALA_HOME, HADOOP_HOME, SPARK_HOME and the corresponding PATH entries are configured on Windows; the remote cluster is CDH 5.4.7.

    Import spark-assembly-1.5.2-hadoop2.6.0.jar into the project.

    The pom.xml is as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>spark</groupId>
      <artifactId>test</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.10.6</scala.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>commons-logging</groupId>
          <artifactId>commons-logging</artifactId>
          <version>1.1.1</version>
          <type>jar</type>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.1</version>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.9</version>
        </dependency>
       <!-- <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.6.0</version>
        </dependency>-->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-mllib_2.10</artifactId>
              <version>1.5.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-hive_2.10</artifactId>
              <version>1.5.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-streaming_2.10</artifactId>
              <version>1.5.2</version>
          </dependency>
        <dependency>
          <groupId>com.databricks</groupId>
          <artifactId>spark-csv_2.10</artifactId>
          <version>1.5.0</version>
        </dependency>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.10.6</version>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

    The code in IDEA:

    package spark
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.math.random
    
    object BasicOperate {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val sc=createSparkContext()
        computePI(sc)
      }
      def createSparkContext():SparkContext={
        val conf =new SparkConf().setMaster("spark://xxxx:7077")
          .setAppName("test")
          .setJars(List("E:\\sparkTest\\out\\artifacts\\xxxx\\sparkTest.jar")) // note the escaped backslashes
        val sc=new SparkContext(conf)
        sc
      }
      def computePI(sc: SparkContext): Unit = {
        val slices = 2
        val n = 100000 * slices
        // count how many random points in the unit square fall inside the unit circle
        val count = sc.parallelize(1 to n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x * x + y * y < 1) 1 else 0
        }.reduce(_ + _)
        // the hit ratio approximates Pi/4, so scale by 4
        println("Pi is roughly " + 4.0 * count / n)
      }
    
    }

    Click Build > Rebuild Project first, then Run, and the result appears. Open questions: some code does not need setJars at all, and why is the rebuild step needed? (Presumably because setJars points at the artifact jar, which has to be rebuilt so that it contains the latest code.)

    Prerequisite: copy the configuration files from spark/conf and hadoop/etc/conf into a project directory and mark that directory as a resources root; the configuration files are shown in the original screenshot.
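    A quick way to check that the directory was really picked up as a resources root (a small sketch; each line prints null if the corresponding file is not on the classpath):

    // If either line prints null, the conf directory is not on the classpath and the
    // cluster-side settings (defaultFS, HA nameservice, ...) will not be applied.
    println(getClass.getClassLoader.getResource("core-site.xml"))
    println(getClass.getClassLoader.getResource("hdfs-site.xml"))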

    Submitting the job from IDEA in yarn-client mode:

    package spark.sql
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    object OperateHive {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val conf=new SparkConf()
          .setMaster("yarn-client")
          .setAppName("hiveOperate")
        val sc=new SparkContext(conf)
        val hiveContext=new HiveContext(sc)
        // show the databases
        hiveContext.sql("SHOW DATABASES").show()
        // create a database
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS hivedb")
        // create a table
        hiveContext.sql("CREATE TABLE IF NOT EXISTS hivedb.test(key INT,value STRING) " +
          "row format delimited fields terminated by ','").printSchema()
        // load data into the table
        //hiveContext.sql("LOAD DATA LOCAL INPATH 'hdfs://...' into table hivedb.test" )
        hiveContext.sql("USE hivedb")
        // hiveContext.sql("INSERT INTO test VALUES(3,'zz')") -- not supported? (see the sketch after this block)
        hiveContext.sql("SELECT * FROM test").show()
      }
    }
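    Regarding the commented-out INSERT: HiveContext in Spark 1.5 does not accept INSERT INTO ... VALUES. A hedged workaround sketch, reusing sc, hiveContext and the hivedb.test table from the code above, is to append the rows through a DataFrame:

    // Build a one-row DataFrame and append it to the existing Hive table
    // instead of using an INSERT ... VALUES statement.
    import hiveContext.implicits._
    val newRows = sc.parallelize(Seq((3, "zz"))).toDF("key", "value")
    newRows.write.mode("append").insertInto("hivedb.test")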

    Remaining questions: updates are not supported, and this Hive sits on the HA cluster. Note: in standalone mode, when reading from HDFS, the resources attribute on the conf directory above had to be removed for the job to run through. In production, YARN mode is generally used, although YARN returns results more slowly.
