  • Scala: connecting Spark to SQL and Hive, and operating HDFS from IDEA

    1. Connecting to SQL

    Method 1: pass a java.util.Properties object to spark.read.jdbc

    package com.njbdqn.linkSql
    
    import java.util.Properties
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql._
    
    object LinkSql {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("apptest").master("local[2]").getOrCreate()
        // 1.properties
        val prop = new Properties()
        prop.setProperty("driver","com.mysql.jdbc.Driver")
        prop.setProperty("user","root")
        prop.setProperty("password","root")
        // 2.jdbcDF show
        val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)
        jdbcDF.show(false)
        // 3. append two new rows
        import spark.implicits._
        val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq((90, "抖抖抖", "男", 23, "sdf", "sdfg@dfg"),(8, "抖33", "男", 23, "s444f", "sdfg@dfg"))))
          .toDF("sid","sname","sgender","sage","saddress","semail")
      //  df.show(false)
        df.write.mode("append").jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)
    
      }
    }

    Method 2: pass all connection options as a Map to the jdbc data source

    package com.njbdqn
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object KMeansTest {
      def readMySQL(spark:SparkSession):DataFrame ={
        val map: Map[String, String] = Map[String, String](
          "url" -> "jdbc:mysql://192.168.56.111:3306/myshops",
          "driver" -> "com.mysql.jdbc.Driver",
          "user" -> "root",
          "password" -> "root",
          "dbtable" -> "customs"
        )
        spark.read.format("jdbc").options(map).load()
      }
      def main(args: Array[String]): Unit = {
        val spark=SparkSession.builder().appName("db").master("local[*]").getOrCreate()
        readMySQL(spark).select("cust_id","company","province_id","city_id","district_id","membership_level","create_at","last_login_time","idno","biz_point","sex","marital_status","education_id","login_count","vocation","post")
          .show(20)
        spark.stop()
    
      }
    }

    Method 3: read the JDBC configuration from a .properties file under resources (a minimal sketch follows the link below):

    https://www.cnblogs.com/sabertobih/p/13874061.html
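
    The sketch below shows one way to do this; the file name db.properties and its keys (url, driver, user, password, dbtable) are illustrative assumptions, not taken from the linked post:

    import java.util.Properties

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object LinkSqlFromProps {
      // load db.properties from the classpath (e.g. placed under src/main/resources)
      def loadProps(): Properties = {
        val props = new Properties()
        val in = getClass.getClassLoader.getResourceAsStream("db.properties")
        props.load(in)
        in.close()
        props
      }

      def readTable(spark: SparkSession): DataFrame = {
        val props = loadProps()
        // url and dbtable come from the file; driver/user/password ride along as connection properties
        spark.read.jdbc(props.getProperty("url"), props.getProperty("dbtable"), props)
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("propsdemo").master("local[2]").getOrCreate()
        readTable(spark).show(false)
        spark.stop()
      }
    }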

    2. Connecting to Hive

    (1) Written back in August/September, before I fully understood it; not written well

    1. Add the resource files (the Hive/Hadoop XML configs under src/main/resources)

    2. Code

    package com.njbdqn.linkSql
    
    import org.apache.spark.sql.SparkSession
    
    object LinkHive {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("apptest").master("local[2]")
          .enableHiveSupport()
          .getOrCreate()
        spark
           // .sql("show databases")
          .sql("select * from storetest.testhive")
          .show(false)
      }
    }

    Note! If the XML configs in resources point to the cluster, then val df = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv") fails, because

    >>> Spark can read local data files, but the file must exist on every node (verified: on a three-node cluster, with the file only on the master, textFile kept reporting file-not-found; after copying the file to the other two workers, it could be read).

    >>> Fix: remove the cluster configs (and read locally), or upload the file to HDFS (and read it from there on the cluster), as sketched below.
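
    For the cluster case, a minimal sketch of reading the same CSV after uploading it to HDFS (the namenode address 192.168.56.115:9000 and the /data/orders.csv path are assumptions for illustration):

    import org.apache.spark.sql.SparkSession

    object ReadCsvFromHdfs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("csvFromHdfs").master("local[2]").getOrCreate()
        // after e.g. `hdfs dfs -put orders.csv /data/`, the file is visible to every executor
        val df = spark.read.format("csv").load("hdfs://192.168.56.115:9000/data/orders.csv")
        df.show(false)
        spark.stop()
      }
    }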


    (2) Written on December 25

    pom file:

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>2.3.4</version>
        </dependency>

    Add the hive.metastore.uris setting in the code to enable shared access to the metastore metadata.

    For the detailed setup steps, see: https://www.pianshen.com/article/8993307375/

    For why this change is needed, see: https://www.cnblogs.com/sabertobih/p/13772933.html

    import org.apache.spark.sql.SparkSession
    
    object EventTrans {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]")
          .config("hive.metastore.uris","thrift://192.168.56.115:9083") # 配置metastore server的访问地址,该server必须开启服务
          .appName("test")
          .enableHiveSupport().getOrCreate()
        spark.sql("select * from dm_events.dm_final limit 3")
          .show(false)
    
        spark.close()
    
      }
    }

    1) The metastore service must be running on 192.168.56.115:

    hive --service metastore

    If the service is not started, launching the Spark thriftServer reports the following error:

    org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

    2) 192.168.56.115 must be configured with a direct MySQL connection (for the metastore database)

    Verification:
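
    (The original post does not show the verification output here. As a minimal sketch, assuming the metastore service and its MySQL backend are both up, listing the databases and tables through Spark confirms both points above:)

    import org.apache.spark.sql.SparkSession

    object MetastoreCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]")
          .config("hive.metastore.uris", "thrift://192.168.56.115:9083")
          .appName("metastoreCheck")
          .enableHiveSupport().getOrCreate()
        // these only succeed if Spark can reach the metastore and the metastore can reach MySQL
        spark.sql("show databases").show(false)
        spark.sql("show tables in dm_events").show(false)
        spark.close()
      }
    }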

    3. Operating HDFS: deleting files

        import java.net.URI
        import org.apache.hadoop.fs.Path
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate();
        /**
         * delete the intermediate data left behind by checkpointing
         */
        val path = new Path(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"); // the HDFS path to delete; HDFSConnection is a project config object holding the hdfs:// base URL
        val hadoopConf = spark.sparkContext.hadoopConfiguration
        val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"),hadoopConf)
        if(hdfs.exists(path)) {
          // pass true to delete recursively, false otherwise
          hdfs.delete(path, true) // intermediate data, so recursive deletion is fine
        }

    Problem encountered:

    Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://h1:9000/out, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:381)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:393)
    at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:452)
    at mapreduce.WordCountApp.main(WordCountApp.java:36)

    Fix: pass the HDFS URI explicitly to FileSystem.get, so the returned FileSystem is the HDFS one matching the path instead of the default local filesystem taken from fs.defaultFS:

      val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"),hadoopConf)
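
    For contrast, a minimal sketch of why passing the URI matters (the namenode address is an assumption for illustration): FileSystem.get(conf) alone resolves against fs.defaultFS, which is file:/// when running locally, so handing it an hdfs:// path raises exactly the "Wrong FS" error above.

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object WrongFsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()                                    // fs.defaultFS defaults to file:///
        val hdfsPath = new Path("hdfs://192.168.56.115:9000/checkpoint")  // assumed namenode address

        val localFs = FileSystem.get(conf)                                // local filesystem
        // localFs.delete(hdfsPath, true)                                 // would throw: Wrong FS ... expected: file:///

        val hdfs = FileSystem.get(new URI("hdfs://192.168.56.115:9000"), conf) // HDFS filesystem
        if (hdfs.exists(hdfsPath)) hdfs.delete(hdfsPath, true)            // path and filesystem now agree
      }
    }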
