  • Structured data in Spark SQL

    1. Connecting to MySQL

    First, copy mysql-connector-java-5.1.39.jar into Spark's jars directory so the JDBC driver is on the classpath.
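
    A quick way to verify that the jar was actually picked up (just a classpath sanity check, not a required step) is to load the driver class by name from the shell:

    // Throws ClassNotFoundException if the driver jar is not on the classpath.
    Class.forName("com.mysql.jdbc.Driver")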


    scala> import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext

    scala> val sqlContext=new SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3a649f9a

    scala>  sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/metastore",
         |  "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "DBS", "user" -> "root", "password" -> "root")).load().show
    +-----+--------------------+--------------------+-------+----------+----------+
    |DB_ID|                DESC|     DB_LOCATION_URI|   NAME|OWNER_NAME|OWNER_TYPE|
    +-----+--------------------+--------------------+-------+----------+----------+
    |    1|Default Hive data...|hdfs://localhost:...|default|    public|      ROLE|
    |    2|                null|hdfs://localhost:...|    aaa|      root|      USER|
    |    6|                null|hdfs://localhost:...| userdb|      root|      USER|
    +-----+--------------------+--------------------+-------+----------+----------+
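
    The loaded DataFrame behaves like any other; for example, it can be registered as a temporary view and queried with SQL. A sketch reusing the options above (the view name dbs_view is arbitrary):

    val dbs = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/metastore",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "DBS",
      "user" -> "root", "password" -> "root")).load()
    dbs.createOrReplaceTempView("dbs_view")   // register for SQL queries
    sqlContext.sql("select NAME, OWNER_NAME from dbs_view where OWNER_TYPE = 'USER'").show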

    -----------------------------------------------------------------------------------------------------------------

    scala> import org.apache.spark.sql.{SQLContext,SparkSession}
    import org.apache.spark.sql.{SQLContext, SparkSession}

    scala> val url="jdbc:mysql://localhost:3306/test?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"
    url: String = jdbc:mysql://localhost:3306/test?user=root&password=root&useUnicode=true&characterEncoding=UTF-8

    scala> val con = new SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    con: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3a973b5e


    scala> con.read.format("jdbc").options(Map("url"->url,"dbtable"->"role")).load.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name|             dateid|               addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    |     1|null|2017-11-16 14:49:11|henan luohe linying|  1|   10|
    |    40|null|2017-11-13 14:50:25| guangdong shenzhen|  1|   20|
    |   110|null|2017-11-14 14:50:47|            beijing|  1|   20|
    |   200|null|2017-11-14 14:49:47|   shandong qingdao|  0|    8|
    |   400|null|2017-11-15 14:49:56|        anhui hefei|  0|    4|
    |   600|null|2017-11-15 14:50:05|     hunan changsha|  0|   91|
    |   650|null|2017-11-01 17:24:34|               null|  1|   29|
    |   651|wang|2018-06-06 16:16:55|           shenzhen|  1|   60|
    +------+----+-------------------+-------------------+---+-----+

    scala> con.read.format("jdbc").option("url",url).option("dbtable","role").load.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name|             dateid|               addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    |     1|null|2017-11-16 14:49:11|henan luohe linying|  1|   10|
    |    40|null|2017-11-13 14:50:25| guangdong shenzhen|  1|   20|
    |   110|null|2017-11-14 14:50:47|            beijing|  1|   20|
    |   200|null|2017-11-14 14:49:47|   shandong qingdao|  0|    8|
    |   400|null|2017-11-15 14:49:56|        anhui hefei|  0|    4|
    |   600|null|2017-11-15 14:50:05|     hunan changsha|  0|   91|
    |   650|null|2017-11-01 17:24:34|               null|  1|   29|
    |   651|wang|2018-06-06 16:16:55|           shenzhen|  1|   60|
    +------+----+-------------------+-------------------+---+-----+
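
    Since the JDBC source hands the value of "dbtable" to MySQL as the FROM clause, a parenthesized subquery with an alias also works, pushing the filter and column pruning down to the database. A sketch (the filter itself is illustrative):

    con.read.format("jdbc")
      .option("url", url)
      .option("dbtable", "(select roleid, addr, level from role where level >= 20) t")
      .load.show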

    scala> val session=SparkSession.builder.getOrCreate()
    session: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@71e393a2

    scala> session.read.format("jdbc").options(Map("url"->url,"dbtable"->"role")).load.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name|             dateid|               addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    |     1|null|2017-11-16 14:49:11|henan luohe linying|  1|   10|
    |    40|null|2017-11-13 14:50:25| guangdong shenzhen|  1|   20|
    |   110|null|2017-11-14 14:50:47|            beijing|  1|   20|
    |   200|null|2017-11-14 14:49:47|   shandong qingdao|  0|    8|
    |   400|null|2017-11-15 14:49:56|        anhui hefei|  0|    4|
    |   600|null|2017-11-15 14:50:05|     hunan changsha|  0|   91|
    |   650|null|2017-11-01 17:24:34|               null|  1|   29|
    |   651|wang|2018-06-06 16:16:55|           shenzhen|  1|   60|
    +------+----+-------------------+-------------------+---+-----+


    scala> session.read.format("jdbc").option("url",url).option("dbtable","role").load.show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name|             dateid|               addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    |     1|null|2017-11-16 14:49:11|henan luohe linying|  1|   10|
    |    40|null|2017-11-13 14:50:25| guangdong shenzhen|  1|   20|
    |   110|null|2017-11-14 14:50:47|            beijing|  1|   20|
    |   200|null|2017-11-14 14:49:47|   shandong qingdao|  0|    8|
    |   400|null|2017-11-15 14:49:56|        anhui hefei|  0|    4|
    |   600|null|2017-11-15 14:50:05|     hunan changsha|  0|   91|
    |   650|null|2017-11-01 17:24:34|               null|  1|   29|
    |   651|wang|2018-06-06 16:16:55|           shenzhen|  1|   60|
    +------+----+-------------------+-------------------+---+-----+

    scala> import java.util.Properties
    import java.util.Properties

    scala> val pro=new Properties()
    pro: java.util.Properties = {}

    scala> session.read.jdbc(url,"role",pro).show
    +------+----+-------------------+-------------------+---+-----+
    |roleid|name|             dateid|               addr|sex|level|
    +------+----+-------------------+-------------------+---+-----+
    |     1|null|2017-11-16 14:49:11|henan luohe linying|  1|   10|
    |    40|null|2017-11-13 14:50:25| guangdong shenzhen|  1|   20|
    |   110|null|2017-11-14 14:50:47|            beijing|  1|   20|
    |   200|null|2017-11-14 14:49:47|   shandong qingdao|  0|    8|
    |   400|null|2017-11-15 14:49:56|        anhui hefei|  0|    4|
    |   600|null|2017-11-15 14:50:05|     hunan changsha|  0|   91|
    |   650|null|2017-11-01 17:24:34|               null|  1|   29|
    |   651|wang|2018-06-06 16:16:55|           shenzhen|  1|   60|
    +------+----+-------------------+-------------------+---+-----+
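
    Two common follow-ups, sketched with the same url and pro as above (the target table role_backup is hypothetical):

    // Parallel read: the overload with a numeric split column scans the table
    // in 4 partitions. The bounds only steer partitioning; they do not filter rows.
    val roles = session.read.jdbc(url, "role", "roleid", 1L, 651L, 4, pro)

    // Write a DataFrame back to MySQL, appending to (or creating) role_backup.
    roles.write.mode("append").jdbc(url, "role_backup", pro)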

    2. Connecting to Hive. First copy Hive's configuration file hive-site.xml into Spark's conf directory, or create a new hive-site.xml under conf and add the following content.

    (The file copied over from Hive raised errors, so I created a new file instead.)

    <configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
      </property>
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
        <description>location of default database for the warehouse</description>
      </property>
      <property>
        <name>hive.exec.scratchdir</name>
        <value>/tmp/hive/tmp</value>
      </property>
      <property>
        <name>hive.querylog.location</name>
        <value>/tmp/hive/log</value>
      </property>
    </configuration>
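
    With hive-site.xml in place, spark-shell picks up the metastore configuration automatically. If you instead build a SparkSession in a standalone application, Hive support has to be enabled explicitly; a minimal sketch (the appName is arbitrary):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .appName("hive-demo")     // arbitrary application name
      .enableHiveSupport()      // wire the session to the Hive metastore
      .getOrCreate()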

    Start spark-shell:

    Reading Hive with HiveContext

    scala> import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.HiveContext

    scala> val hivecon=new HiveContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    hivecon: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1b96f15e

    scala> hivecon.sql("show databases").show
    +------------+
    |databaseName|
    +------------+
    |         aaa|
    |     default|
    |   sparkhive|
    |      userdb|
    +------------+
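
    The same HiveContext runs arbitrary HiveQL. For example, to peek at the table queried in the next section (assuming gamedw.cust exists, as it does below):

    hivecon.sql("select * from gamedw.cust limit 5").show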

     --------------------------------------------

    Reading Hive with SparkSession

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession

    scala> val session=SparkSession.builder.getOrCreate()
    session: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@331d651b


    scala> session.sql("select sex,count(1) from gamedw.cust group by sex").show
    +---+--------+
    |sex|count(1)|
    +---+--------+
    |  1|       6|
    |  0|       3|
    +---+--------+
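
    A query result can also be persisted back into Hive as a table; a sketch (the target table name cust_sex_stats is hypothetical):

    val bySex = session.sql("select sex, count(1) as cnt from gamedw.cust group by sex")
    bySex.write.mode("overwrite").saveAsTable("gamedw.cust_sex_stats")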
