  • Using UDFs in pyspark

    Official documentation:
    https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

    I. Overview

    When working with Hive from pyspark, UDFs are very convenient to use.

    II. Example

    1. Create a table and load data

    from os.path import abspath
    
    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = abspath('spark-warehouse')
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()
    
    # spark is an existing SparkSession
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH './kv1.txt' INTO TABLE src")
    

    If you run this on Windows 10, after the data has been loaded you need to change the permissions of kv1.txt so that the file is readable by the program.
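
    A minimal sketch of that permission fix, assuming the copy of kv1.txt that LOAD DATA placed under the warehouse directory is the file that needs to become readable (the exact path is hypothetical and depends on your table location):

    import os
    import stat
    
    # Hypothetical location of the loaded file inside the managed table's directory.
    loaded_file = os.path.join(warehouse_location, 'src', 'kv1.txt')
    # Clear the read-only flag / make the file readable and writable by the owner.
    os.chmod(loaded_file, stat.S_IREAD | stat.S_IWRITE)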

    2. Some queries

    # Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
    # +---+-------+
    # |key|  value|
    # +---+-------+
    # |238|val_238|
    # | 86| val_86|
    # |311|val_311|
    # ...
    
    # Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
    # +--------+
    # |count(1)|
    # +--------+
    # |    500 |
    # +--------+
    
    # The results of SQL queries are themselves DataFrames and support all normal functions.
    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    
    # The items in DataFrames are of type Row, which allows you to access each column by ordinal.
    stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
    for record in stringsDS.collect():
        print(record)
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # Key: 0, Value: val_0
    # ...
    
    # You can also use DataFrames to create temporary views within a SparkSession.
    Record = Row("key", "value")
    recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
    recordsDF.createOrReplaceTempView("records")
    
    # Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    # +---+------+---+------+
    # |key| value|key| value|
    # +---+------+---+------+
    # |  2| val_2|  2| val_2|
    # |  4| val_4|  4| val_4|
    # |  5| val_5|  5| val_5|
    

    3. UDF

    Requirement: return the square of a column's value.

    1. Write the UDF

    def func_two(key):
        # Return the square of the incoming column value.
        return key * key
    

    2. Register the UDF

    register takes three arguments: the name under which the UDF will be available in SQL, the Python function itself, and the function's return type (which must be a type from pyspark.sql.types).

    from pyspark.sql.types import IntegerType
    spark.udf.register("func_two", func_two, IntegerType())
    sc = spark.sparkContext
    from pyspark.sql import HiveContext
    # A new HiveContext (deprecated since Spark 2.0; kept here to mirror the older API)
    hc = HiveContext(sc)
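
    Since the UDF is registered on the active SparkSession, it is also visible to spark.sql on that same session, so the HiveContext above is optional; a quick check against the src table from step 1:

    spark.sql("SELECT func_two(key) AS key_squared, value FROM src LIMIT 3").show()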
    

    3. Use the UDF

    hc.sql("SELECT func_two(key) AS key, value FROM src").collect()
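
    The same UDF can also be used through the DataFrame API by wrapping the Python function with pyspark.sql.functions.udf; a minimal sketch (square_udf is just a local name chosen here):

    from pyspark.sql import functions as F

    # Wrap the same Python function as a column expression for the DataFrame API.
    square_udf = F.udf(func_two, IntegerType())

    df = spark.table("src")
    df.select(square_udf(df["key"]).alias("key_squared"), df["value"]).show(5)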
    