zoukankan      html  css  js  c++  java
  • SparkSQL: creating an empty DataFrame (SparkSQL 创建空 DataFrame)

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{SparkSession}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.functions.monotonically_increasing_id
    import org.apache.spark.sql.types.DateType

    /**
     * Loads user-app click records from a local text file, adds a monotonically
     * increasing row id, converts the `statis_date` string column (yyyyMMdd) to
     * a DATE column, and registers the result as the "user_app" temp view.
     * Also demonstrates how to create an empty DataFrame from the same schema.
     *
     * Expected input: comma-separated lines in ./data/user_app.txt with columns
     * serv_number, unf_app_code, click_cnt, access_time, statis_date.
     */
    object APPUser {
      def main(args: Array[String]): Unit = {
        // Silence noisy framework loggers so only application output is visible.
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        val spark = SparkSession
          .builder()
          .appName("SparkSessionAppUser")
          .master("local[2]")
          .getOrCreate()

        // Single partition keeps monotonically_increasing_id values contiguous
        // (ids are only guaranteed monotonic within a partition).
        val linesRDD = spark
          .sparkContext.textFile("./data/user_app.txt")
          .repartition(1)

        // NOTE(review): cols(2)/cols(3) throw NumberFormatException on malformed
        // rows — assumes the input file is clean; confirm upstream guarantees.
        val rowsRDD = linesRDD
          .map(_.split(","))
          .map { cols =>
            Row(cols(0), cols(1), cols(2).trim.toInt, cols(3).trim.toDouble, cols(4))
          }

        // Schema for the raw file; all fields are non-nullable.
        val schema = StructType(List(
          StructField("serv_number", StringType, false),
          StructField("unf_app_code", StringType, false),
          StructField("click_cnt", IntegerType, false),
          StructField("access_time", DoubleType, false),
          StructField("statis_date", StringType, false)))

        val user_appDF = spark.createDataFrame(rowsRDD, schema)
        // Add a 1-based surrogate id column.
        val user_app_indexDF = user_appDF
          .withColumn("id", monotonically_increasing_id + 1)
        user_app_indexDF.createOrReplaceTempView("user_app")

        // Convert statis_date (yyyyMMdd string) to DATE and re-register the view.
        // Fix: keep the column named "statis_date", consistent with the schema
        // above (the original aliased it as the typo "static_date").
        val user_app_index_dateDF = spark.sql(
          "select id,serv_number,unf_app_code,click_cnt,access_time," +
            "to_date(statis_date,'yyyyMMdd') as statis_date from user_app")
        user_app_index_dateDF.createOrReplaceTempView("user_app")

        // Demonstration: an empty DataFrame sharing the same schema.
        val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

        //spark.sql("select current_timestamp,date_format('2016-04-08', 'y'), date_add('2016-07-30', 1),date_sub('2016-07-30', 1),datediff('2009-07-31', '2009-07-30')").show()
        spark.stop()
      }
    }
  • 相关阅读:
    LVS负载均衡NAT模式实现
    Linux-ftp服务搭建
    Keepalived配置详解
    Keepalived高可用概念篇
    Nginx-http_proxy_module模块
    Nginx-keepalived+Nginx实现高可用集群
    Oracle注入之带外通信
    Oracle基于延时的盲注总结
    Oracle基于布尔的盲注总结
    Oracle报错注入总结
  • 原文地址:https://www.cnblogs.com/songyuejie/p/15673435.html
Copyright © 2011-2022 走看看