zoukankan      html  css  js  c++  java
  • 知识点-Spark小节

    Spark处理字符串日期的max和min的方式
    Spark处理数据存储到Hive的方式
    Spark处理新增列的方式map和udf、functions
    Spark处理行转列pivot的使用
    Python 3.5.3
    Spark1.6.2

    欢迎访问个人主页博客

    Spark处理字符串日期的max和min的方式

    一般是字符串类型的日期在使用Spark的agg求max时,是不正确的,API显示只支持数值型的max、min
    hive的SQL查询引擎是支持字符串日期的max和min的

    字符串日期转为时间戳再聚合

    unix_timestamp

    public static Column unix_timestamp(Column s)
    Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
    Parameters:
    s - (undocumented)
    Returns:
    (undocumented)
    Since:
    1.5.0
    from pyspark.sql import functions as F
    
    df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
    使用HiveSQL
    select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id

    Spark处理数据存储到Hive的方式

    通常Spark任务处理后的结果数据会存储到Hive表中,可以先保存至HDFS目录再load、最方便还是直接使用临时表和HiveContext插入数据

    saveAsTextFile & load data

    repartition根据实际文件大小进行调整,数据比较小时,保存成一个文件

    df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)

    先删除分区,如果已经存在的话
    再覆盖原来的数据【方便重新重复跑或修复数据】
    此处使用shell,也可使用HiveContext的sql

    alter table app_table drop if exists partition(datestr='$day_01');
    load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition(datestr='$day_01');
    hivectx.sql & insert
    app_table1_df.registerTempTable("app_table1_tmp")
    app_table2_df.registerTempTable("app_table2_tmp")
    hivectx.sql("set spark.sql.shuffle.partitions=1")
    hivectx.sql("alter table app_table drop if exists partition(datestr='%s')" % daystr)
    hivectx.sql("insert overwrite table app_table partition(datestr='%s') select * from app_table1_tmp" % daystr)
    hivectx.sql("insert into app_table partition(datestr='%s') select * from app_table2_tmp" % daystr)

    Spark处理新增列的方式map和udf、functions

    Spark在处理数据转换时,通常需要使用map、flatmap等操作,其中使用map会产生新的列或修改某列字段的值
    Spark同样支持自定义函数UDF以及提供了类似Hive内置函数的各种各样的处理函数

    map

    需要定义函数和StructType
    忽略数值判断细节和精度等

    from pyspark.sql.types import *
    
    def a_func(_):
        return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt1'])
    
    a_schema = StructType([
        StructField('id', StringType(), True),
        StructField('cnt1', IntegerType(), True),
        StructField('cnt2', IntegerType(), True),
        StructField('cnt1_rate', IntegerType(), True)
    ])
    
    a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
    udf

    需要定义函数和UDF
    忽略数值判断细节和精度等

    def a_func(cnt1, cnt2):
        return cnt1 / (cnt1 + cnt2)
    
    a_udf = F.udf(a_func, IntegerType())
    
    a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2'])
    functions

    处理类似日期字符串的格式转换、等等等
    https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

    Spark处理行转列pivot的使用

    在使用SQL查询数据时,很多情况下需要将行转为列,以有利于数据的展示和不同维度需求的利用
    一般可采用子查询case when、连续join、字段补全union的形式
    Spark的DataFrame中可以通过GroupedData的pivot函数来实现

    df.groupBy(['course_name']).pivot('daystr').sum('score')
    
    df.groupBy(['course_name']).pivot('daystr').count()

    转换前

    daystr course_name score
    2017-11-15 yuwen 1
    2017-11-15 yuwen 1
    2017-11-15 shuxue 1
    2017-11-15 yingyu 2
    2017-11-16 yuwen 1
    2017-11-16 shuxue 1
    2017-11-16 yingyu 2
    

    转换后

    course_name 2017-11-15 2017-11-16
    yuwen 2 1
    shuxue 1 1
    yingyu 2 2
    
    course_name 2017-11-15 2017-11-16
    yuwen 2 1
    shuxue 1 1
    yingyu 1 1

    原文地址:https://blog.icocoro.me/2017/11/16/1711-zhishidian-spark%E5%B0%8F%E8%8A%8201/index.html
  • 相关阅读:
    java8大排序
    如何删除oracle 的用户及其数据
    JavaScript开发者常忽略或误用的七个基础知识点
    Vim学习指南
    5个开发人员不应该错过的最好跨平台PHP编辑器
    OpenGL 简介
    web 页面内容优化管理与性能技巧
    创建高性能移动 web 站点
    近期十大优秀jQuery插件推荐
    30本世界名著浓缩成的经典话语
  • 原文地址:https://www.cnblogs.com/once/p/7966105.html
Copyright © 2011-2022 走看看