zoukankan      html  css  js  c++  java
  • spark SQL学习(spark连接hive)

    spark 读取hive中的数据

    scala> import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.HiveContext
    scala> val hiveContext = new HiveContext(sc)
    //hive中的feigu数据库中表stud_info
    scala> val stud_infoRDD = hiveContext.sql("select * from feigu.stud_info").rdd
    scala> stud_infoRDD.take(5).foreach(line => println("code:"+line(0)+";name:"+line(1)))
    code:stud_code;name:stud_name
    code:2015101000;name:王进
    code:2015101001;name:刘海
    code:2015101002;name:张飞
    code:2015101003;name:刘婷
    
    
    

    spark载入数据到hive

    两个文件

    hadoop@master:~/wujiadong$ cat spark_stud_info.txt
    wujiadong,26
    ji,24
    sun,27
    xu,25
    
    hadoop@master:~/wujiadong$ cat spark_stud_score.txt
    wujiadong,90
    ji,100
    sun,99
    xu,99
    
    

    scala代码

    scala> import org.apache.spark.sql.hive.HiveContext
    scala> val hiveContext = new HiveContext(sc)
    scala> hiveContext.sql("drop table if exists wujiadong.spark_stud_info")
    scala> hiveContext.sql("create table if not exists wujiadong.spark_stud_info(name string,age int) row format delimited fields terminated by ','")
    scala> hiveContext.sql("load data local inpath '/home/hadoop/wujiadong/spark_stud_info.txt' into table wujiadong.spark_stud_info");
    
    scala> hiveContext.sql("drop table if exists wujiadong.spark_stud_score")
    scala> hiveContext.sql("create table if not exists wujiadong.spark_stud_score(name string,score int) row format delimited fields terminated by ','")
    scala> hiveContext.sql("load data local inpath '/home/hadoop/wujiadong/spark_stud_score.txt' into table wujiadong.spark_stud_score");
    
    
    然后到hive中查询是否导入成功
    hive> select * from spark_stud_info;
    OK
    wujiadong	26
    ji	24
    sun	27
    xu	25
    Time taken: 0.178 seconds, Fetched: 4 row(s)
    hive> select * from spark_stud_score;
    OK
    wujiadong	90
    ji	100
    sun	99
    xu	99
    Time taken: 0.212 seconds, Fetched: 4 row(s)
    
    //将两张表进行连接查询大于99分的
    scala> val df = hiveContext.sql("select sss.name,sss.score from wujiadong.spark_stud_info ssi join wujiadong.spark_stud_score sss on ssi.name=sss.name where sss.score > 99")
    scala> df.show()
    17/03/06 22:30:37 INFO FileInputFormat: Total input paths to process : 1
    17/03/06 22:30:38 INFO FileInputFormat: Total input paths to process : 1
    +----+-----+
    |name|score|
    +----+-----+
    |  ji|  100|
    +----+-----+
    
    //将df中数据保存到表result_stu表中
    scala> hiveContext.sql("drop table if exists wujiadong.result_stud")
    scala> df.saveAsTable("wujiadong.result_stu")
    //然后针对表result_stu直接创建dataframe
    
    
    //Hive中查看
    hive> select * from result_stu;
    OK
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    ji	100
    Time taken: 0.252 seconds, Fetched: 1 row(s)
    

    参考资料
    http://dblab.xmu.edu.cn/blog/1086-2/
    参考资料
    http://blog.csdn.net/ggz631047367/article/details/50445877

  • 相关阅读:
    vue 定义全局函数和变量
    大学感受
    NOIP2018 游记
    NOI2018 游记
    THUSC 2018 游记
    APIO2018 游记
    SXOI2018游记
    poorpool 的 考场 NOI Linux 配置
    关于 poorpool
    NOIP2017 游记
  • 原文地址:https://www.cnblogs.com/wujiadong2014/p/6516594.html
Copyright © 2011-2022 走看看