Submitting a Spark program from Windows 10 to a remote cluster

    I. Development environment:

     Operating system: Windows 10, 64-bit

         IDE: IntelliJ IDEA

         JDK: 1.8

         Scala: 2.10.6

         Cluster: CDH cluster on Linux, with Spark 1.5.2 and Hadoop 2.6.0 (I would have liked the latest Spark and Hadoop, but only releases before 1.6 ship the spark-assembly-1.x.x-hadoop2.x.x.jar this setup needs)

    II. Implementation steps:

      1. Set up the Maven pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>spark</groupId>
      <artifactId>test</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.10.6</scala.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>commons-logging</groupId>
          <artifactId>commons-logging</artifactId>
          <version>1.1.1</version>
          <type>jar</type>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.1</version>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.9</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.6.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-mllib_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.5.2</version>
        </dependency>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.10.6</version>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

    2. Write a simple test program:

    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("spark://xxxxx:7077").setAppName("test")
        val sc = new SparkContext(conf)
        // Ship the built application jar to the executors; note the escaped
        // backslashes a Windows path needs inside a Scala string literal
        sc.addJar("E:\\sparkTest\\out\\artifacts\\sparkTest_jar\\sparkTest.jar")
        // Monte Carlo estimate of Pi over just 4 samples -- crude, but enough
        // to verify that the job actually runs on the cluster
        val count = sc.parallelize(1 to 4).filter { _ =>
          val x = math.random
          val y = math.random
          x * x + y * y < 1
        }.count()
        println(s"Pi is roughly ${4.0 * count / 4}")
        sc.stop()
      }
    }

    3. Build the jar: File -> Project Structure -> Artifacts, then Build -> Build Artifacts, and click Run to execute. (I just tried it and found it also runs without the jar, except no result appears in the console yet?) An equivalent way to attach the jar is shown below.
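
    The jar can also be attached when the SparkConf is built, which is the form the Oracle examples further down use. A minimal sketch, assuming the same artifact path as above:

    import org.apache.spark.SparkConf

    // Equivalent to sc.addJar: list the jars to ship when building the conf
    val conf = new SparkConf()
      .setMaster("spark://xxxxx:7077")   // same placeholder master URL as above
      .setAppName("test")
      .setJars(List("E:\\sparkTest\\out\\artifacts\\sparkTest_jar\\sparkTest.jar"))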

    4. The Spark version in pom.xml must match the Spark version on the cluster (a mismatch produces: java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem)
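
    A quick way to catch such a mismatch early is to log the version the driver was compiled against (a tiny sketch; sc is the SparkContext from the test program above):

    // SparkContext.version reports the client library's Spark version;
    // it should equal the cluster's version (1.5.2 here)
    println(s"Client Spark version: ${sc.version}")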

    5. Exception: Could not locate executable null\bin\winutils.exe in the Hadoop binaries (the "null" prefix appears because HADOOP_HOME is unset)

    Fix:

    1. Download a Hadoop distribution (I used hadoop-2.7.3), unpack it, and set HADOOP_HOME to the unpacked directory.

    2. Download winutils.exe from https://github.com/srccodes/hadoop-common-2.2.0-bin and put it in the bin directory under the Hadoop home.

    3. Restart IDEA and the exception disappears.
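
    If setting the environment variable is inconvenient, the same location can be supplied programmatically before the SparkContext is created. A sketch, assuming hadoop-2.7.3 was unpacked to E:\hadoop-2.7.3:

    // Must run before any Hadoop class initializes; points Hadoop at the
    // directory whose bin folder contains winutils.exe
    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3")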

    6. Exception while deleting Spark temp dir: C:\Users\<user>\AppData\Local\Temp\spark-70484fc4-167d-48fa-a8f6-54db9752402e\userFiles-27a65cc7-817f-4476-a2a2-58967d7b6cc1

    Fix: Spark currently has this problem on Windows. If you would rather not see it, set the log level in log4j.properties to FATAL (heh), as sketched below.
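
    A minimal log4j.properties along those lines might look like this (an assumed excerpt, not the full standard template; it lives on the driver's classpath, e.g. src/main/resources):

    # FATAL hides the temp-dir deletion noise on Windows
    log4j.rootCategory=FATAL, console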

    7. com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag: the HDFS IP address or port number is wrong, e.g.

    hdfs://xxxx:8020//usr/xxx (on newer versions the port is often 9000)
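
    For reference, a well-formed HDFS URI handed to Spark follows the pattern hdfs://namenode-host:port/absolute/path, where host and port must match the cluster's fs.defaultFS. A sketch with a hypothetical file:

    // Fails with the protobuf error above if the port is not the NameNode's RPC port
    val lines = sc.textFile("hdfs://xxxx:8020/user/xxx/input.txt")
    println(lines.count())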

    8. Reading from and writing to Oracle:

    package spark

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object readFromOracle {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("spark://xxxxxx:7077").setAppName("read")
          // ship the ojdbc14 jar to the executors; without it the Oracle
          // driver class cannot be found at run time
          .setJars(List("E:\\softs\\softDownload\\ojdbc14.jar"))
        val sc = new SparkContext(conf)
        val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxxx:1521:testdb11g"
        val jdbcMap = Map("url" -> oracleDriverUrl,
          "user" -> "xxxxx", "password" -> "xxxxx",
          "dbtable" -> "MYTABLE",
          "driver" -> "oracle.jdbc.driver.OracleDriver")
        val sqlContext = new HiveContext(sc)
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
        jdbcDF.show(3)
        sc.stop()
      }
    }
    package spark

    import java.sql.{Connection, DriverManager, PreparedStatement}

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    /**
     * Created by Administrator on 2017/7/17.
     */
    object writeToOracle {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.FATAL)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        /*
        Remember to set the jars here: even though ojdbc.jar was added to the build,
        DriverManager.getConnection(DriverManager.java:689) still failed to find a
        driver for jdbc:oracle:thin:@xxxxxxxx:testdb11g, so adding it at build time
        is not enough. Better still, upload the dependency jars to HDFS instead of
        referencing them locally.
        */
        val conf = new SparkConf().setMaster("spark://xxxxxxx:7077").setAppName("write")
          .setJars(List("E:\\sparkTest\\out\\artifacts\\writeToOracle_jar\\sparkTest.jar",
            "E:\\softs\\softDownload\\ojdbc14.jar"))
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        val oracleDriverUrl = "jdbc:oracle:thin:@xxxxxxx:testdb11g"
        val jdbcMap = Map("url" -> oracleDriverUrl,
          "user" -> "xxxx", "password" -> "xxxxxx",
          "dbtable" -> "MYTABLE",
          "driver" -> "oracle.jdbc.driver.OracleDriver")
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
        jdbcDF.foreachPartition(rows => {
          // one connection and one prepared statement per partition
          Class.forName("oracle.jdbc.driver.OracleDriver")
          val connection: Connection = DriverManager.getConnection(oracleDriverUrl, "xxxx", "xxxxxxx")
          val prepareStatement: PreparedStatement =
            connection.prepareStatement("insert into MYTABLE2 values(?,?,?,?,?,?,?,?,?)")
          rows.foreach(row => {
            // bind each of the nine columns to the corresponding field of the row
            for (i <- 1 to 9) prepareStatement.setString(i, row.getString(i - 1))
            prepareStatement.addBatch()
          })
          prepareStatement.executeBatch()
          prepareStatement.close()
          connection.close()
        })
        sc.stop()
      }
    }
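
    One refinement worth considering for the foreachPartition writer above: with auto-commit left on, the batch commits as it executes, so a partition that fails midway can leave partial rows behind. A hedged sketch with explicit transaction handling, using the same placeholder table and credentials:

    import java.sql.DriverManager
    import org.apache.spark.sql.Row

    // Per-partition JDBC writer: one connection, one prepared statement,
    // one transaction per partition
    def writePartition(rows: Iterator[Row], url: String, user: String, pass: String): Unit = {
      val conn = DriverManager.getConnection(url, user, pass)
      try {
        conn.setAutoCommit(false)  // make the partition's inserts atomic
        val ps = conn.prepareStatement("insert into MYTABLE2 values(?,?,?,?,?,?,?,?,?)")
        rows.foreach { row =>
          for (i <- 1 to 9) ps.setString(i, row.getString(i - 1))
          ps.addBatch()
        }
        ps.executeBatch()
        conn.commit()  // nothing is visible in Oracle until this commit
        ps.close()
      } finally conn.close()
    }

    It would be invoked as jdbcDF.foreachPartition(rows => writePartition(rows, oracleDriverUrl, "xxxx", "xxxxxxx")).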

     Copying a table between databases:

    package spark.sql
    
    import java.util.Properties
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
    import org.apache.spark.sql.types._
    
    /**
      * Created by Administrator on 2017/7/21.
      */
    object OperateOracle {
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      val oracleDriverUrl="jdbc:oracle:thin:@xxxxxxx:1521:testdb11g"
      val jdbcMap=Map("url" -> oracleDriverUrl,
        "user"->"xxxxxx","password"->"xxxxxxx",
        "dbtable"->"MYTABLE",
        "driver"->"oracle.jdbc.driver.OracleDriver")
      def main(args: Array[String]) {
        //create the SparkContext
        val sc = createSparkContext
        //create the sqlContext used to connect to Oracle, Hive, etc.
        val sqlContext = new HiveContext(sc)
        //load the Oracle table data (lazy, nothing is read yet)
        val jdbcDF = sqlContext.read.options(jdbcMap).format("jdbc").load
        jdbcDF.registerTempTable("MYTABLEDF")
        val df2Oracle = sqlContext.sql("select * from MYTABLEDF")
    
        //Registering the OracleDialect
        JdbcDialects.registerDialect(OracleDialect)
    
        val connectProperties = new Properties()
        connectProperties.put("user", "xxxxxx")
        connectProperties.put("password", "xxxxxxx")
        Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
    
        //write back to Oracle
        //Note: when writing the results back to Oracle, make sure the target table already exists
        JdbcUtils.saveTable(df2Oracle, oracleDriverUrl, "MYTABLE2", connectProperties)
    
        sc.stop
      }
      def createSparkContext: SparkContext = {
        val conf = new SparkConf().setAppName("Operate")
          .setMaster("spark://xxxxxx:7077")
          .setJars(List("hdfs://xxxxx:8020//user//ojdbc14.jar"))
        //SparkConf parameter tuning (all optional)
        //conf.set("spark.sql.autoBroadcastJoinThreshold", "50M")
        /* spark.sql.codegen: precompile SQL into Java bytecode; helps long-running or frequently repeated queries */
        //conf.set("spark.sql.codegen", "true")
        /* spark.sql.inMemoryColumnarStorage.batchSize: rows handled per batch; raise with care to avoid OOM */
        //conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
        /* spark.sql.inMemoryColumnarStorage.compressed: compress the in-memory columnar storage */
        //conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
        val sc = new SparkContext(conf)
        sc
      }
    
      //overwrite JdbcDialect fitting for Oracle
      val OracleDialect = new JdbcDialect {
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")
        //getJDBCType is used when writing to a JDBC table
        override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
          case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
          case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
          case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
          case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
          case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
          case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
          case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
          case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
          case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
          case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
          case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
          //        case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
          case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
          case _ => None
        }
      }
    }

    The pom.xml at this point is identical to the one above except for one new dependency, spark-csv:

    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-csv_2.10</artifactId>
      <version>1.5.0</version>
    </dependency>
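
    To sanity-check the newly added spark-csv dependency, a read could look like the following sketch (sqlContext is a HiveContext as in the examples above; the CSV path is hypothetical):

    // spark-csv registers the "com.databricks.spark.csv" data source for Spark 1.x
    val csvDF = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        // first line holds column names
      .option("inferSchema", "true")   // sample the data to infer column types
      .load("hdfs://xxxx:8020/user/xxx/people.csv")  // hypothetical file
    csvDF.show(5)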