spark_learn

    package chapter03
    
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by chenzechao on 2017/12/21.
      */
    
    /**
  spark-shell \
  --master yarn-client \
  --driver-memory 1G \
  --driver-cores 1 \
  --queue root.queue_0101_04 \
  --executor-memory 2G \
  --num-executors 2 \
  --conf spark.executor.cores=1 \
  --name 'tmp_abc_test' \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.driver.maxResultSize=8G \
  --conf spark.sql.hive.metastore.version=1.2.1 \
  --conf spark.sql.shuffle.partitions=150
      */
    
    
    object document {
  // 0. Get the flag parameter

  // 0. Set up the environment
      val conf = new SparkConf().setAppName("tianchi").setMaster("local[*]")
      val sc = new SparkContext(conf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val hiveContext = new HiveContext(sc)
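
  // A small sketch: the SQL-related --conf flags shown in the spark-shell comment
  // above can also be set at runtime through SQLContext.setConf (shown here with
  // the shuffle-partitions value from that comment).
  sqlContext.setConf("spark.sql.shuffle.partitions", "150")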
    
      val jsonFile = "file:///tmp/upload/data/json_file"
      val jsonFile_hdfs = "/tmp/ccc/tmpc/json_file"
  // Run SQL (Hive tables need the HiveContext; a plain SQLContext cannot resolve them)
  val df1 = hiveContext.sql("select * from sx_360_safe.sub_ladm_exc_app_s16_all_for_double").limit(200).cache()
      df1.count()
    
      // Print the schema in a tree format
      df1.printSchema()
    
  // Select only the "gu_flag" column
      df1.select("gu_flag").show()
    
      // Select everybody, but increment the age by 1
      df1.select(df1("empno"),df1("age"),df1("age") + 1 ).show
    
  // Select employees older than 21
      df1.filter(df1("age") > 21).select(df1("empno"),df1("age")).show()
    
  // Count employees by age
      df1.groupBy(df1("age")).count().sort(df1("age")).show()
      val gb = df1.groupBy(df1("age")).count()
      gb.sort(gb("count")).show()
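
  // Sketch of the same grouping done with the built-in aggregate functions;
  // the gu_flag, empno and age columns are taken from df1's schema used above.
  import org.apache.spark.sql.functions.{avg, count}
  df1.groupBy(df1("gu_flag")).agg(count(df1("empno")).as("cnt"), avg(df1("age")).as("avg_age")).show()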
    
    
    
  // Save the DataFrame as a JSON file
      df1.write.mode("Overwrite").format("json").save(jsonFile_hdfs)
      df1.write.mode("Append").format("json").save(jsonFile_hdfs)
      df1.select(df1("empno"), df1("gu_flag")).write.mode("Overwrite").format("parquet").saveAsTable("sx_360_safe.tmp_czc_20180323_04")
    
    
    
      // this is used to implicitly convert an RDD to a DataFrame.
      import sqlContext.implicits._
    
      val df2 = sqlContext.read.json(jsonFile)
    
    
      // Encoders for most common types are automatically provided by importing sqlContext.implicits._
      val ds1 = Seq(1, 2, 3).toDS()
      ds1.map(_ + 1).collect()
    
  // Encoders are also created for case classes
  case class Person(name: String, age: Long)
      val ds = Seq(Person("Andy",35)).toDS()
      ds.show()
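
  // Sketch: moving between the typed Dataset and the untyped DataFrame APIs
  // (Spark 1.6 Dataset; Person is the case class defined just above).
  val personDf: DataFrame = ds.toDF()
  val personDs = personDf.as[Person]
  personDs.map(_.name).collect()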
    
    
      /**
        * Inferring the Schema Using Reflection
        */
      import sqlContext.implicits._
      case class Person2(name:String, age:Int)
      val people = sc.textFile("/tmp/ccc/data/tmpa").filter(_.length > 1).map(_.split(",")).map(p => Person2(p(0),p(1).trim.toInt)).toDF()
      people.registerTempTable("people")
      sqlContext.sql("select * from people limit 10").show
    
      val teenagers = sqlContext.sql("select name,age from people where age >= 23 and age<= 26")
      teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    
      // or by field name
      teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
    
  // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
      teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)
    
      /**
        * Programmatically Specifying the Schema
        */
  val schemaString = "name age"
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types.{StructType,StructField,StringType}
    
  val schema = StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
  )
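
  // Sketch: the same schema with explicit field types instead of all-string fields.
  import org.apache.spark.sql.types.IntegerType
  val typedSchema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))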
    
      // Convert records of the RDD (people) to Rows
      val people2 = sc.textFile("/tmp/ccc/data/tmpa")
      val rowRDD = people2.map(_.split(",")).map(p => Row(p(0),p(1).trim))
    
      // Apply the schema to the RDD
      val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
    
      // Register the DataFrames as a table
      peopleDataFrame.registerTempTable("people")
    
  // SQL statements can now be run against the registered table; the result is a
  // DataFrame and supports the normal RDD operations.
  val results = sqlContext.sql("select name, age from people")
  results.map(t => "Name: " + t(0)).collect().foreach(println)
    
      val df = sqlContext.read.load("/tmp/examples/src/main/resources/users.parquet")
    
      val df3 = sqlContext.read.format("json").load("/tmp/examples/src/main/resources/people.json")
    
      // Run SQL on files directly
      val df4 = sqlContext.sql("select * from parquet.`/tmp/examples/src/main/resources/users.parquet`")
    
      // Save modes
      /**
        * ErrorIfExists (default)
        * Append
        * Overwrite
        * Ignore
        */
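
  // Sketch: the same modes expressed with the SaveMode enum instead of strings,
  // reusing jsonFile_hdfs from above.
  import org.apache.spark.sql.SaveMode
  df1.write.mode(SaveMode.Ignore).format("json").save(jsonFile_hdfs)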
    
      val parquetFile = sqlContext.read.parquet("")
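
  // Sketch: once loaded, a Parquet file can be queried through a temp table
  // (reusing the users.parquet path from above).
  val usersParquet = sqlContext.read.parquet("/tmp/examples/src/main/resources/users.parquet")
  usersParquet.registerTempTable("users")
  sqlContext.sql("select name from users").show()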
    }