  • pyspark: reading a pickle file and storing its contents in Hive

    In day-to-day work you inevitably deal with big data, and sometimes you need to read a local file and then store its contents in Hive. This post walks through how to do that.

    Process (a minimal end-to-end sketch follows the list):

    • Read the .plk file with the pickle module;

    • Convert the data that was read into an RDD;

    • Convert the RDD to a DataFrame and store it in the Hive warehouse;
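
    For orientation, here is a minimal end-to-end sketch of the pipeline; the file path, database and table names are placeholders, and enableHiveSupport() is assumed since it is normally required for writing to a Hive metastore:

    import pickle
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.appName("pickle_to_hive").enableHiveSupport().getOrCreate()
    # read the local pickle file (assumed here to hold a simple list of values)
    with open("xxx.plk", "rb") as fp:
        data = pickle.load(fp)
    # list -> RDD -> DataFrame -> Hive table
    rdd = spark.sparkContext.parallelize(data)
    df = spark.createDataFrame(rdd.map(lambda x: Row(col=x)))
    df.write.saveAsTable("hive_database.hive_table", mode="overwrite")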

    1. Saving and reading pickle files with pickle

    import pickle
    data = ""
    path = "xxx.plk"
    # save as pickle
    pickle.dump(data, open(path, 'wb'))
    # read the pickle back
    data2 = pickle.load(open(path, 'rb'))

    When reading a pickle file saved by Python 2 from Python 3, you will get the error:

    UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)

    Solution:

    # note: the encoding argument belongs to pickle.load, not to open() (binary mode takes no encoding)
    data2 = pickle.load(open(path, 'rb'), encoding='latin1')

    When reading a pickle file saved by Python 3 from Python 2, you will get the error:

    unsupported pickle protocol:3

    Solution:

    import pickle
    path = "xxx.plk"
    path2 = 'xxx2.plk'
    data = pickle.load(open(path, 'rb'))
    # re-save using pickle protocol 2, which Python 2 can read
    pickle.dump(data, open(path2, 'wb'), protocol=2)
    # read the pickle back
    data2 = pickle.load(open(path2, 'rb'))

    2. Reading the pickle contents and converting them to an RDD

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    import pickle


    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    with open(pickle_path, "rb") as fp:
        data = pickle.load(fp)
        # process data here according to its actual type

    # assume data is a one-dimensional list such as [1,2,3,4,5]; convert it to an RDD
    pickleRdd = spark.sparkContext.parallelize(data)
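
    If the unpickled object is not a flat list (for example, a list of dicts), you can map each element to a Row before building the DataFrame. A minimal sketch, assuming hypothetical field names name and age:

    # hypothetical: data looks like [{'name': 'Alice', 'age': 1}, ...]
    dict_rdd = spark.sparkContext.parallelize(data)
    row_rdd = dict_rdd.map(lambda d: Row(**d))  # each dict becomes a Row
    df_from_dicts = spark.createDataFrame(row_rdd)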

    3. Converting the RDD to a DataFrame and saving it to Hive

    # define the column name
    column = Row('col')
    # convert to a DataFrame
    pickleDf = spark.createDataFrame(pickleRdd.map(lambda x: column(x)))
    # save to Hive: writes the table hive_table in the database hive_database (creating the table
    # if it does not exist) in overwrite mode; partitionBy can be used to specify a partition column
    pickleDf.write.saveAsTable("hive_database.hive_table", mode='overwrite')
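
    Since the partition column was left unspecified above, here is a sketch of what a partitioned write could look like, assuming the DataFrame had a hypothetical partition column dt:

    # hypothetical: the DataFrame contains a 'dt' column to partition by
    pickleDf.write.saveAsTable(
        "hive_database.hive_table",
        mode='overwrite',
        partitionBy='dt'
    )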

    Some additional notes on writing to Hive:

    (1) Via SQL

    data = [
        (1,"3","145"),
        (1,"4","146"),
        (1,"5","25"),
        (1,"6","26"),
        (2,"32","32"),
        (2,"8","134"),
        (2,"8","134"),
        (2,"9","137")
    ]
    df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
     
    # method one: "default" is the name of the default database, and write_test is the table to create in it
    df.registerTempTable('test_hive')
    spark.sql("create table default.write_test as select * from test_hive")

    Or:

    # register df as a temporary table / view
    df.createOrReplaceTempView("df_tmp_view")
    # insert into Hive via spark.sql
    spark.sql("""
        insert overwrite table XXXXX               -- table name
        partition(partition_name=partition_value)  -- separate multiple partitions with commas
        select XXXXX                               -- column names, matching the Hive column order, excluding partition columns
        from df_tmp_view
    """)
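
    As a concrete (hypothetical) illustration of the same pattern, assume a target table default.write_test_p already exists and is partitioned by a dt column; the select list then matches the df defined above:

    # hypothetical table name and partition value, for illustration only
    spark.sql("""
        insert overwrite table default.write_test_p
        partition(dt='2020-10-15')
        select id, test_id, camera_id
        from df_tmp_view
    """)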

    (2) Via saveAsTable

    # "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
    #  mode("append")是在原有表的基础上进行添加数据
    df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')

    Finally, here are several ways to create a DataFrame, mostly from an RDD:

    (1) From a list of dicts

    d = [{'name': 'Alice', 'age': 1}]
    output = spark.createDataFrame(d).collect()
    print(output)
    
    # [Row(age=1, name='Alice')]

    (2) From an RDD

    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    output = spark.createDataFrame(rdd).collect()
    print(output)
    output = spark.createDataFrame(rdd, ["name", "age"]).collect()
    print(output)
    
    # [Row(_1='Alice', _2=1)]
    # [Row(name='Alice', age=1)]

    (3) From an RDD and Row

    from pyspark.sql import Row
    
    
    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    Person = Row("name", "age")
    person = rdd.map(lambda r: Person(*r))
    output = spark.createDataFrame(person).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]

    (4) From an RDD and a StructType

    from pyspark.sql.types import *
    
    a = [('Alice', 1)]
    rdd = sc.parallelize(a)
    schema = StructType(
        [
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)
        ]
    )
    output = spark.createDataFrame(rdd, schema).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]

    (5) From a pandas DataFrame

    df = spark.createDataFrame(rdd, ['name', 'age'])
    print(df)  # DataFrame[name: string, age: bigint]
    
    print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>
    
    # pass a pandas DataFrame to createDataFrame
    output = spark.createDataFrame(df.toPandas()).collect()
    print(output)
    
    # [Row(name='Alice', age=1)]

    References:

    https://blog.csdn.net/sinat_28224453/article/details/84977693

    https://blog.csdn.net/weixin_39198406/article/details/104916715

    https://blog.csdn.net/u011412768/article/details/93426353
