zoukankan      html  css  js  c++  java
  • pyspark写入hive(二) 使用 saveAsTable

    一、问题描述

    pyspark写入hive分区表中,使用了建临时表的方式。一般情况下是没有问题的,但是当涉及到class pyspark.sql.types.FloatType,就会出现bug。
    比如当统计列表中每个单词出现的概率,同时保留最多四位小数

    from Collections import Counter
    mylist = ["a","b","c","a"]
    k_p_dict = dict()
    d = Counter(mylist)
    for item in d:
      p = round(d[item] / len_keyword,4)
      k_p_dict[item] = p
    

    但是如果使用临时表方法,那么需要通过schma转换为DataFrame

    sch = StructType([
            StructField("k_p", MapType(StringType(),FloatType()), True)
        ])
    df = spark.createDataFrame(myrdd,sch)
    

    rdd转换为DataFrame之后,字典的value值就不再是4位小数,而是比如0.11110000312328339

    二、使用 saveAsTable()

    df直接写入hive。

    from pyspark.sql import Row
    def data2row(x):
      ...
      # 直接返回Row()格式的数据
      return Row(userid=user_id,k_p=k_p_dict)
    
    # 1. 和之前方法一样,从hive表取数据
    df = spark.sql(my_sql)
    # 2. DataFrame没有map方法,所以转换为rdd,然后对每一个列处理之后再通过toDF()转换为DataFrame
    df = df.rdd.map(lambda x: data2row(x)).toDF()
    # 3. 保存到hive表。mode可以使用append、overwrite
    # "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
    # mode("append")是在原有表的基础上追加数据
    df.write.format("hive").mode("append").saveAsTable("mytable")
    

    三、思考

    当指定分区的时候,需要分区字段也有值。就需要再原数据中加入分区字段。而使用insert overwrite xxx的方法不需要。所以saveAsTable用时更多。

  • 相关阅读:
    第11条:用zip函数同时遍历两个迭代器
    第10条:尽量用enumerate取代range
    第9条:用生成器表达式来改写数据量较大的列表推导式
    MySQL的约束
    VMware下所有的系统网卡启动不起来
    windows下的mysql闪退问题
    大型网站架构模式
    MySQL的information_schema库
    mysql复制表结构和内容
    希尔排序 堆排序 归并排序
  • 原文地址:https://www.cnblogs.com/leimu/p/15007699.html
Copyright © 2011-2022 走看看