  • Submitting a Spark program from Windows 10 to a remote cluster

    I. Development environment:

      OS: Windows 10, 64-bit

      IDE: IntelliJ IDEA

      JDK: 1.8

      Scala: scala-2.10.6

      Cluster: CDH cluster on Linux, with Spark 1.5.2 and Hadoop 2.6.0 (I would have liked to use the latest Spark and Hadoop, but Spark 1.6 and earlier still ship spark-assembly-1.x.x-hadoop2.x.x.jar)

    II. Implementation steps:

      1. Configure the Maven pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>spark</groupId>
      <artifactId>test</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.10.6</scala.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>commons-logging</groupId>
          <artifactId>commons-logging</artifactId>
          <version>1.1.1</version>
          <type>jar</type>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.1</version>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.9</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.6.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-mllib_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.10.6</version>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

    2. Write a simple program:

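    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]): Unit = {
        // Point the driver at the standalone master of the remote cluster
        val conf = new SparkConf().setMaster("spark://xxxxx:7077").setAppName("test")
        val sc = new SparkContext(conf)
        // Ship this project's jar so the executors can load its classes (e.g. the filter closure);
        // backslashes in Windows paths must be escaped inside Scala string literals
        sc.addJar("E:\\sparkTest\\out\\artifacts\\sparkTest_jar\\sparkTest.jar")
        val n = 4 // number of random samples
        val count = sc.parallelize(1 to n).filter { _ =>
          val x = math.random
          val y = math.random
          x * x + y * y < 1
        }.count()
        // The fraction of points inside the quarter circle approximates pi / 4
        println(s"Pi is roughly ${4.0 * count / n}")
        sc.stop()
      }
    }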
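    The program above is a Monte Carlo estimate: the fraction of random points in the unit square that satisfy x*x + y*y < 1 approaches pi/4, so the estimate is 4 * count divided by the number of samples. Because the program draws only 4 samples and divides by 4, the printed value can only be 0.0, 1.0, 2.0, 3.0 or 4.0. The Spark-free sketch below (the object name PiEstimate is hypothetical) computes the same estimate locally with a configurable sample count, which can be handy for checking the formula before running it on the cluster.

```scala
import scala.util.Random

// Hypothetical local version of the estimate computed by `test`, without Spark.
object PiEstimate {
  // Draw n points in the unit square and count those inside the quarter circle.
  def estimate(n: Int, rng: Random): Double = {
    require(n > 0, "need at least one sample")
    val inside = (1 to n).count { _ =>
      val x = rng.nextDouble()
      val y = rng.nextDouble()
      x * x + y * y < 1
    }
    // inside / n approximates pi / 4, so the divisor must be the sample count n
    4.0 * inside / n
  }
}
```

    For example, PiEstimate.estimate(1000000, new Random(42)) should return a value close to 3.14. On the cluster, the equivalent change would be sc.parallelize(1 to n) with a larger n, dividing by that same n.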

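    3. Build the jar: File -> Project Structure -> Artifacts, then Build -> Build Artifacts, and click Run. (I just tried running without the jar: the job still starts, but no result shows up in the console. This is most likely because, without the jar, the executors cannot load the program's own classes such as the filter closure, so the tasks fail.)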

    4. The Spark version in pom.xml must match the Spark version on the cluster (a mismatch produces: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem)

    5. Exception: Could not locate executable null\bin\winutils.exe in the Hadoop binaries

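    Solution:

    1. Download a Hadoop distribution (I used hadoop-2.7.3), unpack it, and set the HADOOP_HOME environment variable to its directory.

    2. Download winutils.exe from https://github.com/srccodes/hadoop-common-2.2.0-bin and put it in the bin directory of that Hadoop directory.

    3. Restart IDEA; the exception goes away.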
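    The null in the message above is the Hadoop home directory that could not be resolved: Hadoop's client first reads the JVM system property hadoop.home.dir, falls back to the HADOOP_HOME environment variable, and then looks for bin\winutils.exe under that directory. The IDEA restart is needed because a newly set environment variable is only visible to processes started afterwards. The helper below (WinutilsCheck is a hypothetical name, not a Hadoop class) mirrors that lookup order so the setup can be checked before starting Spark.

```scala
import java.io.File

// Hypothetical helper mirroring the order in which Hadoop's client resolves its home directory.
object WinutilsCheck {
  // JVM property hadoop.home.dir first, then the HADOOP_HOME environment variable.
  def hadoopHome(env: Map[String, String] = sys.env): Option[String] =
    sys.props.get("hadoop.home.dir").orElse(env.get("HADOOP_HOME"))

  // Some(file) if <hadoop home>\bin\winutils.exe exists, None otherwise.
  def winutils(env: Map[String, String] = sys.env): Option[File] =
    hadoopHome(env)
      .map(home => new File(new File(home, "bin"), "winutils.exe"))
      .filter(_.isFile)
}
```

    An alternative that should not need an IDE restart is to call System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3") at the very start of main, before the SparkContext is created (the path here is only an example).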

    6. Exception while deleting Spark temp dir: C:\Users\<user>\AppData\Local\Temp\spark-70484fc4-167d-48fa-a8f6-54db9752402e\userFiles-27a65cc7-817f-4476-a2a2-58967d7b6cc1

    Solution: this is currently a known issue with Spark on Windows. If you don't want to see it, just set the log level in log4j.properties to FATAL (heh).

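    7. com.google.protobuf.InvalidProtocolBufferException: Protocol message end-gro…: the HDFS IP address or port number is wrong. Use the NameNode address configured for the cluster, e.g.

    hdfs://xxxx:8020/usr/xxx (CDH uses port 8020 by default; many other installations are configured with 9000)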
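    This protobuf exception usually means the client reached something at that host and port that is not the NameNode RPC service, for example a web UI port. Rather than guessing between 8020 and 9000, the reliable value is the fs.defaultFS property in the cluster's core-site.xml. The sketch below (DefaultFs is a hypothetical helper that only uses the JDK's XML parser) reads that property and joins it with a path using exactly one slash, avoiding accidental double slashes.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element
import org.xml.sax.InputSource

// Hypothetical helper: take the NameNode URI from core-site.xml instead of guessing the port.
object DefaultFs {
  // Value of fs.defaultFS in the given core-site.xml content, if present.
  def fromCoreSite(xml: String): Option[String] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new InputSource(new StringReader(xml)))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength)
      .map(i => props.item(i).asInstanceOf[Element])
      .collectFirst { case p if text(p, "name") == "fs.defaultFS" => text(p, "value") }
  }

  // Trimmed text of the first child element with the given tag, or "" if missing.
  private def text(p: Element, tag: String): String = {
    val nodes = p.getElementsByTagName(tag)
    if (nodes.getLength == 0) "" else nodes.item(0).getTextContent.trim
  }

  // Join the file-system URI and a path with exactly one slash between them.
  def hdfsPath(defaultFs: String, path: String): String =
    defaultFs.stripSuffix("/") + "/" + path.dropWhile(_ == '/')
}
```

    For example, with fs.defaultFS set to hdfs://namenode:8020, DefaultFs.hdfsPath(fs, "/user/ojdbc14.jar") gives hdfs://namenode:8020/user/ojdbc14.jar, a form that can be passed to setJars.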

    8. Reading from and writing to Oracle:

    package spark

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object readFromOracle {
      def main(args: Array[String]): Unit = {
        // Keep the console quiet: only fatal Spark messages, no Jetty logging
        Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("spark://xxxxxx:7077").setAppName("read")
          .setJars(List("E:\\softs\\softDownload\\ojdbc14.jar")) // add the ojdbc14 jar, otherwise …
        val sc = new SparkContext(conf)
        val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxxx:1521:testdb11g"
        val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "xxxxx", "password" -> "xxxxx",
          "dbtable" -> "MYTABLE", "driver" -> "oracle.jdbc.driver.OracleDriver")
        val sqlContext = new HiveContext(sc)
        // Spark's JDBC data source; the table is only read when an action such as show() runs
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load()
        jdbcDF.show(3)
        sc.stop()
      }
    }
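    The options map passed to sqlContext.read.options(...) in readFromOracle is what Spark's jdbc data source reads (url, dbtable, driver, user, password). The hypothetical helper below (not part of Spark) builds that map and shows the two Oracle thin-driver URL forms: the SID form used in this post and the service-name form. Both include the listener port; 1521 is Oracle's default.

```scala
// Hypothetical helper for building Spark JDBC options for Oracle.
object OracleJdbcOptions {
  // Thin-driver URL addressing an instance by SID: jdbc:oracle:thin:@host:port:SID
  def sidUrl(host: String, port: Int, sid: String): String =
    s"jdbc:oracle:thin:@$host:$port:$sid"

  // Thin-driver URL addressing a service name: jdbc:oracle:thin:@//host:port/service
  def serviceUrl(host: String, port: Int, service: String): String =
    s"jdbc:oracle:thin:@//$host:$port/$service"

  // The keys read by Spark's "jdbc" data source.
  def options(url: String, user: String, password: String, table: String): Map[String, String] =
    Map(
      "url" -> url,
      "dbtable" -> table,
      "driver" -> "oracle.jdbc.driver.OracleDriver",
      "user" -> user,
      "password" -> password)
}
```

    Usage inside the Spark program would look like sqlContext.read.format("jdbc").options(OracleJdbcOptions.options(url, user, password, "MYTABLE")).load().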
    package spark

    import java.sql.{Connection, DriverManager, PreparedStatement}

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    /**
      * Created by Administrator on 2017/7/17.
      */
    object writeToOracle {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        /*
        Remember to set the jars: although ojdbc.jar was added at build time, the error
        jdbc:oracle:thin:@xxxxxxxx:testdb11g at java.sql.DriverManager.getConnection(DriverManager.java:689)
        still appeared, so adding it at build time is not enough.
        It is best to upload the dependency jars to HDFS instead of keeping them locally.
        */
        val conf = new SparkConf().setMaster("spark://xxxxxxx:7077").setAppName("write")
          .setJars(List("E:\\sparkTest\\out\\artifacts\\writeToOracle_jar\\sparkTest.jar",
            "E:\\softs\\softDownload\\ojdbc14.jar"))
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxx:testdb11g"
        val jdbcMap = Map("url" -> oracleDriverUrl, "user" -> "xxxx", "password" -> "xxxxxx",
          "dbtable" -> "MYTABLE", "driver" -> "oracle.jdbc.driver.OracleDriver")
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load()
        // Runs on the executors: one connection and one batched statement per partition
        jdbcDF.foreachPartition(rows => {
          Class.forName("oracle.jdbc.driver.OracleDriver")
          val connection: Connection = DriverManager.getConnection(oracleDriverUrl, "xxxx", "xxxxxxx")
          try {
            val prepareStatement: PreparedStatement =
              connection.prepareStatement("insert into MYTABLE2 values(?,?,?,?,?,?,?,?,?)")
            try {
              rows.foreach(row => {
                // Bind column i of the row to placeholder i + 1 (assumes 9 string columns)
                for (i <- 0 until 9) prepareStatement.setString(i + 1, row.getString(i))
                prepareStatement.addBatch()
              })
              prepareStatement.executeBatch()
            } finally {
              prepareStatement.close()
            }
          } finally {
            connection.close()
          }
        })
        sc.stop()
      }
    }
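    The foreachPartition pattern in writeToOracle opens one connection per partition (a JDBC connection cannot be created on the driver and serialized to the executors) and adds every row of the partition to a single JDBC batch. The Spark-free sketch below (BatchedWriter and its Sink trait are hypothetical names) shows the same pattern with two assumed refinements: the batch is flushed every batchSize rows, so a large partition does not build one huge batch, and the resource is closed in a finally block even if a batch fails.

```scala
// Hypothetical sketch of per-partition batched writing, independent of Spark and JDBC.
object BatchedWriter {
  // Minimal resource abstraction; in the Spark job it would wrap a Connection and PreparedStatement.
  trait Sink[A] {
    def add(row: A): Unit // e.g. bind parameters, then addBatch()
    def flush(): Unit     // e.g. executeBatch()
    def close(): Unit     // e.g. close the statement and the connection
  }

  // Write one partition in batches of at most batchSize rows; always close the sink.
  def writePartition[A](rows: Iterator[A], batchSize: Int)(open: () => Sink[A]): Unit = {
    require(batchSize > 0, "batchSize must be positive")
    val sink = open()
    try {
      rows.grouped(batchSize).foreach { batch =>
        batch.foreach(sink.add)
        sink.flush()
      }
    } finally {
      sink.close()
    }
  }
}
```

    In the Spark job, open would create the Connection and PreparedStatement, add would call setString(...) and addBatch(), flush would call executeBatch(), and close would close both; BatchedWriter.writePartition(rows, 1000)(open) would then be called inside jdbcDF.foreachPartition. The batch size of 1000 is only an example.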

    Copying a database table (MYTABLE to MYTABLE2):

    package spark.sql
    
    import java.util.Properties
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
    import org.apache.spark.sql.types._
    
    /**
      * Created by Administrator on 2017/7/21.
      */
    object OperateOracle {
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      val oracleDriverUrl="jdbc:oracle:thin:@xxxxxxx:1521:testdb11g"
      val jdbcMap=Map("url" -> oracleDriverUrl,
        "user"->"xxxxxx","password"->"xxxxxxx",
        "dbtable"->"MYTABLE",
        "driver"->"oracle.jdbc.driver.OracleDriver")
      def main(args: Array[String]): Unit = {
        // Create the SparkContext
        val sc = createSparkContext
        // Create a HiveContext (a SQLContext) used to connect to Oracle, Hive, etc.
        val sqlContext = new HiveContext(sc)
        // Load the Oracle table data; this is lazy
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
        jdbcDF.registerTempTable("MYTABLEDF")
        val df2Oracle = sqlContext.sql("select * from MYTABLEDF")
    
        //Registering the OracleDialect
        JdbcDialects.registerDialect(OracleDialect)
    
        val connectProperties = new Properties()
        connectProperties.put("user", "xxxxxx")
        connectProperties.put("password", "xxxxxxx")
        Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
    
        // Write back to Oracle
        // Note: when writing the results back to Oracle, make sure the target table exists
        JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "MYTABLE2", connectProperties)
    
        sc.stop
      }
      def createSparkContext: SparkContext = {
        val conf = new SparkConf().setAppName("Operate")
          .setMaster("spark://xxxxxx:7077")
          .setJars(List("hdfs://xxxxx:8020/user/ojdbc14.jar"))
        //SparkConf parameters setting
        //conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
        /*spark.sql.codegen: whether to precompile SQL into Java bytecode; helps long-running or frequently executed queries*/
        //conf.set("spark.sql.codegen", "true")
        /*spark.sql.inMemoryColumnarStorage.batchSize: number of rows processed per batch; beware of OOM*/
        //conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
        /*spark.sql.inMemoryColumnarStorage.compressed: whether the in-memory columnar storage is compressed*/
        //conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
        val sc = new SparkContext(conf)
        sc
      }
    
      // Custom JdbcDialect for Oracle: overrides the Spark-to-Oracle type mapping
      val OracleDialect = new JdbcDialect {
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
        //getJDBCType is used when writing to a JDBC table
        override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
          case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
          case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
          case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
          case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
          case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
          case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
          case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
          case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
          case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
          case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
          case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
          //        case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
          case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
          case _ => None
        }
      }
    }
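    In Spark 1.5, JdbcUtils.saveTable inserts rows into an existing table (creating a missing table is done by DataFrameWriter.jdbc, not by saveTable), which is why the code asks for the target table to exist. The Spark-free sketch below (OracleDdl is hypothetical) reuses the type mapping from OracleDialect to generate a CREATE TABLE statement for the target table. It assumes column types are given as the strings returned by Spark's DataType.typeName (e.g. "string", "integer"); decimal columns are not covered.

```scala
// Hypothetical DDL generator using the same mapping as OracleDialect.getJDBCType.
object OracleDdl {
  // Keys are Spark DataType.typeName strings; values are the Oracle column types.
  private val oracleTypes: Map[String, String] = Map(
    "string" -> "VARCHAR2(255)",
    "boolean" -> "NUMBER(1)",
    "integer" -> "NUMBER(16)",
    "long" -> "NUMBER(16)",
    "double" -> "NUMBER(16,4)",
    "float" -> "NUMBER(16,4)",
    "short" -> "NUMBER(5)",
    "byte" -> "NUMBER(3)",
    "binary" -> "BLOB",
    "timestamp" -> "DATE",
    "date" -> "DATE")

  // Build "CREATE TABLE t (col type, ...)"; fails on types without a mapping.
  def createTable(table: String, columns: Seq[(String, String)]): String = {
    val cols = columns.map { case (name, sparkType) =>
      val oracleType = oracleTypes.getOrElse(sparkType,
        throw new IllegalArgumentException(s"no Oracle type mapped for Spark type $sparkType"))
      s"$name $oracleType"
    }
    s"CREATE TABLE $table (${cols.mkString(", ")})"
  }
}
```

    With a DataFrame such as df2Oracle, the column list could be built as df2Oracle.schema.fields.map(f => f.name -> f.dataType.typeName).toSeq, and the resulting statement run once over a JDBC connection before saveTable.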

    At this point pom.xml is the same as in step 1, except for one additional dependency (spark-csv) in the dependencies section:

        <dependency>
          <groupId>com.databricks</groupId>
          <artifactId>spark-csv_2.10</artifactId>
          <version>1.5.0</version>
        </dependency>
    
    
  • Original article: https://www.cnblogs.com/ksWorld/p/7193177.html