  • [imooc Practical Course] Part 7: Entering the World of Big Data Spark SQL, Using imooc Log Analysis as an Example

    User needs:
        conveniently and quickly read from different data sources (json, parquet, rdbms),
        process them together (e.g. join json with parquet), and write the results back
        in a chosen format (json, parquet) to a target system (HDFS, S3)
     
    Spark SQL 1.2 ==> external Data Source API
     
    Why an external Data Source API?
    1) Developers: no need to merge connector code into Spark itself; a custom
       source (e.g. a weibo connector) can ship as its own jar and be attached
       at launch with --jars (see the sketch below)
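    A launch-time sketch of attaching an external connector. The spark-avro
    coordinates and the local jar path below are assumptions for illustration,
    not something pinned by these notes:

    # load a third-party data source package without rebuilding Spark
    spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
    # or point at a locally built jar
    spark-shell --jars /home/hadoop/lib/spark-avro_2.11-4.0.0.jar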
     
    2) Users
        read:  spark.read.format(format)
            format
                built-in: json, parquet, jdbc, csv (csv is built in since Spark 2.0)
                packages: third-party sources not bundled with Spark, see https://spark-packages.org/
        write: people.write.format("parquet").save("path")
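    A minimal read/write round trip with these APIs, using the people.json
    sample shipped with the Spark distribution referenced in these notes:

    // read a built-in format by name
    val people = spark.read.format("json")
      .load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

    // write the result back out as parquet (target path is arbitrary)
    people.write.format("parquet").save("file:///tmp/people_parquet")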
     
    Working with Parquet data

    Loading a JSON file without naming a format makes Spark parse it as
    Parquet (the default data source) and fail:

    RuntimeException: file:/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file

    The default comes from Spark's own configuration:
     
      val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
        .doc("The default data source to use in input/output.")
        .stringConf
        .createWithDefault("parquet")
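    A sketch of both behaviors against the sample files above:

    // no format given: falls back to spark.sql.sources.default, i.e. parquet
    val users = spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

    // for JSON, name the format explicitly
    val people = spark.read.format("json")
      .load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")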
     
    # Note the USING clause: it names the data source implementation
    CREATE TEMPORARY VIEW parquetTable
    USING org.apache.spark.sql.parquet
    OPTIONS (
      path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
    )
     
    SELECT * FROM parquetTable
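    The same statements can be run from the Scala side; a small sketch (the
    temporary view only lives for the current session):

    spark.sql(
      """CREATE TEMPORARY VIEW parquetTable
        |USING org.apache.spark.sql.parquet
        |OPTIONS (
        |  path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
        |)""".stripMargin)
    spark.sql("SELECT * FROM parquetTable").show()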
     
    spark.sql("select deptno, count(1) as mount from emp where group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")
     
    org.apache.spark.sql.AnalysisException: Attribute name "count(1)" contains invalid character(s) among " ,;{}() =". Please use alias to rename it.;
     
    spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")
     
    In production, always size spark.sql.shuffle.partitions for your workload; the default is 200.
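    On Spark 2.x the session-level conf does the same thing; a one-line sketch:

    // session-scoped equivalent of the sqlContext call above
    spark.conf.set("spark.sql.shuffle.partitions", "10")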
     
    Working with MySQL data:

    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/hive")
      .option("dbtable", "hive.TBLS")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
     
    If the driver option is omitted (or the MySQL connector jar is not on the
    classpath), the load fails with:

    java.sql.SQLException: No suitable driver
     
    import java.util.Properties

    // connection properties, with the driver class set explicitly
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")
    connectionProperties.put("driver", "com.mysql.jdbc.Driver")
     
    val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)
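    The same properties work for writing; a sketch (the target table
    spark.TBLS_COPY is hypothetical):

    // write a DataFrame back to MySQL over JDBC
    jdbcDF2.write
      .mode("append")
      .jdbc("jdbc:mysql://localhost:3306", "spark.TBLS_COPY", connectionProperties)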
     
    CREATE TEMPORARY VIEW jdbcTable
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url "jdbc:mysql://localhost:3306",
      dbtable "hive.TBLS",
      user "root",
      password "root",
      driver "com.mysql.jdbc.Driver"
    )
     
    Combined external data source case study (a join sketch against Hive follows the DDL below)
    create database spark;
    use spark;
     
    CREATE TABLE DEPT(
    DEPTNO int(2) PRIMARY KEY,
    DNAME VARCHAR(14) ,
    LOC VARCHAR(13) ) ;
     
    INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
    INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
    INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
    INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON'); 
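    With DEPT in MySQL, it can be joined against a Hive table through the
    external data source API. A sketch, assuming an emp table with deptno and
    ename columns already exists in Hive:

    // load the MySQL DEPT table created above
    val deptDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // join with the Hive emp table (assumed) and pick a few columns
    val empDF = spark.table("emp")
    empDF.join(deptDF, empDF.col("deptno") === deptDF.col("DEPTNO"))
      .select("ename", "DNAME")
      .show()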
  • Original article: https://www.cnblogs.com/kkxwz/p/8493744.html