zoukankan      html  css  js  c++  java
  • spark操作总结

    一、sparkContext与sparkSession区别

    任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,sparkContext只能在driver机器上面启动;
    SparkSession: SparkSession实质上是SQLContext和HiveContext的组合,SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成

    val conf: SparkConf = new SparkConf().setAppName("test")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

     

    二、repartition与coalesce区别

    repartition一般是用来增加分区数(当然也可以减少),coalesce只能用来减少分区数。所以如果不介意保存的文件块大小不一样,可以使用coalesce来减少分区数,保存的时候一个分区就会生成一个文件块

     

    三、Scala常用方法

    1. StringBuilder

    主要用于字符串的拼接,可作用于生成倒排序列,如:
    val userItemScore = sc.parallelize(List((1001, 1, 0.8), (1001, 2, 0.7), (1001, 3, 0.5), (1001, 4, 0.9)))
    userItemScore.map(x => (x._1, (x._2.toString, x._3.toString))).groupByKey()
    .map{x =>
    val userid = x._1
    val item_score_list = x._2
    val tmp_arr = item_score_list.toArray.sortWith(_._2 > _._2)
    val watch_len = tmp_arr.length
    val strbuf = new StringBuilder()
    
    for (i <- 0 until watch_len - 1) {
    strbuf ++= tmp_arr(i)._1
    strbuf.append(":")
    strbuf ++= tmp_arr(i)._2
    strbuf.append(" ")
    }
    strbuf ++= tmp_arr(watch_len - 1)._1
    strbuf.append(":")
    strbuf ++= tmp_arr(watch_len - 1)._2
    
    userid + "	" + strbuf
    }.collect()

     

    2. scala.collection.mutable.ArrayBuffer

    相当于是一个大小可变数组,把需要的值添加进来,例如:
    val tmpArray = new ArrayBuffer[String]()
    val tmpArray = new ArrayBuffer[Int]()
    val tmpArray = new ArrayBuffer[(String, Int)]()
    scala> tmpArray.append(("wangzai", 1))
    scala> tmpArray
    res11: scala.collection.mutable.ArrayBuffer[(String, Int)] = ArrayBuffer((wangzai,1), (test,2))
    
    tmpArray.indexOf(("test",2))为获取当前值的索引,返回类型为整型
    tmpArray.slice(tmpArray.indexOf(("test", 2)), tmpArray.length)为切片,返回类型为ArrayBuffer

     

    四、通过spark-shell来操作数据库中的表

    1 启动(通过--jars指定包,后面reids包不需要,只是演示添加多个包的用法)

    /xxx/spark/bin/spark-shell 
    --master spark://xxx:7077 
    --executor-cores 1 
    --total-executor-cores 5 
    --driver-memory 2g 
    --jars /xxx/jars/mysql-connector-java-5.1.38.jar,/xxx/jars/jedis-2.9.0.jar

    2 在命令行中输入::paste, 然后粘贴以下代码,最后ctrl+D退出之后,即可执行

    
    

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.SparkConf
    val conf: SparkConf = new SparkConf()
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val mysqlUrl: String = "jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    val productTable: String = "product_info"
    val orderTable: String = "order_info"
    val properties: Properties = new Properties()
    properties.put("user", user)
    properties.put("password", password)

    
    

    // 获取同事购配置表数据
    val productDF: DataFrame = spark.read.jdbc(mysqlUrl, productTable, properties).select("id", "name")
    val orderDF: DataFrame = spark.read.jdbc(mysqlUrl, orderTable, properties).select("product_id", "createTime")

    
    

    val totalDataDF = productDF.join(orderDF, orderDF("product_id") === productDF("id")).drop("id")
    //如果product_info对应的id为product_id,即关联id字段名不相同
    //val totalDataDF = productDF.join(orderDF, Seq("product_id"))

    3 把该DateFrame注册为临时表才能通过spark-sql操作

    totalDataDF.createOrReplaceTempView("totalDataDF")

     

    五、spark-sql的基本操作

    //默认显示20条数据
    scala> df.show()
    //打印模式信息
    scala> df.printSchema()
    //选择多列
    scala> df.select(df("name"),df("age")+1).show()
    // 条件过滤
    scala> df.filter(df("age") > 20 ).show()
    // 分组聚合
    scala> df.groupBy("age").count().show()
    // 排序
    scala> df.sort(df("age").desc).show()
    //多列排序
    scala> df.sort(df("age").desc, df("name").asc).show()
    //对列进行重命名
    scala> df.select(df("name").as("username"),df("age")).show()
    //对多个列重命名
    scala> df.withColumnRenamed("id", "userId").withColumnRenamed("name", "userName")

     

     

  • 相关阅读:
    KnockoutJS 3.X API 第五章 高级应用(4) 自定义处理逻辑
    KnockoutJS 3.X API 第五章 高级应用(3) 虚拟元素绑定
    KnockoutJS 3.X API 第五章 高级应用(2) 控制后代绑定
    KnockoutJS 3.X API 第五章 高级应用(1) 创建自定义绑定
    KnockoutJS 3.X API 第四章(14) 绑定语法细节
    KnockoutJS 3.X API 第四章(13) template绑定
    KnockoutJS 3.X API 第四章 表单绑定(12) selectedOptions、uniqueName绑定
    KnockoutJS 3.X API 第四章 表单绑定(11) options绑定
    KnockoutJS 3.X API 第四章 表单绑定(10) textInput、hasFocus、checked绑定
    KnockoutJS 3.X API 第四章 表单绑定(9) value绑定
  • 原文地址:https://www.cnblogs.com/654wangzai321/p/11096992.html
Copyright © 2011-2022 走看看