zoukankan      html  css  js  c++  java
  • 转】Spark DataFrame小试牛刀

      原博文出自于:  https://segmentfault.com/a/1190000002614456        感谢!

    三月中旬,Spark发布了最新的1.3.0版本,其中最重要的变化,便是DataFrame这个API的推出。DataFrame让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍。这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心。DataFrame像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性的统计工程师。

    以一个常见的场景 -- 日志解析为例,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame的转化,并且支持SQL查询。

    实例

    首先我们在本地放置了一个JSON文件,文件内容如下:

     {"name":"Michael"}
     {"name":"Andy", "age":30}
     {"name":"Justin", "age":19}
    

    然后我们进入spark-shell,控制台的提示说明Spark为我们创建了一个叫sqlContext的上下文,注意,它是DataFrame的起点。
    接下来我们希望把本地的JSON文件转化为DataFrame

    scala> val df = sqlContext.jsonFile("/path/to/your/jsonfile")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    

    从控制台的提示可以得知,我们成功创建了一个DataFrame的对象,包含agename两个字段。
    DataFrame自带的玩法就多了:

    // 输出表结构
    df.printSchema()
    
    // 选择所有年龄大于21岁的人,只保留name字段
    df.filter(df("age") > 21).select("name").show()
    
    // 选择name,并把age字段自增
    df.select("name", df("age") + 1).show()
    
    // 按年龄分组计数
    df.groupBy("age").count().show()
    
    // 左联表(注意是3个等号!)
    df.join(df2, df("name") === df2("name"), "left").show()
    

    此外,我们也可以把DataFrame对象转化为一个虚拟的表,然后用SQL语句查询,比如下面的命令就等同于df.groupBy("age").count().show()

    df.registerTempTable("people")
    sqlContext.sql("select age, count(*) from people group by age").show()
    

    当然,Python有同样丰富的API(由于最终都是转化为JVM bytecode执行,Python和Scala的效率是一样的),而且Python还提供了类Pandas的操作语法。关于Python的API,可以参考Spark新年福音:一个用于大规模数据科学的API——DataFrame

    MySQL

    除了JSON之外,DataFrame现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc实现的。

    对于不同的关系数据库,必须在SPARK_CLASSPATH变量中加入对应connector的jar包,比如希望连接MySQL的话应该这么启动spark-shell

    SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell
    

    下面要将一个MySQL表转化为DataFrame对象:

    val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))
    

    然后十八般武艺又可以派上用场了。

    Hive

    Spark提供了一个HiveContext的上下文,其实是SQLContext的一个子类,但从作用上来说,sqlContext也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件挪到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:

    sqlContext.sql("select count(*) from hive_people").show()
    

    结语

    Spark的目标在于成为一个跨环境、跨语言、跨工具的大数据处理和分析平台。DataFrame的推出很好诠释了这一目标,从初步的使用来看确实很容易上手。随着性能和稳定性的持续优化,我相信某一天所有玩数据的人,都可以使用Spark作为惟一的平台入口。

    来自:建造者说

  • 相关阅读:
    win7网络共享原来如此简单,WiFi共享精灵开启半天都弱爆了!
    JQUERY UI Datepicker Demo
    Official online document, install svn server in centOS
    JAVE not work in linux
    AMR 转mp3 失败
    XD, XR, DR 股票
    Linux 下MySql 重置密码
    Difinition Of Done
    Apache, Tomcat, JK Configuration Example
    Linux 安装tomcat
  • 原文地址:https://www.cnblogs.com/zlslch/p/6039506.html
Copyright © 2011-2022 走看看