  • Spark DataFrame data processing: add, delete, update, query (CRUD)

    1. Configuration file

    package config

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}

    object conf {
      private val master = "local[*]"
      val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")

      // a single SparkContext for RDD work, sharing the same configuration
      val sc = new SparkContext(confs)
      sc.setLogLevel("ERROR")

      // the shared SparkSession used by the processing script
      val spark_session: SparkSession = SparkSession.builder()
        .appName("jobs").config(confs).getOrCreate()

      // enable Cartesian products (cross joins); needed on Spark 2.x,
      // where spark.sql.crossJoin.enabled defaults to false
      spark_session.conf.set("spark.sql.crossJoin.enabled", value = true)
    }
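
    The same flag can also be passed to the builder itself. A minimal sketch of that alternative (not from the original post); note that on Spark 3.x spark.sql.crossJoin.enabled already defaults to true, so the line only matters on 2.x:

    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = SparkSession.builder()
      .appName("jobs")
      .master("local[*]")
      // set at build time instead of via spark_session.conf.set afterwards
      .config("spark.sql.crossJoin.enabled", "true") // redundant on Spark 3.x
      .getOrCreate()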
    

      

    2. Processing script

    package sparkDataMange

    import config.conf.spark_session
    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.DoubleType
    object irisDataFrame {
    
      def main(args: Array[String]): Unit = {
    
        val st: Long = System.currentTimeMillis()
    
    val path: String = "data/iris.data"
    // headerless CSV: Spark assigns the default column names _c0 .. _c4
    var df: DataFrame = spark_session.read.csv(path)
    
    /*
    * The CSV file contents look like:
    *
    5.1,3.5,1.4,0.2,Iris-setosa
    5.0,2.0,3.5,1.0,Iris-versicolor
    6.2,3.4,5.4,2.3,Iris-virginica

    * */

    /*
    * Default column names assigned by Spark:
    +---+---+---+---+-----------+
    |_c0|_c1|_c2|_c3|        _c4|
    +---+---+---+---+-----------+
    */
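
    // A sketch not in the original post: an explicit schema would give named, typed
    // columns up front and make the cast below unnecessary. The column names are
    // illustrative; assumes the extra imports shown here.
    //   import org.apache.spark.sql.types.{StringType, StructType}
    //   val schema = new StructType()
    //     .add("sepal_length", DoubleType).add("sepal_width", DoubleType)
    //     .add("petal_length", DoubleType).add("petal_width", DoubleType)
    //     .add("species", StringType)
    //   val typed: DataFrame = spark_session.read.schema(schema).csv(path)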
    
    // label-encode the species column: map each class name to a numeric label
    val ls: Column = when(col("_c4").equalTo("Iris-setosa"), "1").
      when(col("_c4").equalTo("Iris-versicolor"), "2").
      otherwise("3")

    df = df.withColumn("_c4", ls)
    
    // cast every column (including the new label) to Double
    df = df.select(df.columns.map(f => df(f).cast(DoubleType)): _*)
    df.show()
    
    /*
    * Result after processing (all columns are now Double):
    *
    5.1,3.5,1.4,0.2,1.0
    5.0,2.0,3.5,1.0,2.0
    6.2,3.4,5.4,2.3,3.0

    * */
    
        df.printSchema()
    
        spark_session.stop()
    
    println("Elapsed time: " + (System.currentTimeMillis() - st) / 1000.0 + "s")
      }
    }
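
    The script above only exercises the "update" part of the title (rewriting a column and casting types). The add, delete, and query operations use the same DataFrame API; below is a minimal sketch, assuming the df produced above — the derived column name and the output path are illustrative, not from the original post.

    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.functions.col

    // add: append a derived column (illustrative name "_c5")
    val added: DataFrame = df.withColumn("_c5", col("_c0") * col("_c1"))

    // delete: drop the derived column again
    val dropped: DataFrame = added.drop("_c5")

    // query: filter rows and project a subset of columns
    val setosa: DataFrame = dropped.filter(col("_c4") === 1.0).select("_c0", "_c4")

    // persist the result (placeholder path); SaveMode.Overwrite replaces existing output
    setosa.write.mode(SaveMode.Overwrite).csv("data/iris_out")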
    

      
