zoukankan      html  css  js  c++  java
  • 【Spark篇】---SparkSQL on Hive的配置和使用

    一、前述

    Spark on Hive Hive只作为储存角色Spark负责sql解析优化,执行。

    二、具体配置

    1、在Spark客户端配置Hive On Spark

     

                在Spark客户端安装包下spark-1.6.0/conf中创建文件hive-site.xml

                配置hive的metastore路径

    <configuration>
       <property>
            <name>hive.metastore.uris</name>
            <value>thrift://node1:9083</value>
       </property>
    </configuration>

     

    2、启动Hive的metastore服务

           hive --service metastore

     

    3、启动zookeeper集群,启动HDFS集群。

    4、启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。

    ./spark-shell 
    --master spark://node1:7077,node2:7077 
     --executor-cores 1 
    --executor-memory 1g 
    --total-executor-cores 1
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)
    hc.sql("show databases").show
    hc.sql("user default").show
    hc.sql("select count(*) from jizhan").show

    可以发现性能明显提升!!!

    注意:

    如果使用Spark on Hive  查询数据时,出现错误:

    找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:

     export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

     三、读取Hive中的数据加载成DataFrame

       1、HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。

       2、由于本地没有Hive环境,要提交到集群运行,提交命令:

    /spark-submit 
    --master spark://node1:7077,node2:7077 
    --executor-cores 1 
    --executor-memory 2G 
    --total-executor-cores 1
    --class com.bjsxt.sparksql.dataframe.CreateDFFromHive 
    /root/test/HiveTest.jar

     java代码:

    SparkConf conf = new SparkConf();
    conf.setAppName("hive");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //HiveContext是SQLContext的子类。
    HiveContext hiveContext = new HiveContext(sc);
    hiveContext.sql("USE spark");
    hiveContext.sql("DROP TABLE IF EXISTS student_infos");
    //在hive中创建student_infos表
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '	' ");
    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
    
    hiveContext.sql("DROP TABLE IF EXISTS student_scores"); 
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '	'");  
    hiveContext.sql("LOAD DATA "
    + "LOCAL INPATH '/root/test/student_scores'"
    + "INTO TABLE student_scores");
    /**
     * 查询表生成DataFrame
     */
    DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
    + "FROM student_infos si "
    + "JOIN student_scores ss "
    + "ON si.name=ss.name "
    + "WHERE ss.score>=80");
    
    hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
    
    goodStudentsDF.registerTempTable("goodstudent");
    DataFrame result = hiveContext.sql("select * from goodstudent");
    result.show();
    
    /**
     * 将结果保存到hive表 good_student_infos
     */
    goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    
    Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();  
    for(Row goodStudentRow : goodStudentRows) {
        System.out.println(goodStudentRow);  
    }
    sc.stop();

    scala代码:

    val conf = new SparkConf()
     conf.setAppName("HiveSource")
     val sc = new SparkContext(conf)
     /**
      * HiveContext是SQLContext的子类。
      */
     val hiveContext = new HiveContext(sc)
     hiveContext.sql("use spark")
     hiveContext.sql("drop table if exists student_infos")
     hiveContext.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by '	'")
     hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
     
     hiveContext.sql("drop table if exists student_scores")
     hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '	'")
     hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
     
     val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
     hiveContext.sql("drop table if exists good_student_infos")
     /**
      * 将结果写入到hive表中
      */
     df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
     
     sc.stop()
    

     结果:

    可以看到分组内有序,组间并不是有序的!!!!

  • 相关阅读:
    流量染色与gRPC服务托管 微服务协作开发、灰度发布之流量染色 灰度发布与流量染色
    http://www.cnblogs.com/sealedbook/p/6194047.html
    celery 原理
    修改织梦默认栏目页、文章页URL命名规则
    Dede首页幻灯版显示Bug修正
    DEDECMS5.7 首页和栏目页调用文章按权重排序
    dede文章摘要字数的设置方法
    DEDECMS登录后台慢的完美解决方案
    DedeCMS去掉友情链接中“织梦链投放”“织梦链”的方法
    删除dedecms5.7后台登陆验证码
  • 原文地址:https://www.cnblogs.com/LHWorldBlog/p/8431750.html
Copyright © 2011-2022 走看看