zoukankan      html  css  js  c++  java
  • sparkR 跑通的函数

    spark1.4.0的sparkR的思路:用spark从大数据集中抽取小数据(sparkR的DataFrame),然后到R里分析(R本地的data.frame)。

    这两个DataFrame是不同的,前者是分布式的,集群上的DF,R里的那些包都不能用;后者是单机版的DF,包里的函数都能用。

    sparkR的开发计划,个人觉得是将目前包里的函数,迁移到sparkR的DataFrame里,这样就打开一片天地。

    > a<- sql(hiveContext, "SELECT count(*) FROM anjuke_scores where restaurant>=10");



    > a<- sql(hiveContext, "SELECT * FROM anjuke_scores limit 5")
    > a
    DataFrame[city:string, housingname:string, ori_traffic_score:int, ori_traffic_score_normal:double, metro_station:double, metro_station_normal:double,...
    > first(a)  #显示Formal Data Frame的第一行



    > head(a) ; #列出a的前6行

    > columns(a) # 列出全部的列

    [1] "city" "housingname" "ori_traffic_score" "ori_traffic_score_normal"

    [5] "metro_station" "metro_station_normal" "bus_station" "bus_station_normal" ...

    > showDF(a)
    > b<-filter(a, a$ori_comfort>8); # 行筛选, ori_comfort_normal:double



    > print(a);    #打印列名及类型  
    DataFrame[city:string, housingname:string, ori_traffic_score:int, ......


    > printSchema(a); # 打印列名的树形框架概要
    root
     |-- city: string (nullable = true)
     |-- housingname: string (nullable = true)
     |-- ori_traffic_score: integer (nullable = true)
     |-- ori_traffic_score_normal: double (nullable = true)
     |-- metro_station: double (nullable = true)
    > take(a,10) ; # 提取Formal class DataFrame的前面num行,成为R中普通的 data frame , take(x, num)

    city housingname ori_traffic_score ori_traffic_score_normal metro_station metro_station_normal

    1 x9a xddrwxb8 NA 0 NA 0

    2 x9a xe4xf04u03a221~ NA 0 NA 0

    3 x9a xf6xe3wxb8 NA 0 NA 0

    4 x9a x8e=xb0wxb8 NA 0 NA 0

    5 x9a x9axe4xf04xcexe4xf0~ NA 0 NA 0

    6 x9a q4xfdE NA 0 NA 0

    7 x9a xe4xf04xce NA 0 NA 0

    8 x9a )xfdVT NA 0 NA 0

    9 x9a q177V NA 0 NA 0

    10 x9a xe4xf04xceWxb8 NA 0 NA 0

    > b<-take(a,10) 
    > dim(b)
    [1] 10 41



    > aa <- withColumn(a, "ori_comfort_aa", a$ori_comfort * 5)   #用现有的列生成新的列, 新增一列,ori_comfort_aa,结果还是Formal data frame结构
    > printSchema(aa)
    root
     |-- city: string (nullable = true)
    .........
     |-- comfort_normal: double (nullable = true)
     |-- ori_comfort_aa: double (nullable = true)


    > aa <- mutate(a, newCol1 = a$commerce_normal * 5, newCol2 = a$bank_normal * 2) ; #与withColumn类似

    > printSchema(aa)

    root

    |-- city: string (nullable = true)

    。。。。。。。。。。。。。。。。。。

    |-- comfort_normal: double (nullable = true)

    |-- newCol1: double (nullable = true)

    |-- newCol2: double (nullable = true)

    a1<-arrange(a,asc(a$level_tow)); # 按列排序, asc升序,desc降序

    a1<-orderBy(a,asc(a$level_tow)); # 按列排序

    count(a) ; # 统计 Formal Data Frame有多少行数据

    > dtypes(a);  #以list的形式列出Formal Data Frame的全部列名及类型
    [[1]]
    [1] "city"   "string"
    
    [[2]]
    [1] "housingname" "string"
    
    
    
    > a<-withColumnRenamed(a,"comfort_normal","AA");  # 更改列名  
    > printSchema(a)
    root
     |-- city: string (nullable = true)
     |-- housingname: string (nullable = true)
    ..........
     |-- AA: double (nullable = true)



    创建sparkR的数据框的函数

    createDataFrame



    > df<-createDataFrame(sqlContext,a.df);  # a.df是R中的数据框, df是sparkR的数据框,注意:使用sparkR的数据框,需要sqlContext


    > str(a.df)

    'data.frame': 5 obs. of 41 variables:

    > str(df)

    Formal class 'DataFrame' [package "SparkR"] with 2 slots

    ..@ env:<environment: 0x4fce350> 

    ..@ sdf:Class 'jobj' <environment: 0x4fc70b0> 

    > destDF <- select(SFO_DF, "dest", "cancelled"); #选择列

    > showDF(destDF); #显示sparkR的DF

    +----+---------+

    |dest|cancelled|

    +----+---------+

    | SFO| 0|

    ................

    > registerTempTable(SFO_DF, "flightsTable"); #要对sparkDF使用SQL语句,首先需要将DF注册成一个table



    > wa <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable"); #在sqlContext下使用SQL语句

    > showDF(wa); #查询的结果还是sparkDF

    +----+---------+

    |dest|cancelled|

    +----+---------+

    | SFO| 0|

    ................

    > local_df <- collect(wa); #将sparkDF转换成R中的DF

    > str(local_df)

    'data.frame': 2818 obs. of 2 variables:

    $ dest : chr "SFO" "SFO" "SFO" "SFO" ...

    $ cancelled: int 0 0 0 0 0 0 0 0 0 0 ...

    > wa<-flights_df[1:1000,]; #wa是R中的DF

    > flightsDF<-createDataFrame(sqlContext,wa) ; #flightsDF是sparkR的DF

    > library(magrittr); #管道函数的包对sparkRDF适用

    > groupBy(flightsDF, flightsDF$date) %>%

    + summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF; #注意,语法和dplyr中的有所不同,结果还是sparkRDF


    > str(dailyDelayDF)

    Formal class 'DataFrame' [package "SparkR"] with 2 slots

    ..@ env:<environment: 0x4cd3118> 

    ..@ sdf:Class 'jobj' <environment: 0x4cd6968> 

    > showDF(dailyDelayDF)

    +----------+--------------------+--------------------+

    | date| AVG(dep_delay)| AVG(arr_delay)|

    +----------+--------------------+--------------------+

    |2011-01-01| 5.2| 5.8|

    |2011-01-02| 1.8333333333333333| -2.0|

    ................

    在39机器上跑的

    collect将sparkDF转化成DF

    Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.

    collect(x, stringsAsFactors = FALSE),x:A SparkSQL DataFrame

    > dist_df<- sql(hiveContext, "SELECT * FROM anjuke_scores where restaurant<=1");

    > local_df <- dist_df %>% 

    groupBy(dist_df$city) %>% 

    summarize(count = n(dist_df$housingname)) %>% 

    collect

    > local_df

    city count

    1 x9a 5

    2 8xde 7

    3 xf0xde 2

    ..........

    ..........

    take也可将sparkDF转化成DF

    Take the first NUM rows of a DataFrame and return the results as a data.frame

    take(x, num)

    > local_df <- dist_df %>% 

    groupBy(dist_df$city) %>% 

    summarize(count = n(dist_df$housingname))

    > a<-take(local_df,100)

    [Stage 16:=========================================> (154 + 1) / 199]

    > View(a)

    > a

    city count

    1 x9a 5

    2 8xde 7

    3 xf0xde 2

    ..........

    ..........

    跑不通的函数:

    > describe(a)
    Error in x[present, drop = FALSE] : 
      object of type 'S4' is not subsettable
    > jfkDF <- filter(flightsDF, flightsDF$dest == "DFW")
    Error in filter(flightsDF, flightsDF$dest == "DFW") : 
      no method for coercing this S4 class to a vector
  • 相关阅读:
    [模板]杜教筛
    [NOIP2014]解方程
    [NOIP2016] 组合数问题
    [HAOI2011] Problem b
    Rmq Problem mex
    [模板]Link-Cut-Tree
    [SDOI2013]森林
    单调队列优化多重背包
    [USACO17JAN]Promotion Counting
    [模板] 点分治
  • 原文地址:https://www.cnblogs.com/awishfullyway/p/6485225.html
Copyright © 2011-2022 走看看